Re: Is SPARK-3322 fixed in latest version of Spark?

2015-08-05 Thread Aaron Davidson
ConnectionManager has been deprecated and is no longer used by default
(NettyBlockTransferService is the replacement). Hopefully you would no
longer see these messages unless you have explicitly flipped it back on.
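
For reference, the relevant setting in 1.2+ looks roughly like this (a sketch; "netty" is already the default, while "nio" switches back to the deprecated ConnectionManager path):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.blockTransferService", "netty") // "nio" re-enables the old ConnectionManager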

On Tue, Aug 4, 2015 at 6:14 PM, Jim Green openkbi...@gmail.com wrote:

 And also https://issues.apache.org/jira/browse/SPARK-3106
 This one is still open.

 On Tue, Aug 4, 2015 at 6:12 PM, Jim Green openkbi...@gmail.com wrote:

 *Symptom:*
 Even the sample job fails:
 $ MASTER=spark://xxx:7077 run-example org.apache.spark.examples.SparkPi 10
 Pi is roughly 3.140636
 ERROR ConnectionManager: Corresponding SendingConnection to
 ConnectionManagerId(xxx,) not found
 WARN ConnectionManager: All connections not cleaned up

 Found https://issues.apache.org/jira/browse/SPARK-3322
 But the code changes are not in the newer versions of Spark; however, this JIRA
 is marked as fixed.
 Is this issue really fixed in the latest version? If so, what is the related
 JIRA?

 --
 Thanks,
 www.openkb.info
 (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)




 --
 Thanks,
 www.openkb.info
 (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)



Re: S3 vs HDFS

2015-07-11 Thread Aaron Davidson
Note that if you use multi-part upload, each part becomes 1 block, which
allows for multiple concurrent readers. One would typically use a fixed part
size that aligns with Spark's default HDFS block size (64 MB, I
think) to ensure the reads are aligned.
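
For example, with an s3a setup one might align the part size and the reported block size like this (a sketch; the property names and their units vary by Hadoop version, so treat them as assumptions to verify against your build):

import org.apache.spark.SparkConf

val blockSize = (64 * 1024 * 1024).toString // 64 MB, matching the HDFS block size mentioned above
val conf = new SparkConf()
  .set("spark.hadoop.fs.s3a.multipart.size", blockSize) // upload part size (assumed property name)
  .set("spark.hadoop.fs.s3a.block.size", blockSize)     // block size reported to readers (assumed property name)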

On Sat, Jul 11, 2015 at 11:14 AM, Steve Loughran ste...@hortonworks.com
wrote:

  seek() is very, very expensive on S3, even short forward seeks. If your
 code does a lot of them, it will kill performance. (Forward seeks are better in
 s3a, which with Hadoop 2.3 is now something safe to use, and in the s3
 client that Amazon includes in EMR), but it's still sluggish.

  The other killers are
  -anything involving renaming files or directories
  -copy operations
  -listing lots of files.

  Finally, S3 is HDD backed, 1 file == 1 block. In HDFS, while you can have
 3 processes reading different replicas of the same block of a file, giving
 3x the bandwidth, disk bandwidth from an S3 object will be shared by all
 readers. The more readers, the worse the performance.


  On 9 Jul 2015, at 14:31, Daniel Darabos daniel.dara...@lynxanalytics.com
 wrote:

  I recommend testing it for yourself. Even if you have no application,
 you can just run the spark-ec2 script, log in, run spark-shell and try
 reading files from an S3 bucket and from hdfs://<master IP>:9000/. (This is
 the ephemeral HDFS cluster, which uses SSD.)

  I just tested our application this way yesterday and found the SSD-based
 HDFS to outperform S3 by a factor of 2. I don't know the cause. It may be
 locality like Akhil suggests, or SSD vs HDD (assuming S3 is HDD-backed). Or
 the HDFS client library and protocol are just better than the S3 versions
 (which are HTTP-based and use some 6-year-old libraries).

 On Thu, Jul 9, 2015 at 9:54 AM, Sujee Maniyam su...@sujee.net wrote:

 Latency is much higher for S3 (if that matters).
 And with HDFS you'd get data-locality that will boost your app
 performance.

  I did some light experimenting on this.
 See my presentation here for some benchmark numbers etc.:
 http://www.slideshare.net/sujee/hadoop-to-sparkv2
  (from slide 34)

  cheers
  Sujee Maniyam (http://sujee.net |
 http://www.linkedin.com/in/sujeemaniyam )
  teaching Spark
 http://elephantscale.com/training/spark-for-developers/?utm_source=mailinglistutm_medium=emailutm_campaign=signature


 On Wed, Jul 8, 2015 at 11:35 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 Are there any significant performance differences between reading text
 files from S3 and hdfs?







Re: All masters are unresponsive issue

2015-07-05 Thread Aaron Davidson
Are you seeing this after the app has already been running for some time,
or just at the beginning? Generally, registration should only occur once
initially, and a timeout would be due to the master not being accessible.
Try telnetting to the master IP/port from the machine on which the driver
will run.
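
For example, a quick connectivity check from the driver machine (a sketch; the host and port are placeholders for your master's values):

import java.net.{InetSocketAddress, Socket}

val socket = new Socket()
try {
  // 7077 is the default standalone master port; adjust host and port as needed.
  socket.connect(new InetSocketAddress("spark-master.example.com", 7077), 5000) // 5 s timeout
  println("Master port is reachable")
} finally {
  socket.close()
}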


Re: s3 bucket access/read file

2015-07-01 Thread Aaron Davidson
I think Hadoop 2.6 failed to abruptly close streams that weren't fully read, which
we observed as a huge performance hit. We had to backport the 2.7
improvements before being able to use it.


RE: ReduceByKey with a byte array as the key

2015-06-11 Thread Aaron Davidson
Be careful shoving arbitrary binary data into a string; invalid UTF-8
characters can cause significant computational overhead in my experience.
On Jun 11, 2015 10:09 AM, Mark Tse mark@d2l.com wrote:

  Makes sense – I suspect what you suggested should work.



 However, I think the overhead between this and using `String` would be
 similar enough to warrant just using `String`.



 Mark



 *From:* Sonal Goyal [mailto:sonalgoy...@gmail.com]
 *Sent:* June-11-15 12:58 PM
 *To:* Mark Tse
 *Cc:* user@spark.apache.org
 *Subject:* Re: ReduceByKey with a byte array as the key



 I think if you wrap the byte[] into an object and implement equals and
 hashCode methods, you may be able to do this. There will be the overhead of
 an extra object, but conceptually it should work unless I am missing
 something.
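
 A minimal sketch of such a wrapper (content-based equality via java.util.Arrays; the class name is just illustrative):

 import java.util.Arrays

 class BytesKey(val bytes: Array[Byte]) extends Serializable {
   override def equals(other: Any): Boolean = other match {
     case that: BytesKey => Arrays.equals(this.bytes, that.bytes) // compare contents, not references
     case _ => false
   }
   override def hashCode: Int = Arrays.hashCode(bytes)
 }

 // e.g. rdd.map { case (k, v) => (new BytesKey(k), v) }.reduceByKey(...)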


  Best Regards,
 Sonal
 Founder, Nube Technologies http://www.nubetech.co

 Check out Reifier at Spark Summit 2015
 https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/





 On Thu, Jun 11, 2015 at 9:27 PM, Mark Tse mark@d2l.com wrote:

  I would like to work with RDD pairs of Tuple2<byte[], obj>, but byte[]s
 with the same contents are considered different values because their
 reference values are different.



 I didn't see any way to pass in a custom comparator. I could convert the byte[]
 into a String with an explicit charset, but I'm wondering if there's a more
 efficient way.



 Also posted on SO: http://stackoverflow.com/q/30785615/2687324



 Thanks,

 Mark





Re: RDD resiliency -- does it keep state?

2015-03-28 Thread Aaron Davidson
Note that speculation is off by default to avoid these kinds of unexpected
issues.
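
To make the at-least-once semantics Patrick describes below concrete, here is a minimal sketch of an idempotent foreach-style side effect. writeToStore is a hypothetical upsert into your own system; the key is derived deterministically, so a re-executed task overwrites rather than duplicates:

// Hypothetical idempotent sink; replace with an overwrite-by-key put into your store.
def writeToStore(key: String, value: String): Unit = { /* upsert keyed by `key` */ }

val rdd = sc.parallelize(Seq("a", "b", "c")) // stand-in for the real data
rdd.mapPartitionsWithIndex { (partitionId, iter) =>
  iter.zipWithIndex.map { case (record, i) =>
    writeToStore(s"$partitionId-$i", record) // same key on re-execution => overwrite, not duplicate
    record
  }
}.count() // an action to force evaluation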

On Sat, Mar 28, 2015 at 6:21 AM, Steve Loughran ste...@hortonworks.com
wrote:


 It's worth adding that there's no guarantee that re-evaluated work would
 be on the same host as before, and in the case of node failure, it is not
 guaranteed to be elsewhere.

 This means things that depend on host-local information are going to
 generate different numbers even if there are no other side effects. Random
 number generation for seeding RDD.sample() would be a case in point here.

 There's also the fact that if you enable speculative execution, then
 operations may be repeated, even in the absence of any failure. If you are
 doing side-effect work, or don't have an output committer whose actions are
 guaranteed to be atomic, then you want to turn that option off.

  On 27 Mar 2015, at 19:46, Patrick Wendell pwend...@gmail.com wrote:
 
  If you invoke this, you will get at-least-once semantics on failure.
  For instance, if a machine dies in the middle of executing the foreach
  for a single partition, that will be re-executed on another machine.
  It could even fully complete on one machine, but the machine dies
  immediately before reporting the result back to the driver.
 
  This means you need to make sure the side-effects are idempotent, or
  use some transactional locking. Spark's own output operations, such as
  saving to Hadoop, use such mechanisms. For instance, in the case of
  Hadoop it uses the OutputCommitter classes.
 
  - Patrick
 
  On Fri, Mar 27, 2015 at 12:36 PM, Michal Klos michal.klo...@gmail.com
 wrote:
  Hi Spark group,
 
  We haven't been able to find clear descriptions of how Spark handles the
  resiliency of RDDs in relationship to executing actions with
 side-effects.
  If you do an `rdd.foreach(someSideEffect)`, then you are doing a
 side-effect
  for each element in the RDD. If a partition goes down -- the resiliency
  rebuilds the data, but does it keep track of how far it got in the
  partition's set of data, or will it start from the beginning again? So
 will
  it do at-least-once execution of foreach closures or at-most-once?
 
  thanks,
  Michal
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark will process _temporary folder on S3 is very slow and always cause failure

2015-03-17 Thread Aaron Davidson
Actually, this is the more relevant JIRA (which is resolved):
https://issues.apache.org/jira/browse/SPARK-3595

6352 is about saveAsParquetFile, which is not in use here.

Here is a DirectOutputCommitter implementation:
https://gist.github.com/aarondav/c513916e72101bbe14ec

and it can be configured in Spark with:
sparkConf.set("spark.hadoop.mapred.output.committer.class",
  classOf[DirectOutputCommitter].getName)

On Tue, Mar 17, 2015 at 8:05 AM, Imran Rashid iras...@cloudera.com wrote:

 I'm not super familiar w/ S3, but I think the issue is that you want to
 use a different output committer with object stores, which don't have a
 simple move operation.  There have been a few other threads on S3
 output committers.  I think the most relevant for you is most probably this
 open JIRA:

 https://issues.apache.org/jira/browse/SPARK-6352

 On Fri, Mar 13, 2015 at 5:51 PM, Shuai Zheng szheng.c...@gmail.com
 wrote:

 Hi All,



 I am trying to run a sort on an r3.2xlarge instance on AWS, just running
 it as a single-node cluster for testing. The data I am sorting is around 4GB
 and sits on S3; the output will also go to S3.



 I just connect spark-shell to the local cluster and run the code in the
 script (because I just want a benchmark now).



 My job is as simple as:

 val parquetFile =
 sqlContext.parquetFile("s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...")

 parquetFile.registerTempTable("Test")

 val sortedResult = sqlContext.sql("SELECT * FROM Test order by time").map
 { row => { row.mkString("\t") } }

 sortedResult.saveAsTextFile("s3n://myplace")



 The job takes around 6 mins to finish the sort when I am monitoring the
 process. After that, I notice the process stops at:



 15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at
 console:31, took 581.304992 s



 At that time, Spark has actually just written all the data to the
 _temporary folder first; after all sub-tasks finish, it tries to move
 all the ready results from the _temporary folder to the final location. This
 process might be quick locally (because it is just a cut/paste), but
 it looks very slow on my S3; it takes a few seconds to move one file
 (usually there will be 200 partitions). And then it raises exceptions after
 it has moved maybe 40-50 files.



 org.apache.http.NoHttpResponseException: The target server failed to
 respond

 at
 org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101)

 at
 org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252)

 at
 org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281)

 at
 org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247)

 at
 org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)





 I have tried several times, but never got the full job to finish. I am not sure
 what is wrong here; I am using something very basic, and I can see the job
 has finished and all results are on S3 under the _temporary folder, but then it
 raises the exception and fails.



 Any special settings I should use here when dealing with S3?



 I don’t know what the issue is here; I have never seen MapReduce have a similar
 issue, so it should not be an S3 problem.



 Regards,



 Shuai





Re: Which OutputCommitter to use for S3?

2015-03-05 Thread Aaron Davidson
Yes, unfortunately that direct dependency makes this injection much more
difficult for saveAsParquetFile.

On Thu, Mar 5, 2015 at 12:28 AM, Pei-Lun Lee pl...@appier.com wrote:

 Thanks for the DirectOutputCommitter example.
 However I found it only works for saveAsHadoopFile. What about
 saveAsParquetFile?
 It looks like SparkSQL is using ParquetOutputCommitter, which is subclass
 of FileOutputCommitter.

 On Fri, Feb 27, 2015 at 1:52 AM, Thomas Demoor 
 thomas.dem...@amplidata.com
 wrote:

  FYI. We're currently addressing this at the Hadoop level in
  https://issues.apache.org/jira/browse/HADOOP-9565
 
 
  Thomas Demoor
 
  On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath 
  ddmcbe...@yahoo.com.invalid wrote:
 
  Just to close the loop in case anyone runs into the same problem I had.
 
  By setting --hadoop-major-version=2 when using the ec2 scripts,
  everything worked fine.
 
  Darin.
 
 
  - Original Message -
  From: Darin McBeath ddmcbe...@yahoo.com.INVALID
  To: Mingyu Kim m...@palantir.com; Aaron Davidson ilike...@gmail.com
  Cc: user@spark.apache.org user@spark.apache.org
  Sent: Monday, February 23, 2015 3:16 PM
  Subject: Re: Which OutputCommitter to use for S3?
 
  Thanks.  I think my problem might actually be the other way around.
 
  I'm compiling with hadoop 2,  but when I startup Spark, using the ec2
  scripts, I don't specify a
  -hadoop-major-version and the default is 1.   I'm guessing that if I
 make
  that a 2 that it might work correctly.  I'll try it and post a response.
 
 
  - Original Message -
  From: Mingyu Kim m...@palantir.com
  To: Darin McBeath ddmcbe...@yahoo.com; Aaron Davidson 
  ilike...@gmail.com
  Cc: user@spark.apache.org user@spark.apache.org
  Sent: Monday, February 23, 2015 3:06 PM
  Subject: Re: Which OutputCommitter to use for S3?
 
  Cool, we will start from there. Thanks Aaron and Josh!
 
  Darin, it's likely because the DirectOutputCommitter is compiled with
  Hadoop 1 classes and you're running it with Hadoop 2.
  org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and
 it
  became an interface in Hadoop 2.
 
  Mingyu
 
 
 
 
 
  On 2/23/15, 11:52 AM, Darin McBeath ddmcbe...@yahoo.com.INVALID
  wrote:
 
  Aaron.  Thanks for the class. Since I'm currently writing Java based
  Spark applications, I tried converting your class to Java (it seemed
  pretty straightforward).
  
  I set up the use of the class as follows:
  
   SparkConf conf = new SparkConf()
   .set("spark.hadoop.mapred.output.committer.class",
   "com.elsevier.common.DirectOutputCommitter");
  
  And I then try and save a file to S3 (which I believe should use the
 old
  hadoop apis).
  
   JavaPairRDD<Text, Text> newBaselineRDDWritable =
  reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
  newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
  Text.class, Text.class, SequenceFileOutputFormat.class,
  org.apache.hadoop.io.compress.GzipCodec.class);
  
  But, I get the following error message.
  
  Exception in thread main java.lang.IncompatibleClassChangeError:
 Found
  class org.apache.hadoop.mapred.JobContext, but interface was expected
  at
 
 
 com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.
  java:68)
  at
 
 org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
  at
 
 
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions
  .scala:1075)
  at
 
 
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
  ala:940)
  at
 
 
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
  ala:902)
  at
 
 
 org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:7
  71)
  at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)
  
   In my class, JobContext is an interface of type
  org.apache.hadoop.mapred.JobContext.
  
  Is there something obvious that I might be doing wrong (or messed up in
  the translation from Scala to Java) or something I should look into?
 I'm
  using Spark 1.2 with hadoop 2.4.
  
  
  Thanks.
  
  Darin.
  
  
  
  
  
  From: Aaron Davidson ilike...@gmail.com
  To: Andrew Ash and...@andrewash.com
  Cc: Josh Rosen rosenvi...@gmail.com; Mingyu Kim m...@palantir.com;
  user@spark.apache.org user@spark.apache.org; Aaron Davidson
  aa...@databricks.com
  Sent: Saturday, February 21, 2015 7:01 PM
  Subject: Re: Which OutputCommitter to use for S3?
  
  
  
  Here is the class:
  
 
  https://gist.github.com/aarondav/c513916e72101bbe14ec
  
  You can use it by setting mapred.output.committer.class in the Hadoop
  configuration (or spark.hadoop.mapred.output.committer.class in the
   Spark configuration). Note that this only works for the old Hadoop APIs, I
   believe the new Hadoop APIs strongly tie committer to input format (so
   FileInputFormat always uses FileOutputCommitter), which makes this fix more
   difficult to apply.

Re: Having lots of FetchFailedException in join

2015-03-05 Thread Aaron Davidson
However, Executors were dying when using Netty as well, so it is possible
that the OOM was occurring then too. It is also possible only one of your
Executors OOMs (due to a particularly large task) and the others display
various exceptions while trying to fetch the shuffle blocks from the failed
executor.

I cannot explain the local FileNotFoundExceptions occurring on machines that
were not throwing fatal errors, though -- typically I have only seen that
happen when a fatal error (e.g., OOM) was thrown on an Executor, causing it
to begin the termination process which involves deleting its own shuffle
files. It may then throw the FNF if other Executors request those files
before it has completed its shutdown (and will throw a ConnectionFailed
once it's completed terminating).

On Thu, Mar 5, 2015 at 12:19 AM, Shao, Saisai saisai.s...@intel.com wrote:

  I’ve no idea why Netty didn’t hit the OOM issue; one possibility is that
 Netty uses direct memory to save each block, whereas NIO uses on-heap
 memory, so Netty occupies less on-heap memory than NIO.





 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, March 5, 2015 4:14 PM

 *To:* Shao, Saisai
 *Cc:* Cheng, Hao; user
 *Subject:* Re: Having lots of FetchFailedException in join



 Thanks. I was about to submit a ticket for this :)



 Also there's a ticket for sort-merge based groupbykey
 https://issues.apache.org/jira/browse/SPARK-3461



 BTW, any idea why running with Netty didn't output OOM error messages? It's
 very confusing in troubleshooting.





 Jianshi



 On Thu, Mar 5, 2015 at 4:01 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  I think there are a lot of JIRAs trying to solve this problem (
 https://issues.apache.org/jira/browse/SPARK-5763). Basically sort-merge
 join is a good choice.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, March 5, 2015 3:55 PM
 *To:* Shao, Saisai
 *Cc:* Cheng, Hao; user


 *Subject:* Re: Having lots of FetchFailedException in join



 There is some skew; two task rows from the web UI:



 64 | 6164 | 0 | SUCCESS | PROCESS_LOCAL | 200 /  | 2015/03/04 23:45:47 | 1.1 min | 6 s | 198.6 MB | 21.1 GB | 240.8 MB

 59 | 6159 | 0 | SUCCESS | PROCESS_LOCAL | 30 /  | 2015/03/04 23:45:47 | 44 s | 5 s | 200.7 MB | 4.8 GB | 154.0 MB

 But I expect this kind of skewness to be quite common.



 Jianshi





 On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

  I see. I'm using core's join. The data might have some skewness
 (checking).



 I understand shuffle can spill data to disk but when consuming it, say in
 cogroup or groupByKey, it still needs to read the whole group's elements,
 right? I guess the OOM happened there when reading very large groups.



 Jianshi



 On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  I think what you could do is to monitor through web UI to see if there’s
 any skew or other symptoms in shuffle write and read. For GC you could use
 the below configuration as you mentioned.



 From the Spark core side, all the shuffle-related operations can spill the
 data to disk and do not need to read the whole partition into memory. But if
 you use SparkSQL, it depends on how SparkSQL uses these operators.



 CC @hao if he has some thoughts on it.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, March 5, 2015 3:28 PM
 *To:* Shao, Saisai


 *Cc:* user
 *Subject:* Re: Having lots of FetchFailedException in join



 Hi Saisai,



 What are your suggested settings for monitoring shuffle? I've
 enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.



 I found SPARK-3461 (Support external groupByKey using
 repartitionAndSortWithinPartitions), which wants to make groupByKey use external
 storage. It's still in open status. Does that mean that for now
 groupByKey/cogroup/join (implemented as cogroup + flatMap) will still read
 the group as a whole when consuming it?



 How can I deal with the key skewness in joins? Is there a skew-join
 implementation?





 Jianshi







 On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  Hi Jianshi,



  From my understanding, it may not be a problem of NIO or Netty. Looking
  at your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap);
  theoretically EAOM can spill the data to disk if memory is not enough,
  but there might be some issues when the join key is skewed or the number of
  keys is small, so you will hit OOM.



  Maybe you could monitor each stage or task’s shuffle and GC status, as well as
 system status to identify the problem.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, March 5, 2015 2:32 PM
 *To:* Aaron Davidson
 *Cc:* user
 *Subject:* Re: Having lots of FetchFailedException in join



  One really interesting thing is that when I'm using the
  Netty-based spark.shuffle.blockTransferService, there are no OOM error
 messages (java.lang.OutOfMemoryError: Java heap space

Re: Having lots of FetchFailedException in join

2015-03-03 Thread Aaron Davidson
"Failed to connect" implies that the executor at that host died; please
check its logs as well.

On Tue, Mar 3, 2015 at 11:03 AM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Sorry that I forgot the subject.

 And in the driver, I got many FetchFailedException. The error messages are

 15/03/03 10:34:32 WARN TaskSetManager: Lost task 31.0 in stage 2.2 (TID
 7943, ): FetchFailed(BlockManagerId(86, , 43070), shuffleId=0,
 mapId=24, reduceId=1220, message=
 org.apache.spark.shuffle.FetchFailedException: Failed to connect to
 /:43070
 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)


 Jianshi

 On Wed, Mar 4, 2015 at 2:55 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,

 I got this error message:

 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting
 block fetches
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)
 at java.io.FileInputStream.open(Native Method)
 at java.io.FileInputStream.init(FileInputStream.java:146)
 at
 org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


 And then for the same index file and executor, I got the following errors
 multiple times

 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get
 block(s) from host-:39534
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)

 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block
 shuffle_0_13_1228, and will not retry (0 retries)
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)

 ...
 Caused by: java.net.ConnectException: Connection refused: host-


 What's the problem?

  BTW, I'm using a Spark 1.2.1-SNAPSHOT I built around Dec. 20. Are there any
  bug fixes related to shuffle block fetching or index files after that?


 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/



Re: Having lots of FetchFailedException in join

2015-03-03 Thread Aaron Davidson
:319)
 at
 io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
 at
 io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
 at
 io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
 at
 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
 at
 io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
 at
 io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
 at java.lang.Thread.run(Thread.java:724)


 Jianshi

 On Wed, Mar 4, 2015 at 3:25 AM, Aaron Davidson ilike...@gmail.com wrote:

  "Failed to connect" implies that the executor at that host died; please
 check its logs as well.

 On Tue, Mar 3, 2015 at 11:03 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Sorry that I forgot the subject.

 And in the driver, I got many FetchFailedException. The error messages
 are

 15/03/03 10:34:32 WARN TaskSetManager: Lost task 31.0 in stage 2.2 (TID
 7943, ): FetchFailed(BlockManagerId(86, , 43070), shuffleId=0,
 mapId=24, reduceId=1220, message=
 org.apache.spark.shuffle.FetchFailedException: Failed to connect to
 /:43070
 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)


 Jianshi

 On Wed, Mar 4, 2015 at 2:55 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,

 I got this error message:

 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting
 block fetches
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)
 at java.io.FileInputStream.open(Native Method)
 at java.io.FileInputStream.init(FileInputStream.java:146)
 at
 org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at
 scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


 And then for the same index file and executor, I got the following
 errors multiple times

 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get
 block(s) from host-:39534
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)

 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block
 shuffle_0_13_1228, and will not retry (0 retries)
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)

 ...
 Caused by: java.net.ConnectException: Connection refused: host-


 What's the problem?

 BTW, I'm using Spark 1.2.1-SNAPSHOT I built

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Aaron Davidson
All stated symptoms are consistent with GC pressure (other nodes timeout
trying to connect because of a long stop-the-world), quite possibly due to
groupByKey. groupByKey is a very expensive operation as it may bring all
the data for a particular partition into memory (in particular, it cannot
spill values for a single key, so if you have a single very skewed key you
can get behavior like this).
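
As a concrete illustration of why the reduceByKey rewrite suggested below behaves better than groupByKey on skewed keys, here is a minimal sketch with a hypothetical pairs RDD:

// Hypothetical input; in practice this is whatever you currently feed to groupByKey.
val pairs = sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))

// groupByKey materializes every value of a key in one task before the map runs:
val viaGroup = pairs.groupByKey().map { case (k, vs) => (k, vs.sum) }

// reduceByKey combines map-side, so a single hot key never has to fit in memory at once:
val viaReduce = pairs.reduceByKey(_ + _)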

On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com wrote:

 But groupByKey will repartition according to the number of keys as I understand
 how it works. How do you know that you haven't reached the groupByKey
 phase? Are you using a profiler or do you base that assumption only on logs?

 On Sat, 28 Feb 2015, 8:12 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 A correction to my first post:

 There is also a repartition right before groupByKey to help avoid
 too-many-open-files error:


 rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

 On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...]
 disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
 stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
 (No such file or directory)




 On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

  I would first check whether there is any possibility that after doing
  groupByKey one of the groups does not fit in one of the executors' memory.

  To back up my theory, instead of doing groupByKey + map, try reduceByKey
  + mapValues.

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

  On Sat, 28 Feb 2015, 6:22 PM, Arun Luthra
  arun.lut...@gmail.com wrote:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors,
 executor memory, and more importantly, number of tasks... *all three
 times it failed when approximately 75.1% of the tasks were completed (no
 matter how many tasks resulted from repartitioning the data in
 textfile(..., N))*. Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
  executor lost, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
 also try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName("Test")
   .set("spark.storage.memoryFraction", "0.2") // default 0.6
   .set("spark.shuffle.memoryFraction", "0.12") // default 0.2
   .set("spark.shuffle.manager", "SORT") // preferred setting for
 optimized joins
   .set("spark.shuffle.consolidateFiles", "true") // helpful for
 too many files open
   .set("spark.mesos.coarse", "true") // helpful for
 MapOutputTracker errors?
   .set("spark.akka.frameSize", "500") // helpful when using
 consolidateFiles=true
   .set("spark.akka.askTimeout", "30")
   .set("spark.shuffle.compress", "false") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.file.transferTo", "false") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.core.connection.ack.wait.timeout", "600") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.speculation", "true")
   .set("spark.worker.timeout", "600") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set("spark.akka.timeout", "300") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set("spark.storage.blockManagerSlaveTimeoutMs", "12")
   .set("spark.driver.maxResultSize", "2048") // in response to
 error: Total size of serialized results of 39901 tasks (1024.0 MB) is
 bigger than spark.driver.maxResultSize (1024.0 MB)
   .set("spark.serializer",
 

Re: Worker and Nodes

2015-02-21 Thread Aaron Davidson
Note that the parallelism (i.e., number of partitions) is just an upper
bound on how much of the work can be done in parallel. If you have 200
partitions, then you can divide the work among between 1 and 200 cores and
all resources will remain utilized. If you have more than 200 cores,
though, then some will not be used, so you would want to increase
parallelism further. (There are other rules-of-thumb -- for instance, it's
generally good to have at least 2x more partitions than cores for straggler
mitigation, but these are essentially just optimizations.)
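
As a rough illustration of the rule of thumb above (a sketch with made-up numbers and a hypothetical input path):

import org.apache.spark.{SparkConf, SparkContext}

// Assumption: roughly 200 cores available, so aim for at least ~400 partitions.
val conf = new SparkConf().setAppName("ParallelismSketch")
  .set("spark.default.parallelism", "400") // partition count used by shuffles when none is given
val sc = new SparkContext(conf)
val data = sc.textFile("hdfs:///path/to/input") // hypothetical input
val wide = data.repartition(400)                // or control it explicitly per RDD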

Further note that when you increase the number of Executors for the same
set of resources (i.e., starting 10 Executors on a single machine instead
of 1), you make Spark's job harder. Spark has to communicate in an
all-to-all manner across Executors for shuffle operations, and it uses TCP
sockets to do so whether or not the Executors happen to be on the same
machine. So increasing Executors without increasing physical resources
means Spark has to do more communication to do the same work.

We expect that increasing the number of Executors by a factor of 10, given
an increase in the number of physical resources by the same factor, would
also improve performance by 10x. This is not always the case for the
precise reason above (increased communication overhead), but typically we
can get close. The actual observed improvement is very algorithm-dependent,
though; for instance, some ML algorithms become hard to scale out past a
certain point because the increase in communication overhead outweighs the
increase in parallelism.

On Sat, Feb 21, 2015 at 8:19 AM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 So, if I keep the number of instances constant and increase the degree of
 parallelism in steps, can I expect the performance to increase?

 Thank You

 On Sat, Feb 21, 2015 at 9:07 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 So, with the increase in the number of worker instances, if I also
 increase the degree of parallelism, will it make any difference?
  I can use this model even the other way round, right? I can always predict
  the performance of an app with the increase in the number of worker instances,
  i.e., the deterioration in performance, right?

 Thank You

 On Sat, Feb 21, 2015 at 8:52 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 Yes, I have decreased the executor memory.
  But, if I have to do this, then I have to tweak around with the code
  corresponding to each configuration, right?

 On Sat, Feb 21, 2015 at 8:47 PM, Sean Owen so...@cloudera.com wrote:

 Workers has a specific meaning in Spark. You are running many on one
  machine? That's possible but not usual.

 Each worker's executors have access to a fraction of your machine's
 resources then. If you're not increasing parallelism, maybe you're not
 actually using additional workers, so are using less resource for your
 problem.

 Or because the resulting executors are smaller, maybe you're hitting
 GC thrashing in these executors with smaller heaps.

 Or if you're not actually configuring the executors to use less
 memory, maybe you're over-committing your RAM and swapping?

 Bottom line, you wouldn't use multiple workers on one small standalone
 node. This isn't a good way to estimate performance on a distributed
 cluster either.

 On Sat, Feb 21, 2015 at 3:11 PM, Deep Pradhan 
 pradhandeep1...@gmail.com wrote:
  No, I just have a single node standalone cluster.
 
  I am not tweaking around with the code to increase parallelism. I am
 just
  running SparkKMeans that is there in Spark-1.0.0
  I just wanted to know, if this behavior is natural. And if so, what
 causes
  this?
 
  Thank you
 
  On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com
 wrote:
 
  What's your storage like? are you adding worker machines that are
  remote from where the data lives? I wonder if it just means you are
  spending more and more time sending the data over the network as you
  try to ship more of it to more remote workers.
 
   To answer your question, no; in general more workers means more
  parallelism and therefore faster execution. But that depends on a lot
  of things. For example, if your process isn't parallelize to use all
  available execution slots, adding more slots doesn't do anything.
 
  On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan 
 pradhandeep1...@gmail.com
  wrote:
   Yes, I am talking about standalone single node cluster.
  
   No, I am not increasing parallelism. I just wanted to know if it is
   natural.
   Does message passing across the workers account for the happenning?
  
   I am running SparkKMeans, just to validate one prediction model. I
 am
   using
   several data sets. I have a standalone mode. I am varying the
 workers
   from 1
   to 16
  
   On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com
 wrote:
  
   I can imagine a few reasons. Adding workers might cause fewer
 tasks to
    execute locally (?), so you may be executing more remotely.
  
   Are you 

Re: Which OutputCommitter to use for S3?

2015-02-21 Thread Aaron Davidson
Here is the class: https://gist.github.com/aarondav/c513916e72101bbe14ec

You can use it by setting mapred.output.committer.class in the Hadoop
configuration (or spark.hadoop.mapred.output.committer.class in the Spark
configuration). Note that this only works for the old Hadoop APIs, I
believe the new Hadoop APIs strongly tie committer to input format (so
FileInputFormat always uses FileOutputCommitter), which makes this fix more
difficult to apply.
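
For anyone who cannot reach the gist, a minimal no-op sketch along the same lines (my own simplified version, not the exact class from the gist; it omits the _SUCCESS-marker handling mentioned elsewhere in this thread):

import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

class DirectOutputCommitter extends OutputCommitter {
  // Tasks write straight to the final location, so there is nothing to set up or move.
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
}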

On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash and...@andrewash.com wrote:

 Josh, is that class something you guys would consider open sourcing, or
 would you rather the community step up and create an OutputCommitter
 implementation optimized for S3?

 On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen rosenvi...@gmail.com wrote:

 We (Databricks) use our own DirectOutputCommitter implementation, which
 is a couple tens of lines of Scala code.  The class would almost entirely
 be a no-op except we took some care to properly handle the _SUCCESS file.

 On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim m...@palantir.com wrote:

  I didn’t get any response. It’d be really appreciated if anyone using
 a special OutputCommitter for S3 can comment on this!

  Thanks,
 Mingyu

   From: Mingyu Kim m...@palantir.com
 Date: Monday, February 16, 2015 at 1:15 AM
 To: user@spark.apache.org user@spark.apache.org
 Subject: Which OutputCommitter to use for S3?

   HI all,

  The default OutputCommitter used by RDD, which is FileOutputCommitter,
 seems to require moving files at the commit step, which is not a constant
 operation in S3, as discussed in
 http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3c543e33fa.2000...@entropy.be%3E.
 People seem to develop their own NullOutputCommitter implementation or use
 DirectFileOutputCommitter (as mentioned in SPARK-3595
 https://issues.apache.org/jira/browse/SPARK-3595),
 but I wanted to check if there is a de facto standard, publicly available
 OutputCommitter to use for S3 in conjunction with Spark.

  Thanks,
 Mingyu






Re: RangePartitioner in Spark 1.2.1

2015-02-17 Thread Aaron Davidson
RangePartitioner does not actually provide a guarantee that all partitions
will be equal sized (that is hard), and instead uses sampling to
approximate equal buckets. Thus, it is possible that a bucket is left empty.

If you want the specified behavior, you should define your own partitioner.
It would look something like this (untested):
class AlphabetPartitioner extends Partitioner {
  def numPartitions = 26
  def getPartition(key: Any): Int = key match {
    case s: String => s(0).toUpper - 'A'
  }
  override def equals(other: Any): Boolean =
other.isInstanceOf[AlphabetPartitioner]
  override def hashCode: Int = 0
}
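
Used with the string RDD from the quoted session below, usage would look roughly like this (a sketch; note the partitioner above expects String keys, so this keys by the whole word):

rdd.keyBy(identity)                      // key = the word itself, a String
  .partitionBy(new AlphabetPartitioner)
  .values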

On Tue, Feb 17, 2015 at 7:05 PM, java8964 java8...@hotmail.com wrote:

 Hi, Sparkers:

  I just happened to search on Google for something related to the
  RangePartitioner of Spark, and found an old thread on this email list
 here:


 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-and-Partition-td991.html

 I followed the code example mentioned in that email thread as following:

 scala> import org.apache.spark.RangePartitioner
 import org.apache.spark.RangePartitioner

 scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog",
 "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long",
 "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk",
 "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
 rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
 parallelize at console:13

 scala> rdd.keyBy(s => s(0).toUpper)
 res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at
 console:16

 scala> res0.partitionBy(new RangePartitioner[Char, String](26,
 res0)).values
 res1: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at
 console:18

 scala> res1.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx,
 s))).collect.foreach(println)

  The above example made the meaning of the
  RangePartitioner clear to me, but to my surprise, I got the following result:

 *(0,apple)*
 *(0,Ball)*
 (1,cat)
 (2,dog)
 (3,Elephant)
 (4,fox)
 (5,gas)
 (6,horse)
 (7,index)
 (8,jet)
 (9,kitsch)
 (10,long)
 (11,moon)
 (12,Neptune)
 (13,ooze)
 (14,Pen)
 (15,quiet)
 (16,rose)
 (17,sun)
 (18,talk)
 (19,umbrella)
 (20,voice)
 (21,Walrus)
 (22,xeon)
 (23,Yam)
 (24,zebra)

 instead of a perfect range index from 0 to 25 as in the old email thread. Why is
 that? Is this a bug, or some new feature I don't understand?

 BTW, the above environment I tested is in Spark 1.2.1 with Hadoop 2.4
 binary release.

 Thanks

 Yong



Re: Shuffle write increases in spark 1.2

2015-02-15 Thread Aaron Davidson
I think Xuefeng Wu's suggestion is likely correct. This difference is more
likely explained by the compression library changing versions than sort vs
hash shuffle (which should not affect output size significantly). Others
have reported that switching to lz4 fixed their issue.

We should document this if this is the case. I wonder if we're asking
Snappy to be super-low-overhead and as a result the new version does a
better job of it (less overhead, less compression).
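
For anyone who wants to try the lz4 workaround mentioned above, a minimal sketch (assuming Spark 1.x property names):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.io.compression.codec", "lz4") // shuffle/spill compression; snappy is the 1.x default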

On Sat, Feb 14, 2015 at 9:32 AM, Peng Cheng pc...@uow.edu.au wrote:

 I double-checked the 1.2 feature list and found out that the new sort-based
 shuffle manager has nothing to do with HashPartitioner :- Sorry for the
 misinformation.

 On the other hand, this may explain the increase in shuffle spill as a side
 effect
 of the new shuffle manager; let me revert spark.shuffle.manager to hash and
 see if it makes things better (or worse, as the benchmark in
 https://issues.apache.org/jira/browse/SPARK-3280 indicates)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-write-increases-in-spark-1-2-tp20894p21657.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Shuffle read/write issue in spark 1.2

2015-02-06 Thread Aaron Davidson
Did the problem go away when you switched to lz4? There was a change from
the default compression codec from 1.0 to 1.1, where we went from LZF to
Snappy. I don't think there was any such change from 1.1 to 1.2, though.

On Fri, Feb 6, 2015 at 12:17 AM, Praveen Garg praveen.g...@guavus.com
wrote:

  We tried changing the compression codec from snappy to lz4. It did
 improve the performance but we are still wondering why default options
 didn’t work as claimed.

   From: Raghavendra Pandey raghavendra.pan...@gmail.com
 Date: Friday, 6 February 2015 1:23 pm
 To: Praveen Garg praveen.g...@guavus.com
 Cc: user@spark.apache.org user@spark.apache.org
 Subject: Re: Shuffle read/write issue in spark 1.2

   Even I observed the same issue.

 On Fri, Feb 6, 2015 at 12:19 AM, Praveen Garg praveen.g...@guavus.com
 wrote:

  Hi,

  While moving from spark 1.1 to spark 1.2, we are facing an issue where
 shuffle read/write has increased significantly. We also tried running
 the job by rolling back to spark 1.1 configuration where we set
 spark.shuffle.manager to hash and spark.shuffle.blockTransferService to
 nio. It did improve the performance a bit but it was still much worse than
 spark 1.1. The scenario seems similar to the bug raised sometime back
 https://issues.apache.org/jira/browse/SPARK-5081.
 Has anyone come across any similar issue? Please tell us if any
 configuration change can help.

  Regards, Praveen





Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-04 Thread Aaron Davidson
The latter would be faster. With S3, you want to maximize number of
concurrent readers until you hit your network throughput limits.
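
For example, one way to raise the number of concurrent S3 readers is simply to ask for more input partitions (a sketch; the bucket, path, and partition count are made up):

val rdd = sc.textFile("s3n://my-bucket/big-input/", 100) // minPartitions hint: ~100 concurrent readers
println(rdd.partitions.length)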

On Wed, Feb 4, 2015 at 6:20 AM, Peter Rudenko petro.rude...@gmail.com
wrote:

   Hi, if I have a 10GB file on S3 and set 10 partitions, would the
  whole file be downloaded on the master first and broadcast, or would each worker
  just read its range from the file?

 Thanks,
 Peter

 On 2015-02-03 23:30, Sven Krasser wrote:

  Hey Joe,

 With the ephemeral HDFS, you get the instance store of your worker nodes.
 For m3.xlarge that will be two 40 GB SSDs local to each instance, which are
 very fast.

  For the persistent HDFS, you get whatever EBS volumes the launch script
 configured. EBS volumes are always network drives, so the usual limitations
 apply. To optimize throughput, you can use EBS volumes with provisioned
 IOPS and you can use EBS optimized instances. I don't have hard numbers at
 hand, but I'd expect this to be noticeably slower than using local SSDs.

 As far as only using S3 goes, it depends on your use case (i.e. what you
 plan on doing with the data while it is there). If you store it there in
 between running different applications, you can likely work around
 consistency issues.

 Also, if you use Amazon's EMRFS to access data in S3, you can use their
 new consistency feature (
 https://aws.amazon.com/blogs/aws/emr-consistent-file-system/).

 Hope this helps!
 -Sven


 On Tue, Feb 3, 2015 at 9:32 AM, Joe Wass jw...@crossref.org wrote:

 The data is coming from S3 in the first place, and the results will be
 uploaded back there. But even in the same availability zone, fetching 170
 GB (that's gzipped) is slow. From what I understand of the pipelines,
 multiple transforms on the same RDD might involve re-reading the input,
  which very quickly adds up in comparison to having the data locally. Unless
  I persist the data (which I am in fact doing), but that would involve
 storing approximately the same amount of data in HDFS, which wouldn't fit.

   Also, I understood that S3 was unsuitable for practical use? See "Why you
  cannot use S3 as a replacement for HDFS" [0]. I'd love to be proved wrong,
 though, that would make things a lot easier.

  [0] http://wiki.apache.org/hadoop/AmazonS3



 On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net
 wrote:

 You could also just push the data to Amazon S3, which would un-link the
 size of the cluster needed to process the data from the size of the data.

 DR


 On 02/03/2015 11:43 AM, Joe Wass wrote:

 I want to process about 800 GB of data on an Amazon EC2 cluster. So, I
 need
 to store the input in HDFS somehow.

 I currently have a cluster of 5 x m3.xlarge, each of which has 80GB
 disk.
 Each HDFS node reports 73 GB, and the total capacity is ~370 GB.

 If I want to process 800 GB of data (assuming I can't split the jobs
 up),
 I'm guessing I need to get persistent-hdfs involved.

 1 - Does persistent-hdfs have noticeably different performance than
 ephemeral-hdfs?
 2 - If so, is there a recommended configuration (like storing input and
 output on persistent, but persisted RDDs on ephemeral?)

 This seems like a common use-case, so sorry if this has already been
 covered.

 Joe



  -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 http://sites.google.com/site/krasser/?utm_source=sig





Re: 2GB limit for partitions?

2015-02-03 Thread Aaron Davidson
To be clear, there is no distinction between partitions and blocks for RDD
caching (each RDD partition corresponds to 1 cache block). The distinction
is important for shuffling, where by definition N partitions are shuffled
into M partitions, creating N*M intermediate blocks. Each of these blocks
must also be smaller than 2GB, but due to their number, this is an atypical
scenario.

If you do

sc.parallelize(1 to 1e6.toInt, 1).map{i => new
Array[Byte](5e3.toInt)}.repartition(1000).count()

you should not see this error, as the 5GB initial partition was split into
1000 partitions of 5MB each, during a shuffle.

On the other hand,

sc.parallelize(1 to 1e6.toInt, 1).map{i => new
Array[Byte](5e3.toInt)}.repartition(1).count()

may have the same error as Imran showed for caching, and for the same
reason.

On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:

 Michael,

 you are right, there is definitely some limit at 2GB.  Here is a trivial
 example to demonstrate it:

 import org.apache.spark.storage.StorageLevel
 val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new
 Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
 d.count()

 It gives the same error you are observing.  I was under the same
 impression as Sean about the limits only being on blocks, not partitions --
 but clearly that isn't the case here.

 I don't know the whole story yet, but I just wanted to at least let you
 know you aren't crazy :)
 At the very least this suggests that you might need to make smaller
 partitions for now.

 Imran


 On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
 m_albert...@yahoo.com.invalid wrote:

 Greetings!

 Thanks for the response.

 Below is an example of the exception I saw.
 I'd rather not post code at the moment, so I realize it is completely
 unreasonable to ask for a diagnosis.
 However, I will say that adding a partitionBy() was the last change
 before this error was created.


 Thanks for your time and any thoughts you might have.

 Sincerely,
  Mike



 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
 failure: Lost task 4.3 in stage 5.0 (TID 6012,
 ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
 at
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


   --
  *From:* Sean Owen so...@cloudera.com
 *To:* Michael Albert m_albert...@yahoo.com
 *Cc:* user@spark.apache.org user@spark.apache.org
 *Sent:* Monday, February 2, 2015 10:13 PM
 *Subject:* Re: 2GB limit for partitions?

 The limit is on blocks, not partitions. Partitions have many blocks.

 It sounds like you are creating very large values in memory, but I'm
 not sure given your description. You will run into problems if a
 single object is more than 2GB, of course. More of the stack trace
 might show what is mapping that much memory.

 If you simply want data into 1000 files it's a lot simpler. Just
 repartition into 1000 partitions and save the data. If you need more
 control over what goes into which partition, use a Partitioner, yes.



 On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
 m_albert...@yahoo.com.invalid wrote:
  Greetings!
 
  SPARK-1476 says that there is a 2G limit for blocks.
  Is this the same as a 2G limit for partitions (or approximately so?)?
 
 
  What I had been attempting to do is the following.
  1) Start with a moderately large data set (currently about 100GB, but
  growing).
  2) Create about 1,000 files (yes, files) each representing a subset of
 the
  data.
 
  The current attempt I am working on is something like this.
  1) Do a map whose output key indicates which of the 1,000 files it
 will go
  into and whose value is what I will want to stick into the file.
  2) Partition the data and use 

Re: Duplicate key when sorting BytesWritable with Kryo?

2015-01-30 Thread Aaron Davidson
Ah, this is in particular an issue due to sort-based shuffle (it was not
the case for hash-based shuffle, which would immediately serialize each
record rather than holding many in memory at once). The documentation
should be updated.

On Fri, Jan 30, 2015 at 11:27 AM, Sandy Ryza sandy.r...@cloudera.com
wrote:

 Hi Andrew,

 Here's a note from the doc for sequenceFile:

 * '''Note:''' Because Hadoop's RecordReader class re-uses the same
 Writable object for each
 * record, directly caching the returned RDD will create many
 references to the same object.
 * If you plan to directly cache Hadoop writable objects, you should
 first copy them using
 * a `map` function.

 This should probably say directly caching *or directly shuffling*.  To
 sort directly from a sequence file, the records need to be cloned first.
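
 A rough sketch of that copy for this job (assuming a Hadoop version where
 BytesWritable.copyBytes() is available, and using the CustomKey copy
 constructor quoted further down in this thread):

   spark.sequenceFile(inputPath, classOf[CustomKey], classOf[BytesWritable])
     .map { case (k, v) => (new CustomKey(k), new BytesWritable(v.copyBytes())) }
     .sortByKey()
     .map(t => t._1)
     .saveAsTextFile(outputPath)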

 -Sandy


 On Fri, Jan 30, 2015 at 11:20 AM, andrew.rowson 
 andrew.row...@thomsonreuters.com wrote:

 I've found a strange issue when trying to sort a lot of data in HDFS using
 spark 1.2.0 (CDH5.3.0). My data is in sequencefiles and the key is a class
 that derives from BytesWritable (the value is also a BytesWritable). I'm
 using a custom KryoSerializer to serialize the underlying byte array
 (basically write the length and the byte array).

 My spark job looks like this:

 spark.sequenceFile(inputPath, classOf[CustomKey],
 classOf[BytesWritable]).sortByKey().map(t =>
 t._1).saveAsTextFile(outputPath)

 CustomKey extends BytesWritable, adds a toString method and some other
 helper methods that extract and convert parts of the underlying byte[].

 This should simply output a series of textfiles which contain the sorted
 list of keys. The problem is that under certain circumstances I get many
 duplicate keys. The number of records output is correct, but it appears
 that
 large chunks of the output are simply copies of the last record in that
 chunk. E.g. instead of [1,2,3,4,5,6,7,8,9] I'll see [9,9,9,9,9,9,9,9,9].

 This appears to happen only above certain input data volumes, and it
 appears
 to be when shuffle spills. For a job where shuffle spill for memory and
 disk
 = 0B, the data is correct. If there is any spill, I see the duplicate
 behaviour. Oddly, the shuffle write is much smaller when there's a spill.
 E.g. the non spill job has 18.8 GB of input and 14.9GB of shuffle write,
 whereas the spill job has 24.2 GB of input, and only 4.9GB of shuffle
 write.
 I'm guessing some sort of compression is happening on duplicate identical
 values?

 Oddly, I can fix this issue if I adjust my scala code to insert a map step
 before the call to sortByKey():

 .map(t => (new CustomKey(t._1), t._2))

 This constructor is just:

 public CustomKey(CustomKey left) { this.set(left); }

 Why does this work? I've no idea.

 The spark job is running in yarn-client mode with all the default
 configuration values set. Using the external shuffle service and disabling
 spill compression makes no difference.

 Is this a bug?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Duplicate-key-when-sorting-BytesWritable-with-Kryo-tp21447.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: performance of saveAsTextFile moving files from _temporary

2015-01-28 Thread Aaron Davidson
Upon completion of the 2 hour part of the run, the files did not exist in
the output directory? One thing that is done serially is deleting any
remaining files from _temporary, so perhaps there was a lot of data
remaining in _temporary but the committed data had already been moved.

I am, unfortunately, not aware of other issues that would cause this to be
so slow.

On Tue, Jan 27, 2015 at 6:54 PM, Josh Walton j...@openbookben.com wrote:

 I'm not sure how to confirm how the moving is happening, however, one of
 the jobs just completed that I was talking about with 9k files of 4mb each.
 Spark UI showed the job being complete after ~2 hours. The last four hours
 of the job was just moving the files from _temporary to their final
 destination. The tasks for the write were definitely shown as complete, no
 logging is happening on the master or workers. The last line of my java
 code logs, but the job sits there as the moving of files happens.

 On Tue, Jan 27, 2015 at 7:24 PM, Aaron Davidson ilike...@gmail.com
 wrote:

 This renaming from _temporary to the final location is actually done by
 executors, in parallel, for saveAsTextFile. It should be performed by each
 task individually before it returns.

 I have seen an issue similar to what you mention dealing with Hive code
 which did the renaming serially on the driver, which is very slow for S3
 (and possibly Google Storage as well), as it actually copies the data
 rather than doing a metadata-only operation during rename. However, this
 should not be an issue in this case.

 Could you confirm how the moving is happening -- i.e., on the executors
 or the driver?

 On Tue, Jan 27, 2015 at 4:31 PM, jwalton j...@openbookben.com wrote:

 We are running spark in Google Compute Engine using their One-Click
 Deploy.
 By doing so, we get their Google Cloud Storage connector for hadoop for
 free
 meaning we can specify gs:// paths for input and output.

 We have jobs that take a couple of hours, end up with ~9k partitions
 which
 means 9k output files. After the job is complete it then moves the
 output
 files from our $output_path/_temporary to $output_path. That process can
 take longer than the job itself depending on the circumstances. The job I
 mentioned previously outputs ~4mb files, and so far has copied 1/3 of the
 files in 1.5 hours from _temporary to the final destination.

 Is there a solution to this besides reducing the number of partitions?
 Anyone else run into similar issues elsewhere? I don't remember this
 being
 an issue with Map Reduce jobs and hadoop, however, I probably wasn't
 tracking the transfer of the output files like I am with Spark.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/performance-of-saveAsTextFile-moving-files-from-temporary-tp21397.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: performance of saveAsTextFile moving files from _temporary

2015-01-27 Thread Aaron Davidson
This renaming from _temporary to the final location is actually done by
executors, in parallel, for saveAsTextFile. It should be performed by each
task individually before it returns.

I have seen an issue similar to what you mention dealing with Hive code
which did the renaming serially on the driver, which is very slow for S3
(and possibly Google Storage as well), as it actually copies the data
rather than doing a metadata-only operation during rename. However, this
should not be an issue in this case.

Could you confirm how the moving is happening -- i.e., on the executors or
the driver?

On Tue, Jan 27, 2015 at 4:31 PM, jwalton j...@openbookben.com wrote:

 We are running spark in Google Compute Engine using their One-Click Deploy.
 By doing so, we get their Google Cloud Storage connector for hadoop for
 free
 meaning we can specify gs:// paths for input and output.

 We have jobs that take a couple of hours, end up with ~9k partitions which
 means 9k output files. After the job is complete it then moves the output
 files from our $output_path/_temporary to $output_path. That process can
 take longer than the job itself depending on the circumstances. The job I
 mentioned previously outputs ~4mb files, and so far has copied 1/3 of the
 files in 1.5 hours from _temporary to the final destination.

 Is there a solution to this besides reducing the number of partitions?
 Anyone else run into similar issues elsewhere? I don't remember this being
 an issue with Map Reduce jobs and hadoop, however, I probably wasn't
 tracking the transfer of the output files like I am with Spark.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/performance-of-saveAsTextFile-moving-files-from-temporary-tp21397.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Lost task - connection closed

2015-01-26 Thread Aaron Davidson
It looks like something weird is going on with your object serialization,
perhaps a funny form of self-reference which is not detected by
ObjectOutputStream's typical loop avoidance. That, or you have some data
structure like a linked list with a parent pointer and you have many
thousand elements.

Assuming the stack trace is coming from an executor, it is probably a
problem with the objects you're sending back as results, so I would
carefully examine these and maybe try serializing some using
ObjectOutputStream manually.

If your program looks like
foo.map { row => doComplexOperation(row) }.take(10)

you can also try changing it to
foo.map { row => doComplexOperation(row); 1 }.take(10)

to avoid serializing the result of that complex operation, which should
help narrow down where exactly the problematic objects are coming from.
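
A quick sketch of the manual check (suspectResult is a placeholder for one of
the objects your tasks return):

  import java.io.{ByteArrayOutputStream, ObjectOutputStream}
  val oos = new ObjectOutputStream(new ByteArrayOutputStream())
  oos.writeObject(suspectResult) // a StackOverflowError here means the object
                                 // graph is extremely deep or self-referencing
  oos.close()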

On Mon, Jan 26, 2015 at 8:31 AM, octavian.ganea octavian.ga...@inf.ethz.ch
wrote:

 Here is the first error I get at the executors:

 15/01/26 17:27:04 ERROR ExecutorUncaughtExceptionHandler: Uncaught
 exception
 in thread Thread[handle-message-executor-16,5,main]
 java.lang.StackOverflowError
 at

 java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
 at

 java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1840)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1533)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)

 If you have any pointers for me on how to debug this, that would be very
 useful. I tried running with both spark 1.2.0 and 1.1.1, getting the same
 error.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Lost-task-connection-closed-tp21361p21371.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 

Re: Lost task - connection closed

2015-01-25 Thread Aaron Davidson
Please take a look at the executor logs (on both sides of the IOException)
to see if there are other exceptions (e.g., OOM) which precede this one.
Generally, the connections should not fail spontaneously.

On Sun, Jan 25, 2015 at 10:35 PM, octavian.ganea octavian.ga...@inf.ethz.ch
 wrote:

 Hi,

 I am running a program that executes map-reduce jobs in a loop. The first
 time the loop runs, everything is ok. After that, it starts giving the
 following error, first it gives it for one task, then for more tasks and
 eventually the entire program fails:

 15/01/26 01:41:25 WARN TaskSetManager: Lost task 10.0 in stage 15.0 (TID
 1063, hostnameXX): java.io.IOException: Connection from
 hostnameXX/172.31.109.50:50808 closed
 at

 org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:98)
 at

 org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:81)
 at

 io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
 at

 io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
 at

 io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
 at

 io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
 at

 io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
 at

 io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
 at

 io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
 at

 io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
 at

 io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:738)
 at

 io.netty.channel.AbstractChannel$AbstractUnsafe$6.run(AbstractChannel.java:606)
 at

 io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
 at

 io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
 at java.lang.Thread.run(Thread.java:745)

 Can someone help me with debugging this ?

 Thank you!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Lost-task-connection-closed-tp21361.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark 1.2 – How to change Default (Random) port ….

2015-01-25 Thread Aaron Davidson
This was a regression caused by Netty Block Transfer Service. The fix for
this just barely missed the 1.2 release, and you can see the associated
JIRA here: https://issues.apache.org/jira/browse/SPARK-4837

Current master has the fix, and the Spark 1.2.1 release will have it
included. If you don't want to rebuild from master or wait, then you can
turn it off by setting spark.shuffle.blockTransferService to nio.
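
For example, a minimal sketch of flipping it back (programmatically; the same
key/value pair can go in spark-defaults.conf instead):

  import org.apache.spark.{SparkConf, SparkContext}
  val conf = new SparkConf()
    .set("spark.shuffle.blockTransferService", "nio") // revert to the pre-1.2 service
  val sc = new SparkContext(conf)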

On Sun, Jan 25, 2015 at 6:28 PM, Shailesh Birari sbirar...@gmail.com
wrote:

 Can anyone please let me know ?
 I don't want to open all ports on the network. So, I am interested in the
 property by which I can configure this new port.

   Shailesh



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-How-to-change-Default-Random-port-tp21306p21360.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark 1.2 - com/google/common/base/Preconditions java.lang.NoClassDefFoundErro

2015-01-20 Thread Aaron Davidson
Spark's network-common package depends on guava as a provided dependency
in order to avoid conflicting with other libraries (e.g., Hadoop) that
depend on specific versions. com/google/common/base/Preconditions has been
present in Guava since version 2, so this is likely a "dependency not
found" rather than "wrong version of dependency" issue.

To resolve this, please depend on some version of Guava (14.0.1 is
guaranteed to work, as should any other version from the past few years).
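
For example, if you build with sbt, one line like this should do it (in Maven,
the equivalent is a dependency on com.google.guava:guava:14.0.1):

  libraryDependencies += "com.google.guava" % "guava" % "14.0.1"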

On Tue, Jan 20, 2015 at 6:16 PM, Shailesh Birari sbirar...@gmail.com
wrote:

 Hi Frank,

 It's a normal Eclipse project where I added Scala and Spark libraries as
 user libraries.
 Though I am not attaching any Hadoop libraries, in my application code I
 have the following line.

   System.setProperty("hadoop.home.dir", "C:\\SB\\HadoopWin")

 This Hadoop home dir contains winutils.exe only.

 I don't think that it's an issue.

 Please suggest.

 Thanks,
   Shailesh


 On Wed, Jan 21, 2015 at 2:19 PM, Frank Austin Nothaft 
 fnoth...@berkeley.edu wrote:

 Shailesh,

 To add, are you packaging Hadoop in your app? Hadoop will pull in Guava.
 Not sure if you are using Maven (or what) to build, but if you can pull up
 your builds dependency tree, you will likely find com.google.guava being
 brought in by one of your dependencies.

 Regards,

 Frank Austin Nothaft
 fnoth...@berkeley.edu
 fnoth...@eecs.berkeley.edu
 202-340-0466

 On Jan 20, 2015, at 5:13 PM, Shailesh Birari sbirar...@gmail.com wrote:

 Hello,

 I double checked the libraries. I am linking only with Spark 1.2.
 Along with Spark 1.2 jars I have Scala 2.10 jars and JRE 7 jars linked
 and nothing else.

 Thanks,
Shailesh

 On Wed, Jan 21, 2015 at 12:58 PM, Sean Owen so...@cloudera.com wrote:

 Guava is shaded in Spark 1.2+. It looks like you are mixing versions
 of Spark then, with some that still refer to unshaded Guava. Make sure
 you are not packaging Spark with your app and that you don't have
 other versions lying around.

 On Tue, Jan 20, 2015 at 11:55 PM, Shailesh Birari sbirar...@gmail.com
 wrote:
  Hello,
 
  I recently upgraded my setup from Spark 1.1 to Spark 1.2.
  My existing applications are working fine on ubuntu cluster.
  But, when I try to execute Spark MLlib application from Eclipse
 (Windows
  node) it gives java.lang.NoClassDefFoundError:
  com/google/common/base/Preconditions exception.
 
  Note,
 1. With Spark 1.1 this was working fine.
 2. The Spark 1.2 jar files are linked in Eclipse project.
 3. Checked the jar -tf output and found the above
 com.google.common.base
  is not present.
 
 
 -Exception
  log:
 
  Exception in thread main java.lang.NoClassDefFoundError:
  com/google/common/base/Preconditions
  at
 
 org.apache.spark.network.client.TransportClientFactory.init(TransportClientFactory.java:94)
  at
 
 org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:77)
  at
 
 org.apache.spark.network.netty.NettyBlockTransferService.init(NettyBlockTransferService.scala:62)
  at
 org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:194)
  at org.apache.spark.SparkContext.init(SparkContext.scala:340)
  at
 
 org.apache.spark.examples.mllib.TallSkinnySVD$.main(TallSkinnySVD.scala:74)
  at
 org.apache.spark.examples.mllib.TallSkinnySVD.main(TallSkinnySVD.scala)
  Caused by: java.lang.ClassNotFoundException:
  com.google.common.base.Preconditions
  at java.net.URLClassLoader$1.run(Unknown Source)
  at java.net.URLClassLoader$1.run(Unknown Source)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(Unknown Source)
  at java.lang.ClassLoader.loadClass(Unknown Source)
  at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
  at java.lang.ClassLoader.loadClass(Unknown Source)
  ... 7 more
 
 
 -
 
  jar -tf output:
 
 
  consb2@CONSB2A
  /cygdrive/c/SB/spark-1.2.0-bin-hadoop2.4/spark-1.2.0-bin-hadoop2.4/lib
  $ jar -tf spark-assembly-1.2.0-hadoop2.4.0.jar | grep Preconditions
  org/spark-project/guava/common/base/Preconditions.class
  org/spark-project/guava/common/math/MathPreconditions.class
  com/clearspring/analytics/util/Preconditions.class
  parquet/Preconditions.class
  com/google/inject/internal/util/$Preconditions.class
 
 
 ---
 
  Please help me in resolving this.
 
  Thanks,
Shailesh
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-com-google-common-base-Preconditions-java-lang-NoClassDefFoundErro-tp21271.html
  Sent from the Apache 

Re: Serializability: for vs. while loops

2015-01-15 Thread Aaron Davidson
Scala for-loops are implemented as closures using anonymous inner classes
which are instantiated once and invoked many times. This means, though,
that the code inside the loop is actually sitting inside a class, which
confuses Spark's Closure Cleaner, whose job is to remove unused references
from closures to make otherwise-unserializable objects serializable.

My understanding is, in particular, that the closure cleaner will null out
unused fields in the closure, but cannot go past the first level of depth
(i.e., it will not follow field references and null out *their *unused, and
possibly unserializable, references), because this could end up mutating
state outside of the closure itself. Thus, the extra level of depth of the
closure that was introduced by the anonymous class (where presumably the
outer this pointer is considered used by the closure cleaner) is
sufficient to make it unserializable.

While loops, on the other hand, involve none of this trickery, and everyone
is happy.
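
For reference, the for loop below desugars to roughly this shape (a sketch,
not the exact compiler output), which is where the extra closure layer comes
from:

  (0 to (maxId - 1)).foreach { i =>
    // the original loop body ends up here, so any Spark closure created
    // inside it is nested one level deeper than in the while-loop version
  }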

On Wed, Jan 14, 2015 at 11:37 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 sorry, I don't like questions about serializability myself, but still...

 Can anyone give me a hint why

   for (i <- 0 to (maxId - 1)) {  ...  }

 throws a NotSerializableException in the loop body while

   var i = 0
   while (i < maxId) {
 // same code as in the for loop
 i += 1
   }

 works fine? I guess there is something fundamentally different in the way
 Scala realizes for loops?

 Thanks
 Tobias



Re: use netty shuffle for network cause high gc time

2015-01-13 Thread Aaron Davidson
What version are you running? I think spark.shuffle.use.netty was a valid
option only in Spark 1.1, where the Netty stuff was strictly experimental.
Spark 1.2 contains an officially supported and much more thoroughly tested
version under the property spark.shuffle.blockTransferService, which is
set to netty by default.

On Tue, Jan 13, 2015 at 9:26 PM, lihu lihu...@gmail.com wrote:

 Hi,
  I just tested the groupByKey method on 100GB of data; the cluster is 20
 machines, each with 125GB RAM.

 At first I set conf.set("spark.shuffle.use.netty", "false") and ran
 the experiment, and then I set conf.set("spark.shuffle.use.netty", "true")
 again to re-run the experiment, but in the latter case the GC time is much
 higher.


  I thought the latter one should be better, but it is not. So when should
 we use netty for network shuffle fetching?





Re: FileNotFoundException in appcache shuffle files

2015-01-10 Thread Aaron Davidson
As Jerry said, this is not related to shuffle file consolidation.

The unique thing about this problem is that it's failing to find a file
while trying to _write_ to it, in append mode. The simplest explanation for
this would be that the file is deleted in between some check for existence
and opening the file for append.

The deletion of such files as a race condition with writing them (on the
map side) would be most easily explained by a JVM shutdown event, for
instance caused by a fatal error such as OutOfMemoryError. So, as Ilya
said, please look for another exception possibly preceding this one.

On Sat, Jan 10, 2015 at 12:16 PM, lucio raimondo luxmea...@hotmail.com
wrote:

 Hey,

 I am having a similar issue, did you manage to find a solution yet?
 Please
 check my post below for reference:


 http://apache-spark-user-list.1001560.n3.nabble.com/IOError-Errno-2-No-such-file-or-directory-tmp-spark-9e23f17e-2e23-4c26-9621-3cb4d8b832da-tmp3i3xno-td21076.html

 Thank you,
 Lucio



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/FileNotFoundException-in-appcache-shuffle-files-tp17605p21077.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId

2015-01-08 Thread Aaron Davidson
Do note that this problem may be fixed in Spark 1.2, as we changed the
default transfer service to use a Netty-based one rather than the
ConnectionManager.

On Thu, Jan 8, 2015 at 7:05 AM, Spidy yoni...@gmail.com wrote:

 Hi,

 Can you please explain which settings did you changed?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-ConnectionManager-Corresponding-SendingConnection-to-ConnectionManagerId-tp17050p21035.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: RDDs being cleaned too fast

2014-12-10 Thread Aaron Davidson
The ContextCleaner uncaches RDDs that have gone out of scope on the driver.
So it's possible that the given RDD is no longer reachable in your
program's control flow, or else it'd be a bug in the ContextCleaner.
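
A minimal sketch of the pattern that keeps an RDD cached (myData is a
placeholder for however you build the RDD):

  val cached = myData.cache()
  cached.count()  // materializes the cache
  // ... keep using `cached` itself; once nothing in the driver program
  // references it any more, the ContextCleaner is free to uncache it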

On Wed, Dec 10, 2014 at 5:34 PM, ankits ankitso...@gmail.com wrote:

 I'm using spark 1.1.0 and am seeing persisted RDDs being cleaned up too
 fast.
 How can i inspect the size of RDD in memory and get more information about
 why it was cleaned up. There should be more than enough memory available on
 the cluster to store them, and by default, the spark.cleaner.ttl is
 infinite, so I want more information about why this is happening and how to
 prevent it.

 Spark just logs this when removing RDDs:

 [2014-12-11 01:19:34,006] INFO  spark.storage.BlockManager [] [] - Removing
 RDD 33
 [2014-12-11 01:19:34,010] INFO  pache.spark.ContextCleaner []
 [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33
 [2014-12-11 01:19:34,012] INFO  spark.storage.BlockManager [] [] - Removing
 RDD 33
 [2014-12-11 01:19:34,016] INFO  pache.spark.ContextCleaner []
 [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-being-cleaned-too-fast-tp20613.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Running two different Spark jobs vs multi-threading RDDs

2014-12-06 Thread Aaron Davidson
You can actually submit multiple jobs to a single SparkContext in different
threads. In the case you mentioned with 2 stages having a common parent,
both will wait for the parent stage to complete and then the two will
execute in parallel, sharing the cluster resources.

Solutions that submit multiple applications are also reasonable, but then
you have to manage the job dependencies yourself.
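
A rough sketch of the single-context, multi-threaded approach (the path,
parse, isTypeA and isTypeB are placeholders for your own input and logic):

  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration.Duration

  val base = sc.textFile("/path/to/input").map(parse).cache()  // shared parent

  val reportA = Future { base.filter(isTypeA).count() }  // job from thread 1
  val reportB = Future { base.filter(isTypeB).count() }  // job from thread 2

  val (a, b) = (Await.result(reportA, Duration.Inf),
                Await.result(reportB, Duration.Inf))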

On Sat, Dec 6, 2014 at 8:41 AM, Corey Nolet cjno...@gmail.com wrote:

 Reading the documentation a little more closely, I'm using the wrong
 terminology. I'm using stages to refer to what spark is calling a job. I
 guess application (more than one spark context) is what I'm asking about
 On Dec 5, 2014 5:19 PM, Corey Nolet cjno...@gmail.com wrote:

 I've read in the documentation that RDDs can be run concurrently when
 submitted in separate threads. I'm curious how the scheduler would handle
 propagating these down to the tasks.

 I have 3 RDDs:
 - one RDD which loads some initial data, transforms it and caches it
 - two RDDs which use the cached RDD to provide reports

 I'm trying to figure out how the resources will be scheduled to perform
 these stages if I were to concurrently run the two RDDs that depend on the
 first RDD. Would the two RDDs run sequentially? Will they both run @ the
 same time and be smart about how they are caching?

 Would this be a time when I'd want to use Tachyon instead and run this as
 2 separate physical jobs: one to place the shared data in the RAMDISK and
 one to run the two dependent RDDs concurrently? Or would it even be best in
 that case to run 3 completely separate jobs?

 We're planning on using YARN so there's 2 levels of scheduling going on.
 We're trying to figure out the best way to utilize the resources so that we
 are fully saturating the system and making sure there's constantly work
 being done rather than anything spinning gears waiting on upstream
 processing to occur (in mapreduce, we'd just submit a ton of jobs and have
 them wait in line).




Re: Announcing Spark 1.1.1!

2014-12-03 Thread Aaron Davidson
Because this was a maintenance release, we should not have introduced any
binary backwards or forwards incompatibilities. Therefore, applications
that were written and compiled against 1.1.0 should still work against a
1.1.1 cluster, and vice versa.

On Wed, Dec 3, 2014 at 1:30 PM, Andrew Or and...@databricks.com wrote:

 By the Spark server do you mean the standalone Master? It is best if they
 are upgraded together because there have been changes to the Master in
 1.1.1. Although it might just work, it's highly recommended to restart
 your cluster manager too.

 2014-12-03 13:19 GMT-08:00 Romi Kuntsman r...@totango.com:

 About version compatibility and upgrade path -  can the Java application
 dependencies and the Spark server be upgraded separately (i.e. will 1.1.0
 library work with 1.1.1 server, and vice versa), or do they need to be
 upgraded together?

 Thanks!

 *Romi Kuntsman*, *Big Data Engineer*
  http://www.totango.com

 On Tue, Dec 2, 2014 at 11:36 PM, Andrew Or and...@databricks.com wrote:

 I am happy to announce the availability of Spark 1.1.1! This is a
 maintenance release with many bug fixes, most of which are concentrated in
 the core. This list includes various fixes to sort-based shuffle, memory
 leak, and spilling issues. Contributions from this release came from 55
 developers.

 Visit the release notes [1] to read about the new features, or
 download [2] the release today.

 [1] http://spark.apache.org/releases/spark-release-1-1-1.html
 [2] http://spark.apache.org/downloads.html

 Please e-mail me directly for any typo's in the release notes or name
 listing.

 Thanks for everyone who contributed, and congratulations!
 -Andrew






Re: S3NativeFileSystem inefficient implementation when calling sc.textFile

2014-11-30 Thread Aaron Davidson
Note that it does not appear that s3a solves the original problems in this
thread, which are on the Spark side or due to the fact that metadata
listing in S3 is slow simply due to going over the network.

On Sun, Nov 30, 2014 at 10:07 AM, David Blewett da...@dawninglight.net
wrote:

 You might be interested in the new s3a filesystem in Hadoop 2.6.0 [1].

 1.
 https://issues.apache.org/jira/plugins/servlet/mobile#issue/HADOOP-10400
 On Nov 26, 2014 12:24 PM, Aaron Davidson ilike...@gmail.com wrote:

 Spark has a known problem where it will do a pass of metadata on a large
 number of small files serially, in order to find the partition information
 prior to starting the job. This will probably not be repaired by switching
 the FS impl.

 However, you can change the FS being used like so (prior to the first
 usage):
 sc.hadoopConfiguration.set("fs.s3n.impl",
 "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

 On Wed, Nov 26, 2014 at 1:47 AM, Tomer Benyamini tomer@gmail.com
 wrote:

 Thanks Lalit; Setting the access + secret keys in the configuration
 works even when calling sc.textFile. Is there a way to select which hadoop
 s3 native filesystem implementation would be used at runtime using the
 hadoop configuration?

 Thanks,
 Tomer

 On Wed, Nov 26, 2014 at 11:08 AM, lalit1303 la...@sigmoidanalytics.com
 wrote:


 you can try creating hadoop Configuration and set s3 configuration i.e.
 access keys etc.
 Now, for reading files from s3 use newAPIHadoopFile and pass the config
 object here along with key, value classes.





 -
 Lalit Yadav
 la...@sigmoidanalytics.com
 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/S3NativeFileSystem-inefficient-implementation-when-calling-sc-textFile-tp19841p19845.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: S3NativeFileSystem inefficient implementation when calling sc.textFile

2014-11-26 Thread Aaron Davidson
Spark has a known problem where it will do a pass of metadata on a large
number of small files serially, in order to find the partition information
prior to starting the job. This will probably not be repaired by switching
the FS impl.

However, you can change the FS being used like so (prior to the first
usage):
sc.hadoopConfiguration.set("fs.s3n.impl",
"org.apache.hadoop.fs.s3native.NativeS3FileSystem")

On Wed, Nov 26, 2014 at 1:47 AM, Tomer Benyamini tomer@gmail.com
wrote:

 Thanks Lalit; Setting the access + secret keys in the configuration works
 even when calling sc.textFile. Is there a way to select which hadoop s3
 native filesystem implementation would be used at runtime using the hadoop
 configuration?

 Thanks,
 Tomer

 On Wed, Nov 26, 2014 at 11:08 AM, lalit1303 la...@sigmoidanalytics.com
 wrote:


 you can try creating hadoop Configuration and set s3 configuration i.e.
 access keys etc.
 Now, for reading files from s3 use newAPIHadoopFile and pass the config
 object here along with key, value classes.





 -
 Lalit Yadav
 la...@sigmoidanalytics.com
 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/S3NativeFileSystem-inefficient-implementation-when-calling-sc-textFile-tp19841p19845.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Bug in Accumulators...

2014-11-23 Thread Aaron Davidson
As Mohit said, making Main extend Serializable should fix this example. In
general, it's not a bad idea to mark the fields you don't want to serialize
(e.g., sc and conf in this case) as @transient as well, though this is not
the issue in this case.

Note that this problem would not have arisen in your very specific example
if you used a while loop instead of a for-each loop, but that's really more
of a happy coincidence than something you should rely on, as nested lambdas
are virtually unavoidable in Scala.
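
A minimal sketch of that shape (assuming, as in the original post, that sc and
conf live on a top-level Main object; the app name is just a placeholder):

  import org.apache.spark.{SparkConf, SparkContext}

  object Main extends Serializable {
    // not serialized with closures, and only created when first used on the driver
    @transient lazy val conf = new SparkConf().setAppName("AccumulatorExample")
    @transient lazy val sc = new SparkContext(conf)

    def main(args: Array[String]): Unit = {
      val accum = sc.accumulator(0)
      for (i <- 1 to 10) {
        sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
      }
      sc.stop()
    }
  }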

On Sat, Nov 22, 2014 at 5:16 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 perhaps the closure ends up including the main object which is not
 defined as serializable...try making it a case object or object main
 extends Serializable.


 On Sat, Nov 22, 2014 at 4:16 PM, lordjoe lordjoe2...@gmail.com wrote:

 I posted several examples in java at http://lordjoesoftware.blogspot.com/

 Generally code like this works and I show how to accumulate more complex
 values.

 // Make two accumulators using Statistics
  final Accumulator<Integer> totalLetters = ctx.accumulator(0, "ttl");
  JavaRDD<String> lines = ...

 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
     @Override
     public Iterable<String> call(final String s) throws Exception {
         // Handle accumulator here
         totalLetters.add(s.length()); // count all letters
         ...
     }
 });
 ...
  Long numberCalls = totalLetters.value();

 I believe the mistake is to pass the accumulator to the function rather
 than
 letting the function find the accumulator - I do this in this case by
 using
 a final local variable



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p19579.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Given multiple .filter()'s, is there a way to set the order?

2014-11-14 Thread Aaron Davidson
In the situation you show, Spark will pipeline each filter together, and
will apply each filter one at a time to each row, effectively constructing
an && statement. You would only see a performance difference if the
filter code itself is somewhat expensive, then you would want to only
execute it on a smaller set of rows. Otherwise, the runtime difference
between a == b && b == c && c == d is minimal when compared to a == b &
b == c & c == d, the latter being sort of the worst-case scenario as it
would always run all filters (though as I said, Spark acts like the former).

Spark does not reorder the filters automatically. It uses the explicit
ordering you provide.
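
So if you want the cheap or highly selective filters to run first, just write
them in that order, or fold everything into a single pass yourself. A quick
sketch (the predicates are placeholders for your filter functions):

  // applied in exactly this order, row by row:
  x.filter(dropsFortyPercent)
   .filter(dropsTwentyPercent)
   .filter(dropsTwoPercent)
   .filter(dropsOnePercent)

  // equivalent single pass with explicit short-circuiting:
  x.filter(r => dropsFortyPercent(r) && dropsTwentyPercent(r) &&
                dropsTwoPercent(r) && dropsOnePercent(r))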

On Fri, Nov 14, 2014 at 10:20 AM, YaoPau jonrgr...@gmail.com wrote:

 I have an RDD x of millions of STRINGs, each of which I want to pass
 through a set of filters.  My filtering code looks like this:

 x.filter(filter#1, which will filter out 40% of data).
filter(filter#2, which will filter out 20% of data).
filter(filter#3, which will filter out 2% of data).
filter(filter#4, which will filter out 1% of data)

 There is no ordering requirement (filter #2 does not depend on filter #1,
 etc), but the filters are drastically different in the % of rows they
 should
 eliminate.  What I'd like is an ordering similar to a || statement, where
 if it fails on filter#1 the row automatically gets filtered out before the
 other three filters run.

 But when I play around with the ordering of the filters, the runtime
 doesn't
 seem to change.  Is Spark somehow intelligently guessing how effective each
 filter will be and ordering it correctly regardless of how I order them?
 If
 not, is there I way I can set the filter order?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Given-multiple-filter-s-is-there-a-way-to-set-the-order-tp18957.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: data locality, task distribution

2014-11-13 Thread Aaron Davidson
You mentioned that the 3.1 min run was the one that did the actual caching,
so did that run before any data was cached, or after?

I would recommend checking the Storage tab of the UI, and clicking on the
RDD, to see both how full the executors' storage memory is (which may be
significantly less than the instance's memory). When a task completes over
data that should be cached, it will try to cache it, so it's pretty weird
that you're seeing <100% cached with memory to spare. It's possible that
some partitions are significantly larger than others, which may cause us to
not attempt to cache it (defined by spark.storage.unrollFraction).

You can also try increasing the spark.locality.wait flag to ensure that
Spark will wait longer for tasks to complete before running them
non-locally. One possible situation is that a node hits GC for a few
seconds, a task is scheduled non-locally, and then attempting to read from
the other executors' cache is much more expensive than computing the data
initially. Increasing the locality wait beyond the GC time would avoid this.
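
For example (a minimal sketch; conf here is your SparkConf, the value is in
milliseconds in this version, and the default is 3000):

  conf.set("spark.locality.wait", "10000")  // wait up to 10s for a local slot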

On Thu, Nov 13, 2014 at 9:08 AM, Nathan Kronenfeld 
nkronenf...@oculusinfo.com wrote:

 I am seeing skewed execution times.  As far as I can tell, they are
 attributable to differences in data locality - tasks with locality
 PROCESS_LOCAL run fast, NODE_LOCAL, slower, and ANY, slowest.

 This seems entirely as it should be - the question is, why the different
 locality levels?

 I am seeing skewed caching, as I mentioned before - in the case I
 isolated, with 4 nodes, they were distributed at about 42%, 31%, 20%, and
 6%.  However, the total amount was significantly less than the memory of
 any single node, so I don't think they could have overpopulated their
 cache.  I am occasionally seeing task failures, but the re-execute
 themselves, and work fine the next time.  Yet I'm still seeing incomplete
 caching (from 65% cached up to 100%, depending on the run).

 I shouldn't have much variance in task time - this is simply a foreach
 over the data, adding to an accumulator, and the data is completely
 randomly distributed, so should be pretty even overall.

 I am seeing GC regressions occasionally - they slow a request from about 2
 seconds to about 5 seconds.  They 8 minute slowdown seems to be solely
 attributable to the data locality issue, as far as I can tell.  There was
 some further confusion though in the times I mentioned - the list I gave
 (3.1 min, 2 seconds, ... 8 min) were not different runs with different
 cache %s, they were iterations within a single run with 100% caching.

-Nathan



 On Thu, Nov 13, 2014 at 1:45 AM, Aaron Davidson ilike...@gmail.com
 wrote:

 Spark's scheduling is pretty simple: it will allocate tasks to open cores
 on executors, preferring ones where the data is local. It even performs
 delay scheduling, which means waiting a bit to see if an executor where
 the data resides locally becomes available.

 Are yours tasks seeing very skewed execution times? If some tasks are
 taking a very long time and using all the resources on a node, perhaps the
 other nodes are quickly finishing many tasks, and actually overpopulating
 their caches. If a particular machine were not overpopulating its cache,
 and there are no failures, then you should see 100% cached after the first
 run.

 It's also strange that running totally uncached takes 3.1 minutes, but
 running 80-90% cached may take 8 minutes. Does your workload produce
 nondeterministic variance in task times? Was it a single straggler, or many
 tasks, that was keeping the job from finishing? It's not too uncommon to
 see occasional performance regressions while caching due to GC, though 2
 seconds to 8 minutes is a bit extreme.

 On Wed, Nov 12, 2014 at 9:01 PM, Nathan Kronenfeld 
 nkronenf...@oculusinfo.com wrote:

 Sorry, I think I was not clear in what I meant.
 I didn't mean it went down within a run, with the same instance.

 I meant I'd run the whole app, and one time, it would cache 100%, and
 the next run, it might cache only 83%

 Within a run, it doesn't change.

 On Wed, Nov 12, 2014 at 11:31 PM, Aaron Davidson ilike...@gmail.com
 wrote:

 The fact that the caching percentage went down is highly suspicious. It
 should generally not decrease unless other cached data took its place, or
 if unless executors were dying. Do you know if either of these were the
 case?

 On Tue, Nov 11, 2014 at 8:58 AM, Nathan Kronenfeld 
 nkronenf...@oculusinfo.com wrote:

 Can anyone point me to a good primer on how spark decides where to
 send what task, how it distributes them, and how it determines data
 locality?

 I'm trying a pretty simple task - it's doing a foreach over cached
 data, accumulating some (relatively complex) values.

 So I see several inconsistencies I don't understand:

 (1) If I run it a couple times, as separate applications (i.e.,
 reloading, recaching, etc), I will get different %'s cached each time.
 I've got about 5x as much memory

Re: data locality, task distribution

2014-11-12 Thread Aaron Davidson
The fact that the caching percentage went down is highly suspicious. It
should generally not decrease unless other cached data took its place, or
if unless executors were dying. Do you know if either of these were the
case?

On Tue, Nov 11, 2014 at 8:58 AM, Nathan Kronenfeld 
nkronenf...@oculusinfo.com wrote:

 Can anyone point me to a good primer on how spark decides where to send
 what task, how it distributes them, and how it determines data locality?

 I'm trying a pretty simple task - it's doing a foreach over cached data,
 accumulating some (relatively complex) values.

 So I see several inconsistencies I don't understand:

 (1) If I run it a couple times, as separate applications (i.e., reloading,
 recaching, etc), I will get different %'s cached each time.  I've got about
 5x as much memory as I need overall, so it isn't running out.  But one
 time, 100% of the data will be cached; the next, 83%, the next, 92%, etc.

 (2) Also, the data is very unevenly distributed. I've got 400 partitions,
 and 4 workers (with, I believe, 3x replication), and on my last run, my
 distribution was 165/139/25/71.  Is there any way to get spark to
 distribute the tasks more evenly?

 (3) If I run the problem several times in the same execution (to take
 advantage of caching etc.), I get very inconsistent results.  My latest
 try, I get:

- 1st run: 3.1 min
- 2nd run: 2 seconds
- 3rd run: 8 minutes
- 4th run: 2 seconds
- 5th run: 2 seconds
- 6th run: 6.9 minutes
- 7th run: 2 seconds
- 8th run: 2 seconds
- 9th run: 3.9 minutes
- 10th run: 8 seconds

 I understand the difference for the first run; it was caching that time.
 Later times, when it manages to work in 2 seconds, it's because all the
 tasks were PROCESS_LOCAL; when it takes longer, the last 10-20% of the
 tasks end up with locality level ANY.  Why would that change when running
 the exact same task twice in a row on cached data?

 Any help or pointers that I could get would be much appreciated.


 Thanks,

  -Nathan



 --
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com



Re: data locality, task distribution

2014-11-12 Thread Aaron Davidson
Spark's scheduling is pretty simple: it will allocate tasks to open cores
on executors, preferring ones where the data is local. It even performs
delay scheduling, which means waiting a bit to see if an executor where
the data resides locally becomes available.

Are yours tasks seeing very skewed execution times? If some tasks are
taking a very long time and using all the resources on a node, perhaps the
other nodes are quickly finishing many tasks, and actually overpopulating
their caches. If a particular machine were not overpopulating its cache,
and there are no failures, then you should see 100% cached after the first
run.

It's also strange that running totally uncached takes 3.1 minutes, but
running 80-90% cached may take 8 minutes. Does your workload produce
nondeterministic variance in task times? Was it a single straggler, or many
tasks, that was keeping the job from finishing? It's not too uncommon to
see occasional performance regressions while caching due to GC, though 2
seconds to 8 minutes is a bit extreme.

On Wed, Nov 12, 2014 at 9:01 PM, Nathan Kronenfeld 
nkronenf...@oculusinfo.com wrote:

 Sorry, I think I was not clear in what I meant.
 I didn't mean it went down within a run, with the same instance.

 I meant I'd run the whole app, and one time, it would cache 100%, and the
 next run, it might cache only 83%

 Within a run, it doesn't change.

 On Wed, Nov 12, 2014 at 11:31 PM, Aaron Davidson ilike...@gmail.com
 wrote:

 The fact that the caching percentage went down is highly suspicious. It
 should generally not decrease unless other cached data took its place, or
 if unless executors were dying. Do you know if either of these were the
 case?

 On Tue, Nov 11, 2014 at 8:58 AM, Nathan Kronenfeld 
 nkronenf...@oculusinfo.com wrote:

 Can anyone point me to a good primer on how spark decides where to send
 what task, how it distributes them, and how it determines data locality?

 I'm trying a pretty simple task - it's doing a foreach over cached data,
 accumulating some (relatively complex) values.

 So I see several inconsistencies I don't understand:

 (1) If I run it a couple times, as separate applications (i.e.,
 reloading, recaching, etc), I will get different %'s cached each time.
 I've got about 5x as much memory as I need overall, so it isn't running
 out.  But one time, 100% of the data will be cached; the next, 83%, the
 next, 92%, etc.

 (2) Also, the data is very unevenly distributed. I've got 400
 partitions, and 4 workers (with, I believe, 3x replication), and on my last
 run, my distribution was 165/139/25/71.  Is there any way to get spark to
 distribute the tasks more evenly?

 (3) If I run the problem several times in the same execution (to take
 advantage of caching etc.), I get very inconsistent results.  My latest
 try, I get:

- 1st run: 3.1 min
- 2nd run: 2 seconds
- 3rd run: 8 minutes
- 4th run: 2 seconds
- 5th run: 2 seconds
- 6th run: 6.9 minutes
- 7th run: 2 seconds
- 8th run: 2 seconds
- 9th run: 3.9 minutes
- 10th run: 8 seconds

 I understand the difference for the first run; it was caching that
 time.  Later times, when it manages to work in 2 seconds, it's because all
 the tasks were PROCESS_LOCAL; when it takes longer, the last 10-20% of the
 tasks end up with locality level ANY.  Why would that change when running
 the exact same task twice in a row on cached data?

 Any help or pointers that I could get would be much appreciated.


 Thanks,

  -Nathan



 --
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com





 --
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com



Re: Bug in Accumulators...

2014-11-07 Thread Aaron Davidson
This may be due in part to Scala allocating an anonymous inner class in
order to execute the for loop. I would expect if you change it to a while
loop like

var i = 0
while (i < 10) {
  sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
  i += 1
}

then the problem may go away. I am not super familiar with the closure
cleaner, but I believe that we cannot prune beyond 1 layer of references,
so the extra class of nesting may be screwing something up. If this is the
case, then I would also expect replacing the accumulator with any other
reference to the enclosing scope (such as a broadcast variable) would have
the same result.

On Fri, Nov 7, 2014 at 12:03 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 Could you provide all pieces of codes which can reproduce the bug? Here is
 my test code:

 import org.apache.spark._
 import org.apache.spark.SparkContext._

 object SimpleApp {

   def main(args: Array[String]) {
 val conf = new SparkConf().setAppName("SimpleApp")
 val sc = new SparkContext(conf)

 val accum = sc.accumulator(0)
 for (i <- 1 to 10) {
   sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
 }
 sc.stop()
   }
 }

 It works fine both in client and cluster. Since this is a serialization
 bug, the outer class does matter. Could you provide it? Is there
 a SparkContext field in the outer class?

 Best Regards,
 Shixiong Zhu

 2014-10-28 0:28 GMT+08:00 octavian.ganea octavian.ga...@inf.ethz.ch:

 I am also using spark 1.1.0 and I ran it on a cluster of nodes (it works
 if I
 run it in local mode! )

 If I put the accumulator inside the for loop, everything will work fine. I
 guess the bug is that an accumulator can be applied to JUST one RDD.

 Still another undocumented 'feature' of Spark that no one from the people
 who maintain Spark is willing to solve or at least to tell us about ...



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p17372.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Spark speed performance

2014-11-01 Thread Aaron Davidson
coalesce() is a streaming operation if used without the second parameter,
it does not put all the data in RAM. If used with the second parameter
(shuffle = true), then it performs a shuffle, but still does not put all
the data in RAM.
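
In Scala terms (rdd is a placeholder; the PySpark call behaves the same way):

  rdd.coalesce(10)                  // no shuffle: partitions are read and merged on the fly
  rdd.coalesce(10, shuffle = true)  // shuffles, but still streams records rather than
                                    // holding whole partitions in RAM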

On Sat, Nov 1, 2014 at 12:09 PM, jan.zi...@centrum.cz wrote:

 Now I am getting to problems using:

 distData = sc.textFile(sys.argv[2]).coalesce(10)



 The problem is that it seems that Spark is trying to put all the data into
 RAM first and then perform the coalesce. Do you know if there is something
 that would do the coalesce on the fly, with for example a fixed partition size?
 Do you think that something like this is possible? Unfortunately I am not
 able to find anything like this in the Spark documentation.

 Thank you in advance for any advices or suggestions.

 Best regards,
 Jan

 __


 Thank you very much! Lots of very small json files was exactly the speed
 performance problem. Using coalesce makes my Spark program run on a single
 node only twice as slow (even including Spark startup) as the single-node
 Python program, which is acceptable.

 Jan
 __

 Because of the overhead between the JVM and Python, a single task will be
 slower than your local Python script, but it's very easy to scale to
 many CPUs.

 Even on one CPU, it's not common for PySpark to be 100 times slower. You
 have many small files, and each file will be processed by a task, which
 has about 100ms of overhead (to be scheduled and executed), but a small
 file can be processed by your single-threaded Python script in less than
 1ms.

 You could pack your json files into larger ones, or you could try to
 merge the small tasks into larger one by coalesce(N), such as:

 distData = sc.textFile(sys.argv[2]).coalesce(10)  # which will have 10
 partitions (tasks)

 Davies

 On Sat, Oct 18, 2014 at 12:07 PM,  jan.zi...@centrum.cz wrote:
  Hi,
 
  I have a program for single-computer (Python) execution and have also
  implemented the same for Spark. This program basically only reads .json,
  from which it takes one field and saves it back. Using Spark, my program runs
  approximately 100 times slower on 1 master and 1 slave. So I would like to
  ask where the problem might be.
 
  My Spark program looks like:
 
 
 
  sc = SparkContext(appName="Json data preprocessor")
 
  distData = sc.textFile(sys.argv[2])
 
  json_extractor = JsonExtractor(sys.argv[1])
 
  cleanedData = distData.flatMap(json_extractor.extract_json)
 
  cleanedData.saveAsTextFile(sys.argv[3])
 
  JsonExtractor only selects the data from the field that is given by
 sys.argv[1].
 
 
 
  My data is basically many small json files, with one json object per
 line.
 
  I have tried both, reading and writing the data from/to Amazon S3, local
  disc on all the machines.
 
  I would like to ask if there is something that I am missing or if Spark
 is
  supposed to be so slow in comparison with the local non parallelized
 single
  node program.
 
 
 
  Thank you in advance for any suggestions or hints.
 
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle issues in the current master

2014-10-22 Thread Aaron Davidson
You may be running into this issue:
https://issues.apache.org/jira/browse/SPARK-4019

You could check by having 2000 or fewer reduce partitions.
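
As a quick check, something along these lines (a sketch; the input and the
word-count-style job are made up, and the 2000 threshold comes from the JIRA;
assumes the spark-shell so the pair RDD implicits are in scope):

val pairs = sc.textFile("hdfs:///input").map(line => (line, 1))
val counts = pairs.reduceByKey(_ + _, 2000)  // keep the reduce side at or below 2000 partitions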

On Wed, Oct 22, 2014 at 1:48 PM, DB Tsai dbt...@dbtsai.com wrote:

 PS, sorry for spamming the mailing list. Based on my knowledge, both
 spark.shuffle.spill.compress and spark.shuffle.compress default to
 true, so in theory we should not run into this issue if we don't
 change any settings. Is there any other bug we might be running into?

 Thanks.

 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Wed, Oct 22, 2014 at 1:37 PM, DB Tsai dbt...@dbtsai.com wrote:
  Or can it be solved by setting both of the following settings to true
 for now?
 
  spark.shuffle.spill.compress true
  spark.shuffle.compress true
 
  Sincerely,
 
  DB Tsai
  ---
  My Blog: https://www.dbtsai.com
  LinkedIn: https://www.linkedin.com/in/dbtsai
 
 
  On Wed, Oct 22, 2014 at 1:34 PM, DB Tsai dbt...@dbtsai.com wrote:
  It seems that this issue should be addressed by
  https://github.com/apache/spark/pull/2890 ? Am I right?
 
  Sincerely,
 
  DB Tsai
  ---
  My Blog: https://www.dbtsai.com
  LinkedIn: https://www.linkedin.com/in/dbtsai
 
 
  On Wed, Oct 22, 2014 at 11:54 AM, DB Tsai dbt...@dbtsai.com wrote:
  Hi all,
 
  With SPARK-3948, the exception in Snappy PARSING_ERROR is gone, but
  I have another exception now. I have no clue what's going on; has
  anyone run into a similar issue? Thanks.
 
  This is the configuration I use.
  spark.rdd.compress true
  spark.shuffle.consolidateFiles true
  spark.shuffle.manager SORT
  spark.akka.frameSize 128
  spark.akka.timeout  600
  spark.core.connection.ack.wait.timeout  600
  spark.core.connection.auth.wait.timeout 300
 
 
 java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2325)
 
  
 java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
 
  java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
  java.io.ObjectInputStream.init(ObjectInputStream.java:299)
 
  
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.init(JavaSerializer.scala:57)
 
  
 org.apache.spark.serializer.JavaDeserializationStream.init(JavaSerializer.scala:57)
 
  
 org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:95)
 
  
 org.apache.spark.storage.BlockManager.getLocalShuffleFromDisk(BlockManager.scala:351)
 
  
 org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196)
 
  
 org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196)
 
  
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243)
 
  
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
  scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 
  org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
 
  
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 
  org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:89)
 
  
 org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
  org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 
  org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  org.apache.spark.scheduler.Task.run(Task.scala:56)
 
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
 
  
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 
  
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  java.lang.Thread.run(Thread.java:744)
 
 
  Sincerely,
 
  DB Tsai
  ---
  My Blog: https://www.dbtsai.com
  LinkedIn: https://www.linkedin.com/in/dbtsai

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: 

Re: Getting spark to use more than 4 cores on Amazon EC2

2014-10-22 Thread Aaron Davidson
Another wild guess: if your data is stored in S3, you might be running into
an issue where the default jets3t properties limit the number of parallel
S3 connections to 4. Consider increasing the max-thread-counts from here:
http://www.jets3t.org/toolkit/configuration.html.
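
For reference, this is usually done with a jets3t.properties file on the
classpath (e.g. in conf/). The exact property names are documented on the page
above; the two below are written from memory, so treat them as an assumption
and verify against that page:

s3service.max-thread-count=20
httpclient.max-connections=20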

On Tue, Oct 21, 2014 at 10:39 AM, Andy Davidson 
a...@santacruzintegration.com wrote:

 On a related note, how are you submitting your job?

 I have a simple streaming proof of concept and noticed that everything
 runs on my master. I wonder if I do not have enough load for spark to push
 tasks to the slaves.

 Thanks

 Andy

 From: Daniel Mahler dmah...@gmail.com
 Date: Monday, October 20, 2014 at 5:22 PM
 To: Nicholas Chammas nicholas.cham...@gmail.com
 Cc: user user@spark.apache.org
 Subject: Re: Getting spark to use more than 4 cores on Amazon EC2

 I am using globs though

 raw = sc.textFile("/path/to/dir/*/*")

 and I have tons of files so 1 file per partition should not be a problem.

 On Mon, Oct 20, 2014 at 7:14 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 The biggest danger with gzipped files is this:

  >>> raw = sc.textFile("/path/to/file.gz", 8)
  >>> raw.getNumPartitions()
  1

 You think you’re telling Spark to parallelize the reads on the input, but
 Spark cannot parallelize reads against gzipped files. So 1 gzipped file
 gets assigned to 1 partition.
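
 If you need parallelism downstream of such a file, one option (a sketch in
 Scala, path made up) is to pay for one explicit repartition after the
 single-partition read:

 val raw = sc.textFile("/path/to/file.gz")  // gzip is not splittable: 1 file => 1 partition
 val spread = raw.repartition(8)            // one shuffle, then 8 partitions for later stages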

 It might be a nice user hint if Spark warned when parallelism is disabled
 by the input format.

 Nick
 ​

 On Mon, Oct 20, 2014 at 6:53 PM, Daniel Mahler dmah...@gmail.com wrote:

 Hi Nicholas,

 Gzipping is an impressive guess! Yes, they are.
 My data sets are too large to make repartitioning viable, but I could
 try it on a subset.
 I generally have many more partitions than cores.
 This was happening before I started setting those configs.

 thanks
 Daniel


 On Mon, Oct 20, 2014 at 5:37 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Are you dealing with gzipped files by any chance? Does explicitly
 repartitioning your RDD to match the number of cores in your cluster help
 at all? How about if you don't specify the configs you listed and just go
 with defaults all around?

 On Mon, Oct 20, 2014 at 5:22 PM, Daniel Mahler dmah...@gmail.com
 wrote:

 I launch the cluster using vanilla spark-ec2 scripts.
 I just specify the number of slaves and instance type

 On Mon, Oct 20, 2014 at 4:07 PM, Daniel Mahler dmah...@gmail.com
 wrote:

 I usually run interactively from the spark-shell.
 My data definitely has more than enough partitions to keep all the
 workers busy.
 When I first launch the cluster I first do:

 +
 cat <<EOF >> ~/spark/conf/spark-defaults.conf
 spark.serializer                org.apache.spark.serializer.KryoSerializer
 spark.rdd.compress              true
 spark.shuffle.consolidateFiles  true
 spark.akka.frameSize            20
 EOF

 copy-dir /root/spark/conf
 spark/sbin/stop-all.sh
 sleep 5
 spark/sbin/start-all.sh
 +

 before starting the spark-shell or running any jobs.




 On Mon, Oct 20, 2014 at 2:57 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Perhaps your RDD is not partitioned enough to utilize all the cores
 in your system.

 Could you post a simple code snippet and explain what kind of
 parallelism you are seeing for it? And can you report on how many
 partitions your RDDs have?

 On Mon, Oct 20, 2014 at 3:53 PM, Daniel Mahler dmah...@gmail.com
 wrote:


 I am launching EC2 clusters using the spark-ec2 scripts.
 My understanding is that this configures spark to use the available
 resources.
  I can see that spark will use the available memory on larger
  instance types.
 However I have never seen spark running at more than 400% (using
 100% on 4 cores)
 on machines with many more cores.
 Am I misunderstanding the docs? Is it just that high end ec2
 instances get I/O starved when running spark? It would be strange if 
 that
 consistently produced a 400% hard limit though.

 thanks
 Daniel











Re: input split size

2014-10-18 Thread Aaron Davidson
The minPartitions argument of textFile/hadoopFile cannot decrease the
number of splits past the physical number of blocks/files. So if you have 3
HDFS blocks, asking for 2 minPartitions will still give you 3 partitions
(hence the min). It can, however, convert a file with fewer HDFS blocks
into more (so you could ask for and get 4 partitions), assuming the blocks
are splittable. HDFS blocks are usually splittable, but if the file is
compressed with a non-splittable codec such as gzip, it would not be.

If you wish to combine splits from a larger file, you can use RDD#coalesce.
With shuffle=false, this will simply concatenate partitions, but it does
not provide any ordering guarantees (it uses an algorithm which attempts to
coalesce co-located partitions, to maintain locality information).

coalesce() with shuffle=true causes all of the elements to be shuffled
around randomly into new partitions, which is an expensive operation but
guarantees uniformity of data distribution.
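
A small Scala sketch of the above, assuming a text file stored as 3 HDFS
blocks (the path is made up):

val rdd3 = sc.textFile("hdfs:///data/file.txt", 2)  // still 3 partitions: can't go below the block count
val rdd4 = sc.textFile("hdfs:///data/file.txt", 4)  // 4 partitions, since the blocks are splittable
val merged = rdd4.coalesce(2)                       // concatenates co-located partitions, no shuffle, no order guarantee
val balanced = rdd4.coalesce(2, shuffle = true)     // full shuffle, uniform distribution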

On Sat, Oct 18, 2014 at 10:47 AM, Mayur Rustagi mayur.rust...@gmail.com
wrote:

 Does it retain the order if it's pulling from the HDFS blocks? Meaning,
 if file1 = a, b, c partitions in order,
 and I convert to a 2-partition read, will it map to ab, c or a, bc, or can
 it also be a, cb?


 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi


 On Sat, Oct 18, 2014 at 9:09 AM, Ilya Ganelin ilgan...@gmail.com wrote:

 Also - if you're doing a text file read you can pass the number of
 resulting partitions as the second argument.
 On Oct 17, 2014 9:05 PM, Larry Liu larryli...@gmail.com wrote:

 Thanks, Andrew. What about reading out of local?

 On Fri, Oct 17, 2014 at 5:38 PM, Andrew Ash and...@andrewash.com
 wrote:

 When reading out of HDFS it's the HDFS block size.

 On Fri, Oct 17, 2014 at 5:27 PM, Larry Liu larryli...@gmail.com
 wrote:

 What is the default input split size? How to change it?







Re: How to change the values in Array of Bytes

2014-09-06 Thread Aaron Davidson
More of a Scala question than Spark, but apply here can be written with
just parentheses like this:

val array = Array.fill[Byte](10)(0)
if (array(index) == 0) {
  array(index) = 1
}

The second instance of array(index) = 1 is actually not calling apply,
but update. It's a scala-ism that's usually invisible. The above is
equivalent to:

if (array.apply(index) == 0) {
  array.update(index, 1)
}



On Sat, Sep 6, 2014 at 2:09 AM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 Hi,
 I have an array of bytes and I have filled the array with 0 in all the
 positions.


 var Array = Array.fill[Byte](10)(0)
 Now, if certain conditions are satisfied, I want to change some elements
 of the array to 1 instead of 0. If I run,



 if (Array.apply(index) == 0) Array.apply(index) = 1
 it returns an error.

 But if I assign Array.apply(index) to a variable and do the same thing
 then it works. I do not want to assign this to a variable, because if I do
 that, I would be creating a lot of variables.

 Can anyone tell me a method to do this?

 Thank You





Re: error: type mismatch while Union

2014-09-06 Thread Aaron Davidson
Are you doing this from the spark-shell? You're probably running into
https://issues.apache.org/jira/browse/SPARK-1199 which should be fixed in
1.1.


On Sat, Sep 6, 2014 at 3:03 AM, Dhimant dhimant84.jays...@gmail.com wrote:

 I am using Spark version 1.0.2




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/error-type-mismatch-while-Union-tp13547p13618.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: question on replicate() in blockManager.scala

2014-09-06 Thread Aaron Davidson
Looks like that's BlockManagerWorker.syncPutBlock(), which is in an if
check, perhaps obscuring its existence.


On Fri, Sep 5, 2014 at 2:19 AM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 Hi,

 var cachedPeers: Seq[BlockManagerId] = null
 private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
   val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
   if (cachedPeers == null) {
     cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
   }
   for (peer: BlockManagerId <- cachedPeers) {
     val start = System.nanoTime
     data.rewind()
     logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is "
       + data.limit() + " Bytes. To node: " + peer)
     if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel),
         new ConnectionManagerId(peer.host, peer.port))) {
       logError("Failed to call syncPutBlock to " + peer)
     }
     logDebug("Replicated BlockId " + blockId + " once used " +
       (System.nanoTime - start) / 1e6 + " s; The size of the data is " +
       data.limit() + " bytes.")
   }
 }


 I get the flow of this code. But I don't find any method being called for
 actually writing the data into the set of peers chosen for replication.

 Where exactly is the replication happening?

 Thank you!!
 -Karthik



Re: Getting the type of an RDD in spark AND pyspark

2014-09-06 Thread Aaron Davidson
Pretty easy to do in Scala:

rdd.elementClassTag.runtimeClass

You can access this method from Python as well by using the internal _jrdd.
It would look something like this (warning, I have not tested it):
rdd._jrdd.classTag().runtimeClass()

(The method name is classTag for JavaRDDLike, and elementClassTag for
Scala's RDD.)


On Thu, Sep 4, 2014 at 1:32 PM, esamanas evan.sama...@gmail.com wrote:

 Hi,

 I'm new to spark and scala, so apologies if this is obvious.

 Every RDD appears to be typed, which I can see from the output in the
 spark-shell when I execute 'take':

 scala val t = sc.parallelize(Array(1,2,3))
 t: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize
 at console:12

 scala t.take(3)
 res4: Array[Int] = Array(1, 2, 3)


 scala val u = sc.parallelize(Array(1,Array(2,2,2,2,2),3))
 u: org.apache.spark.rdd.RDD[Any] = ParallelCollectionRDD[3] at parallelize
 at console:12

 scala u.take(3)
 res5: Array[Any] = Array(1, Array(2, 2, 2, 2, 2), 3)

 The Array type stays the same even if only one type is returned.
 scala u.take(1)
 res6: Array[Any] = Array(1)


 Is there some way to just get the name of the type of the entire RDD from
 some function call?  Also, I would really like this same functionality in
 pyspark, so I'm wondering if that exists on that side, since clearly the
 underlying RDD is typed (I'd be fine with either the Scala or Python type
 name).

 Thank you,

 Evan




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-type-of-an-RDD-in-spark-AND-pyspark-tp13498.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Stage failure in BlockManager due to FileNotFoundException on long-running streaming job

2014-08-20 Thread Aaron Davidson
This is likely due to a bug in shuffle file consolidation (which you have
enabled) which was hopefully fixed in 1.1 with this patch:
https://github.com/apache/spark/commit/78f2af582286b81e6dc9fa9d455ed2b369d933bd

Until 1.0.3 or 1.1 are released, the simplest solution is to disable
spark.shuffle.consolidateFiles. (You could also try backporting the fixes
yourself if you really need consolidated files.)
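
In spark-defaults.conf (or via SparkConf) that is just:

spark.shuffle.consolidateFiles  false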


On Wed, Aug 20, 2014 at 9:28 AM, Silvio Fiorito 
silvio.fior...@granturing.com wrote:

  This is a long running Spark Streaming job running in YARN, Spark v1.0.2
 on CDH5. The jobs will run for about 34-37 hours then die due to this
 FileNotFoundException. There’s very little CPU or RAM usage, I’m running 2
 x cores, 2 x executors, 4g memory, YARN cluster mode.


  Here’s the stack trace that I pulled from the History server:

  Job aborted due to stage failure: Task 34331.0:1 failed 4 times, most
 recent failure: Exception failure in TID 902905 on host host05:
 java.io.FileNotFoundException:
 /data02/yarn/nm/usercache/sfiorito/appcache/application_1402494159106_0524/spark-local-20140818181035-079a/29/merged_shuffle_9809_1_0
 (No such file or directory) java.io.RandomAccessFile.open(Native Method)
 java.io.RandomAccessFile.init(RandomAccessFile.java:241)
 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:98)
 org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
 org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
 org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
 org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:203)
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.getLocalBlocks(BlockFetcherIterator.scala:203)
 org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.initialize(BlockFetcherIterator.scala:234)
 org.apache.spark.storage.BlockManager.getMultiple(BlockManager.scala:537)
 org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:76)
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:133)
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:123)
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:123)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 org.apache.spark.scheduler.Task.run(Task.scala:51)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:744) Driver stacktrace:




Re: s3:// sequence file startup time

2014-08-17 Thread Aaron Davidson
The driver must initially compute the partitions and their preferred
locations for each part of the file, which results in a serial
getFileBlockLocations() on each part. However, I would expect this to take
several seconds, not minutes, to perform on 1000 parts. Is your driver
inside or outside of AWS? There is an order of magnitude difference in the
latency of S3 requests if you're running outside of AWS.

We have also experienced an excessive slowdown in the metadata lookups
using Hadoop 2 versus Hadoop 1, likely due to the differing jets3t library
versions. If you're using Hadoop 2, you might try downgrading to Hadoop
1.2.1 and seeing if the startup time decreases.


On Sat, Aug 16, 2014 at 6:46 PM, kmatzen kmat...@gmail.com wrote:

 I have some RDD's stored as s3://-backed sequence files sharded into 1000
 parts.  The startup time is pretty long (~10's of minutes).  It's
 communicating with S3, but I don't know what it's doing.  Is it just
 fetching the metadata from S3 for each part?  Is there a way to pipeline
 this with the computation?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/s3-sequence-file-startup-time-tp12242.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark: why need a masterLock when sending heartbeat to master

2014-08-17 Thread Aaron Davidson
Yes, good point, I believe the masterLock is now unnecessary altogether.
The reason for its initial existence was that changeMaster() originally
could be called out-of-band of the actor, and so we needed to make sure the
master reference did not change out from under us. Now it appears that all
master mutation and accesses are within the actor (we only call
tryRegisterAllMasters() from a different thread, which does not use the
master field).

If you like, feel free to submit a PR to remove the masterLock.


On Sun, Aug 17, 2014 at 8:58 AM, Victor Sheng victorsheng...@gmail.com
wrote:

 I don't understand why worker need a master lock when sending heartbeat.

 Caused by master HA ? Who can explain this in detail? Thanks~

 Please refer:

 http://stackoverflow.com/questions/25173219/why-does-the-spark-worker-actor-use-a-masterlock

  case SendHeartbeat =
   masterLock.synchronized {
 if (connected) { master ! Heartbeat(workerId) }
   }



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-why-need-a-masterLock-when-sending-heartbeat-to-master-tp12256.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Iterator over RDD in PySpark

2014-08-01 Thread Aaron Davidson
rdd.toLocalIterator will do almost what you want, but requires that each
individual partition fits in memory (rather than each individual line).
Hopefully that's sufficient, though.
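
In Scala, the pattern looks roughly like this (a sketch; assumes rdd is an
RDD[String] and the output path is made up):

import java.io.{BufferedOutputStream, FileOutputStream, PrintWriter}
import java.util.zip.GZIPOutputStream

// pulls one partition at a time to the driver and appends each line to a local .gz file
val out = new PrintWriter(new BufferedOutputStream(
  new GZIPOutputStream(new FileOutputStream("/tmp/archive.gz"))))
try {
  rdd.toLocalIterator.foreach(line => out.println(line))
} finally {
  out.close()
}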


On Fri, Aug 1, 2014 at 1:38 AM, Andrei faithlessfri...@gmail.com wrote:

 Is there a way to get iterator from RDD? Something like rdd.collect(), but
 returning lazy sequence and not single array.

 Context: I need to GZip processed data to upload it to Amazon S3. Since
 archive should be a single file, I want to iterate over RDD, writing each
 line to a local .gz file. File is small enough to fit local disk, but still
 large enough not to fit into memory.



Re: Iterator over RDD in PySpark

2014-08-01 Thread Aaron Davidson
Ah, that's unfortunate, that definitely should be added. Using a
pyspark-internal method, you could try something like

javaIterator = rdd._jrdd.toLocalIterator()
it = rdd._collect_iterator_through_file(javaIterator)


On Fri, Aug 1, 2014 at 3:04 PM, Andrei faithlessfri...@gmail.com wrote:

 Thanks, Aaron, it should be fine with partitions (I can repartition it
 anyway, right?).
 But rdd.toLocalIterator is purely Java/Scala method. Is there Python
 interface to it?
 I can get Java iterator though rdd._jrdd, but it isn't converted to Python
 iterator automatically. E.g.:

    >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
    >>> it = rdd._jrdd.toLocalIterator()
    >>> next(it)
    14/08/02 01:02:32 INFO SparkContext: Starting job: apply at Iterator.scala:371
    ...
    14/08/02 01:02:32 INFO SparkContext: Job finished: apply at Iterator.scala:371, took 0.02064317 s
    bytearray(b'\x80\x02K\x01.')

 I understand that the returned byte array somehow corresponds to the actual
 data, but how can I get at it?



 On Fri, Aug 1, 2014 at 8:49 PM, Aaron Davidson ilike...@gmail.com wrote:

 rdd.toLocalIterator will do almost what you want, but requires that each
 individual partition fits in memory (rather than each individual line).
 Hopefully that's sufficient, though.


 On Fri, Aug 1, 2014 at 1:38 AM, Andrei faithlessfri...@gmail.com wrote:

 Is there a way to get iterator from RDD? Something like rdd.collect(),
 but returning lazy sequence and not single array.

 Context: I need to GZip processed data to upload it to Amazon S3. Since
 archive should be a single file, I want to iterate over RDD, writing each
 line to a local .gz file. File is small enough to fit local disk, but still
 large enough not to fit into memory.






Re: Spilling in-memory... messages in log even with MEMORY_ONLY

2014-07-27 Thread Aaron Davidson
I see. There should not be a significant algorithmic difference between
those two cases, as far as I can think, but there is a good bit of
local-mode-only logic in Spark.

One typical problem we see on large-heap, many-core JVMs, though, is much
more time spent in garbage collection. I'm not sure how oprofile gathers
its statistics, but it's possible the stop-the-world pauses just appear as
pausing inside regular methods. You could see if this is happening by
adding -XX:+PrintGCDetails to spark.executor.extraJavaOptions (in
spark-defaults.conf) and --driver-java-options (as a command-line
argument), and then examining the stdout logs.
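
Concretely, that would look something like this (a sketch):

# spark-defaults.conf
spark.executor.extraJavaOptions  -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

# and on the command line, e.g.
./bin/spark-shell --driver-java-options "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps"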


On Sun, Jul 27, 2014 at 10:29 AM, lokesh.gidra lokesh.gi...@gmail.com
wrote:

 I am comparing the total time spent in finishing the job. And what I am
 comparing, to be precise, is on a 48-core machine: the
 performance of local[48] vs. standalone mode with 8 nodes of 6 cores each
 (totalling 48 cores) on localhost. In this comparison, the standalone mode
 outperforms local[48] substantially. When I did some troubleshooting using
 oprofile, I found that local[48] was spending much more time in
 writeObject0 as compared to standalone mode.

 I am running the PageRank example provided in the package.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723p10743.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Configuring Spark Memory

2014-07-24 Thread Aaron Davidson
Whoops, I was mistaken in my original post last year. By default, there is
one executor per node per Spark Context, as you said.
spark.executor.memory is the amount of memory that the application
requests for each of its executors. SPARK_WORKER_MEMORY is the amount of
memory a Spark Worker is willing to allocate in executors.

So if you were to set SPARK_WORKER_MEMORY to 8g everywhere on your cluster,
and spark.executor.memory to 4g, you would be able to run 2 simultaneous
Spark Contexts who get 4g per node. Similarly, if spark.executor.memory
were 8g, you could only run 1 Spark Context at a time on the cluster, but
it would get all the cluster's memory.
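
Concretely (a sketch): in conf/spark-env.sh on each worker,

export SPARK_WORKER_MEMORY=8g

and in each application's spark-defaults.conf (or SparkConf),

spark.executor.memory  4g

With those settings, two applications can run at the same time, each getting a
4g executor on every node.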


On Thu, Jul 24, 2014 at 7:25 AM, Martin Goodson mar...@skimlinks.com
wrote:

 Thank you Nishkam,
 I have read your code. So, for the sake of my understanding, it seems that
 for each spark context there is one executor per node? Can anyone confirm
 this?


 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240
 [image: Inline image 1]


 On Thu, Jul 24, 2014 at 6:12 AM, Nishkam Ravi nr...@cloudera.com wrote:

 See if this helps:

 https://github.com/nishkamravi2/SparkAutoConfig/

 It's a very simple tool for auto-configuring default parameters in Spark.
 Takes as input high-level parameters (like number of nodes, cores per node,
 memory per node, etc) and spits out default configuration, user advice and
 command line. Compile (javac SparkConfigure.java) and run (java
 SparkConfigure).

 Also cc'ing dev in case others are interested in helping evolve this over
 time (by refining the heuristics and adding more parameters).


  On Wed, Jul 23, 2014 at 8:31 AM, Martin Goodson mar...@skimlinks.com
 wrote:

 Thanks Andrew,

 So if there is only one SparkContext there is only one executor per
 machine? This seems to contradict Aaron's message from the link above:

 If each machine has 16 GB of RAM and 4 cores, for example, you might
 set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by
 Spark.)

 Am I reading this incorrectly?

 Anyway our configuration is 21 machines (one master and 20 slaves) each
 with 60Gb. We would like to use 4 cores per machine. This is pyspark so we
 want to leave say 16Gb on each machine for python processes.

 Thanks again for the advice!



 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240
 [image: Inline image 1]


 On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash and...@andrewash.com
 wrote:

 Hi Martin,

 In standalone mode, each SparkContext you initialize gets its own set
 of executors across the cluster.  So for example if you have two shells
 open, they'll each get two JVMs on each worker machine in the cluster.

 As far as the other docs, you can configure the total number of cores
 requested for the SparkContext, the amount of memory for the executor JVM
 on each machine, the amount of memory for the Master/Worker daemons (little
 needed since work is done in executors), and several other settings.

 Which of those are you interested in?  What spec hardware do you have
 and how do you want to configure it?

 Andrew


 On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson mar...@skimlinks.com
 wrote:

 We are having difficulties configuring Spark, partly because we still
 don't understand some key concepts. For instance, how many executors are
 there per machine in standalone mode? This is after having closely
 read the documentation several times:

 http://spark.apache.org/docs/latest/configuration.html
 http://spark.apache.org/docs/latest/spark-standalone.html
 http://spark.apache.org/docs/latest/tuning.html
 http://spark.apache.org/docs/latest/cluster-overview.html

 The cluster overview has some information here about executors but is
 ambiguous about whether there are single executors or multiple executors 
 on
 each machine.

  This message from Aaron Davidson implies that the executor memory
 should be set to total available memory on the machine divided by the
 number of cores:
 http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E

 But other messages imply that the executor memory should be set to the
 *total* available memory of each machine.

 We would very much appreciate some clarity on this and the myriad of
 other memory settings available (daemon memory, worker memory etc). 
 Perhaps
 a worked example could be added to the docs? I would be happy to provide
 some text as soon as someone can enlighten me on the technicalities!

 Thank you

 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240
 [image: Inline image 1]








Re: What if there are large, read-only variables shared by all map functions?

2014-07-23 Thread Aaron Davidson
In particular, take a look at the TorrentBroadcast, which should be much
more efficient than HttpBroadcast (which was the default in 1.0) for large
files.

If you find that TorrentBroadcast doesn't work for you, then another way to
solve this problem is to place the data on all nodes' local disks, and
amortize the cost of the data loading by using RDD#mapPartitions instead of
#map, which allows you to do the loading once for a large set of elements.

You could refine this model further by keeping some sort of (perhaps
static) state on your Executors, like

object LookupTable {
  // loadTable() is a placeholder for your own loading code (e.g. read from local disk)
  private lazy val table: LookupTable = loadTable()  // initialized once per executor JVM
  def getOrLoadTable(): LookupTable = table
}

and then using this method in your map partitions method. This would ensure
the table is only loaded once on each Executor, and could also be used to
ensure the data remains between jobs. You should be careful, though, at
using so much memory outside of Spark's knowledge -- you may need to tune
the Spark memory options if you run into OutOfMemoryErrors.
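
Used from mapPartitions, that would look roughly like this (a sketch; 'records'
and 'table.lookup' are illustrative names, not real APIs):

val results = records.mapPartitions { iter =>
  val table = LookupTable.getOrLoadTable()  // loaded at most once per executor JVM
  iter.map(record => table.lookup(record))
}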


On Wed, Jul 23, 2014 at 8:39 PM, Mayur Rustagi mayur.rust...@gmail.com
wrote:

 Have a look at broadcast variables .


 On Tuesday, July 22, 2014, Parthus peng.wei@gmail.com wrote:

 Hi there,

 I was wondering if anybody could help me find an efficient way to make a
 MapReduce program like this:

 1) Each map function needs to access some huge files, which are around
 6 GB

 2) These files are READ-ONLY. Actually they are like some huge look-up
 table, which will not change during 2~3 years.

 I tried two ways to make the program work, but neither of them is
 efficient:

 1) The first approach I tried is to let each map function load those files
 independently, like this:

 map (...) { load(files); DoMapTask(...)}

 2) The second approach I tried is to load the files before RDD.map(...)
 and
 broadcast the files. However, because the files are too large, the
 broadcasting overhead is 30min ~ 1 hour.

 Could anybody help me find an efficient way to solve it?

 Thanks very much.










 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/What-if-there-are-large-read-only-variables-shared-by-all-map-functions-tp10435.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



 --
 Sent from Gmail Mobile



Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed

2014-07-21 Thread Aaron Davidson
What's the exception you're seeing? Is it an OOM?


On Mon, Jul 21, 2014 at 11:20 AM, chutium teng@gmail.com wrote:

 Hi,

 unfortunately it is not so straightforward

 xxx_parquet.db

 is the folder of a managed database created by hive/impala, so every
 sub-element in it is a table in hive/impala; they are folders in HDFS, each
 table has a different schema, and in each folder there are one or more
 parquet files.

 that means

 xx001_suffix
 xx002_suffix

 are folders, there are some parquet files like

 xx001_suffix/parquet_file1_with_schema1

 xx002_suffix/parquet_file1_with_schema2
 xx002_suffix/parquet_file2_with_schema2

 it seems only union can do this job~

 Nonetheless, thank you very much; maybe the only reason is that Spark is
 eating up too much memory...



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SQL-on-160-G-parquet-file-snappy-compressed-made-by-cloudera-impala-23-core-and-60G-mem-d-tp10254p10335.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: which kind of BlockId should I use?

2014-07-20 Thread Aaron Davidson
Hm, this is not a public API, but you should theoretically be able to use
TestBlockId if you like. Internally, we just use the BlockId's natural
hashing and equality to do lookups and puts, so it should work fine.
However, since it is in no way public API, it may change even in
maintenance releases.
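
In other words, something like this (a sketch based on your snippet, so buf and
the storage level come from your existing code; again, TestBlockId is internal
and may change):

import org.apache.spark.storage.TestBlockId

val modelBlockId = TestBlockId("model")
SparkEnv.get.blockManager.put(modelBlockId, buf, StorageLevel.MEMORY_ONLY, false)
val cached = SparkEnv.get.blockManager.getLocal(modelBlockId)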


On Sun, Jul 20, 2014 at 10:25 PM, william k...@qq.com wrote:

 When spark is 0.7.3, I use SparkEnv.get.blockManager.getLocal(model) and
 SparkEnv.get.blockManager.put(model, buf, StorageLevel.MEMORY_ONLY,
 false) to cached model object

 When I porting to spark 1.0.1, I found  SparkEnv.get.blockManager.getLocal
  ‍SparkEnv.get.blockManager.put's APIs changed to use BlockId instead of
 String.

 And BlockId has 5 concrete class, which one should I use? None of them
 have a string input parameter.‍‍

 Thx



Re: Large Task Size?

2014-07-15 Thread Aaron Davidson
Ah, I didn't realize this was non-MLLib code. Do you mean to be
sending stochasticLossHistory
in the closure as well?


On Sun, Jul 13, 2014 at 1:05 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

 It uses the standard SquaredL2Updater, and I also tried to broadcast it as
 well.

 The input is an RDD created by taking the union of several inputs that
 have all been run against MLUtils.kFold to produce even more RDDs; I might
 run with 10 different inputs, each with 10 folds. I'm pretty certain that
 all of the input RDDs have clean closures. But I'm curious: is there a high
 overhead for running union? Could that create larger task sizes?

 Kyle



 On Sat, Jul 12, 2014 at 7:50 PM, Aaron Davidson ilike...@gmail.com
 wrote:

 I also did a quick glance through the code and couldn't find anything
 worrying that should be included in the task closures. The only possibly
 unsanitary part is the Updater you pass in -- what is your Updater and is
 it possible it's dragging in a significant amount of extra state?


 On Sat, Jul 12, 2014 at 7:27 PM, Kyle Ellrott kellr...@soe.ucsc.edu
 wrote:

 I'm working on a patch to MLlib that allows for multiplexing several
 different model optimizations using the same RDD ( SPARK-2372:
 https://issues.apache.org/jira/browse/SPARK-2372 )

 In testing larger datasets, I've started to see some memory errors (
 java.lang.OutOfMemoryError and exceeds max allowed: spark.akka.frameSize
 errors ).
 My main clue is that Spark will start logging warnings on smaller systems,
 like:

 14/07/12 19:14:46 WARN scheduler.TaskSetManager: Stage 2862 contains a
 task of very large size (10119 KB). The maximum recommended task size is
 100 KB.

 Looking up stage '2862' in the code leads to a 'sample at
 GroupedGradientDescent.scala:156' call. That code can be seen at

 https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala#L156

 I've looked over the code, I'm broadcasting the larger variables, and
 between the sampler and the combineByKey I wouldn't think there's much data
 being moved over the network, much less a 10MB chunk.

 Any ideas of what this might be a symptom of?

 Kyle






Re: Confused by groupByKey() and the default partitioner

2014-07-13 Thread Aaron Davidson
Ah -- I should have been more clear: list concatenation isn't going to be
any faster. In many cases I've seen people use groupByKey() when they are
really trying to do some sort of aggregation, and thus constructing this
concatenated list is more expensive than they need.
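
For example, a word-count-style aggregation (a sketch; the input path is made
up, and the spark-shell is assumed so the pair RDD implicits are in scope):

val pairs = sc.textFile("hdfs:///words").flatMap(_.split(" ")).map(w => (w, 1))
// groupByKey materializes the full list of values per key before summing
val viaGroup  = pairs.groupByKey().mapValues(_.sum)
// reduceByKey combines map-side and never builds the per-key lists
val viaReduce = pairs.reduceByKey(_ + _)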


On Sun, Jul 13, 2014 at 9:13 AM, Guanhua Yan gh...@lanl.gov wrote:

 Thanks, Aaron. I replaced groupByKey with reduceByKey along with some list
 concatenation operations, and found that the performance became even
 worse. So groupByKey is not that bad in my code.

 Best regards,
 - Guanhua



 From: Aaron Davidson ilike...@gmail.com
 Reply-To: user@spark.apache.org
 Date: Sat, 12 Jul 2014 16:32:22 -0700
 To: user@spark.apache.org
 Subject: Re: Confused by groupByKey() and the default partitioner

 Yes, groupByKey() does partition by the hash of the key unless you specify
 a custom Partitioner.

 (1) If you were to use groupByKey() when the data was already partitioned
 correctly, the data would indeed not be shuffled. Here is the associated
 code, you'll see that it simply checks that the Partitioner the groupBy()
 is looking for is equal to the Partitioner of the pre-existing RDD:
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L89

 By the way, I should warn you that groupByKey() is not a recommended
 operation if you can avoid it, as it has non-obvious performance issues
 when running with serious data.


 On Sat, Jul 12, 2014 at 12:20 PM, Guanhua Yan gh...@lanl.gov wrote:

 Hi:

 I have trouble understanding the default partitioner (hash) in Spark.
 Suppose that an RDD with two partitions is created as follows:

 x = sc.parallelize([("a", 1), ("b", 4), ("a", 10), ("c", 7)], 2)

 Does spark partition x based on the hash of the key (e.g., a, b, c) by 
 default?

 (1) Assuming this is correct, if I further use the groupByKey primitive, 
 x.groupByKey(), all the records sharing the same key should be located in 
 the same partition. Then it's not necessary to shuffle the data records 
 around, as all the grouping operations can be done locally.

 (2) If it's not true, how could I specify a partitioner simply based on the 
 hashing of the key (in Python)?

 Thank you,

 - Guanhua





Re: Confused by groupByKey() and the default partitioner

2014-07-12 Thread Aaron Davidson
Yes, groupByKey() does partition by the hash of the key unless you specify
a custom Partitioner.

(1) If you were to use groupByKey() when the data was already partitioned
correctly, the data would indeed not be shuffled. Here is the associated
code, you'll see that it simply checks that the Partitioner the groupBy()
is looking for is equal to the Partitioner of the pre-existing RDD:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L89

By the way, I should warn you that groupByKey() is not a recommended
operation if you can avoid it, as it has non-obvious performance issues
when running with serious data.
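
For (2), you can partition explicitly and then group without a further shuffle.
A Scala sketch (in PySpark, partitionBy(numPartitions) plays the same role,
hashing the key by default):

import org.apache.spark.SparkContext._
import org.apache.spark.HashPartitioner

val x = sc.parallelize(Seq(("a", 1), ("b", 4), ("a", 10), ("c", 7)))
val hashed  = x.partitionBy(new HashPartitioner(2)).cache()  // hash-partitioned by key
val grouped = hashed.groupByKey()  // reuses hashed's partitioner, so no extra shuffle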


On Sat, Jul 12, 2014 at 12:20 PM, Guanhua Yan gh...@lanl.gov wrote:

 Hi:

 I have trouble understanding the default partitioner (hash) in Spark.
 Suppose that an RDD with two partitions is created as follows:

 x = sc.parallelize([("a", 1), ("b", 4), ("a", 10), ("c", 7)], 2)

 Does spark partition x based on the hash of the key (e.g., a, b, c) by 
 default?

 (1) Assuming this is correct, if I further use the groupByKey primitive, 
 x.groupByKey(), all the records sharing the same key should be located in the 
 same partition. Then it's not necessary to shuffle the data records around, 
 as all the grouping operations can be done locally.

 (2) If it's not true, how could I specify a partitioner simply based on the 
 hashing of the key (in Python)?

 Thank you,

 - Guanhua




Re: KMeans for large training data

2014-07-12 Thread Aaron Davidson
The netlib.BLAS: Failed to load implementation warning only means that
the BLAS implementation may be slower than using a native one. The reason
why it only shows up at the end is that the library is only used for the
finalization step of the KMeans algorithm, so your job should've been
wrapping up at this point. I am not familiar with the algorithm beyond
that, so I'm not sure if for some reason we're trying to collect too much
data back to the driver here.

SPARK_DRIVER_MEMORY can increase the driver memory, by the way (or by using
the --driver-memory option when using spark-submit).


On Sat, Jul 12, 2014 at 2:38 AM, durin m...@simon-schaefer.net wrote:

 Your latest response doesn't show up here yet, I only got the mail. I'll
 still answer here in the hope that it appears later:

 Which memory setting do you mean? I can go up with spark.executor.memory a
 bit; it's currently set to 12G. But that's already way more than the whole
 SchemaRDD of Vectors that I currently use for training, which shouldn't be
 more than a few hundred MB.
 I suppose you rather mean something comparable to SHARK_MASTER_MEM in
 Shark.
 I can't find the equivalent for Spark in the documentation, though.

 And if it helps, I can summarize the whole code that I currently
 use. It's nothing really fancy at the moment; I'm just trying to classify
 Strings that each contain a few words (each word is handled as an atomic
 item).



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-for-large-training-data-tp9407p9509.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Convert from RDD[Object] to RDD[Array[Object]]

2014-07-12 Thread Aaron Davidson
If you don't really care about the batchedDegree, but rather just want to
do operations over some set of elements rather than one at a time, then
just use mapPartitions().

Otherwise, if you really do want certain-sized batches and you are able to
relax the constraints slightly, one option is to construct these batches within
each partition. For instance:

val batchedRDD = rdd.mapPartitions { iter: Iterator[Int] =>
  new Iterator[Array[Int]] {
def hasNext: Boolean = iter.hasNext
def next(): Array[Int] = {
  iter.take(batchedDegree).toArray
}
  }
}

This function is efficient in that it does not load the entire partition
into memory, just enough to construct each batch. However, there will be
one smaller batch at the end of each partition (rather than just one over
the entire dataset).
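
Scala's built-in Iterator#grouped does essentially the same thing, if a Seq per
batch is acceptable before converting to Array -- a sketch, assuming
batchedDegree is in scope:

val batchedRDD = rdd.mapPartitions(_.grouped(batchedDegree).map(_.toArray))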



On Sat, Jul 12, 2014 at 6:03 PM, Parthus peng.wei@gmail.com wrote:

 Hi there,

 I have a bunch of data in an RDD, which I previously processed one element
 at a time.
 For example, there was an RDD denoted by data: RDD[Object], and I
 processed it using data.map(...). However, I got a new requirement to
 process the data in a batched way. That means I need to convert the RDD
 from RDD[Object] to RDD[Array[Object]] and then process it, which is to
 fill out this function: def convert2array(inputs: RDD[Object], batchedDegree:
 Int): RDD[Array[Object]] = {...}.

 I hope that after the conversion, each element of the new RDD is an array
 of the previous RDD's elements. The parameter batchedDegree specifies how
 many elements are batched together. For example, if I have 210 elements in
 the previous RDD, the result of the conversion function should be an RDD
 with 3 elements. Each element is an array; the first two arrays contain
 elements 1~100 and 101~200, and the third array contains elements 201~210.

 I was wondering if anybody could help me complete this Scala function in
 an efficient way. Thanks a lot.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Convert-from-RDD-Object-to-RDD-Array-Object-tp9530.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: pyspark sc.parallelize running OOM with smallish data

2014-07-12 Thread Aaron Davidson
I think this is probably dying on the driver itself, as you are probably
materializing the whole dataset inside your python driver. How large is
spark_data_array compared to your driver memory?


On Fri, Jul 11, 2014 at 7:30 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 I put the same dataset into scala (using spark-shell) and it acts weird. I
 cannot do a count on it; the executors seem to hang. The WebUI shows 0/96
 in the status bar and shows details about the worker nodes, but there is no
 progress.
 sc.parallelize does finish (takes too long for the data size) in scala.


 On Fri, Jul 11, 2014 at 2:00 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 spark_data_array here has about 35k rows with 4k columns. I have 4 nodes
 in the cluster and gave 48g to the executors. I also tried Kryo serialization.

 traceback (most recent call last):

   File /mohit/./m.py, line 58, in module

 spark_data = sc.parallelize(spark_data_array)

   File /mohit/spark/python/pyspark/context.py, line 265, in parallelize

 jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)

   File /mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
 line 537, in __call__

   File /mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py,
 line 300, in get_return_value

 py4j.protocol.Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.

 : java.lang.OutOfMemoryError: Java heap space

 at
 org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)

 at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:606)

 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)

 at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)

 at py4j.Gateway.invoke(Gateway.java:259)

 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

 at py4j.commands.CallCommand.execute(CallCommand.java:79)

 at py4j.GatewayConnection.run(GatewayConnection.java:207)

 at java.lang.Thread.run(Thread.java:745)





Re: Putting block rdd failed when running example svm on large data

2014-07-12 Thread Aaron Davidson
Also check the web ui for that. Each iteration will have one or more stages
associated with it in the driver web ui.


On Sat, Jul 12, 2014 at 6:47 PM, crater cq...@ucmerced.edu wrote:

 Hi Xiangrui,

 Thanks for the information. Also, is it possible to figure out the
 execution time per iteration for SVM?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Putting-block-rdd-failed-when-running-example-svm-on-large-data-tp9515p9535.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Large Task Size?

2014-07-12 Thread Aaron Davidson
I also did a quick glance through the code and couldn't find anything
worrying that should be included in the task closures. The only possibly
unsanitary part is the Updater you pass in -- what is your Updater and is
it possible it's dragging in a significant amount of extra state?


On Sat, Jul 12, 2014 at 7:27 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

 I'm working on a patch to MLlib that allows for multiplexing several
 different model optimizations using the same RDD ( SPARK-2372:
 https://issues.apache.org/jira/browse/SPARK-2372 )

 In testing larger datasets, I've started to see some memory errors (
 java.lang.OutOfMemoryError and exceeds max allowed: spark.akka.frameSize
 errors ).
 My main clue is that Spark will start logging warnings on smaller systems,
 like:

 14/07/12 19:14:46 WARN scheduler.TaskSetManager: Stage 2862 contains a
 task of very large size (10119 KB). The maximum recommended task size is
 100 KB.

 Looking up stage '2862' in the code leads to a 'sample at
 GroupedGradientDescent.scala:156' call. That code can be seen at

 https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala#L156

 I've looked over the code, I'm broadcasting the larger variables, and
 between the sampler and the combineByKey I wouldn't think there's much data
 being moved over the network, much less a 10MB chunk.

 Any ideas of what this might be a symptom of?

 Kyle




Re: java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded)

2014-07-08 Thread Aaron Davidson
There is a difference between actual GC overhead, which can be reduced by
reusing objects, and this error, which actually means you ran out of
memory. This error can probably be relieved by increasing your executor
heap size, unless your data is corrupt and it is allocating huge arrays, or
you are otherwise keeping too much memory around.

For your other question, you can reuse objects similar to MapReduce
(HadoopRDD does this by actually using Hadoop's Writables, for instance),
but the general Spark APIs don't support this because mutable objects are
not friendly to caching or serializing.


On Tue, Jul 8, 2014 at 9:27 AM, Konstantin Kudryavtsev 
kudryavtsev.konstan...@gmail.com wrote:

 Hi all,

 I ran into the following exception during the map step:
 java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit
 exceeded)
 java.lang.reflect.Array.newInstance(Array.java:70)
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:325)
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
 com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
 com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:115)
 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
 org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:155)
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:154)
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 I'm using Spark 1.0. In map I create a new object each time; as I understand
 it, I can't reuse objects the way I can in MapReduce development? I wonder if
 you could point me to how it is possible to avoid the GC overhead... thank you
 in advance

 Thank you,
 Konstantin Kudryavtsev



Re: java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded)

2014-07-08 Thread Aaron Davidson
This seems almost equivalent to a heap size error -- since GCs are
stop-the-world events, the fact that we were unable to release more than 2%
of the heap suggests that almost all the memory is currently in use (i.e.,
live).

Decreasing the number of cores is another solution which decreases memory
pressure, because each core requires its own set of buffers (for instance,
each kryo serializer has a certain buffer allocated to it), and has its own
working set of data (some subset of a partition). Thus, decreasing the
number of used cores decreases memory contention.
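
Concretely (a sketch; the exact flags depend on your cluster manager), with
spark-submit you might pass something like

--executor-memory 8g --executor-cores 2

or, on the standalone master, cap the application's total cores in
spark-defaults.conf:

spark.cores.max  8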


On Tue, Jul 8, 2014 at 10:44 AM, Jerry Lam chiling...@gmail.com wrote:

 Hi Konstantin,

 I just ran into the same problem. I mitigated the issue by reducing the
 number of cores when I executed the job, which otherwise wouldn't be able to
 finish.

 Unlike what many people believe, it might not mean that you were running out
 of memory. A better answer can be found here:
 http://stackoverflow.com/questions/4371505/gc-overhead-limit-exceeded and is
 copied here as a reference:

 Excessive GC Time and OutOfMemoryError

 The concurrent collector will throw an OutOfMemoryError if too much time
 is being spent in garbage collection: if more than 98% of the total time is
 spent in garbage collection and less than 2% of the heap is recovered, an
 OutOfMemoryError will be thrown. This feature is designed to prevent
 applications from running for an extended period of time while making
 little or no progress because the heap is too small. If necessary, this
 feature can be disabled by adding the option -XX:-UseGCOverheadLimit to the
 command line.

 The policy is the same as that in the parallel collector, except that time
 spent performing concurrent collections is not counted toward the 98% time
 limit. In other words, only collections performed while the application is
 stopped count toward excessive GC time. Such collections are typically due
 to a concurrent mode failure or an explicit collection request (e.g., a
 call to System.gc()).

 It could be that there are many tasks running on the same node and they
 all compete for running GCs, which slows things down and triggers the error
 you saw. By reducing the number of cores, there are more CPU resources
 available to each task, so the GC can finish before the error gets thrown.

 HTH,

 Jerry


 On Tue, Jul 8, 2014 at 1:35 PM, Aaron Davidson ilike...@gmail.com wrote:

 There is a difference between actual GC overhead, which can be reduced by
 reusing objects, and this error, which actually means you ran out of
 memory. This error can probably be relieved by increasing your executor
 heap size, unless your data is corrupt and it is allocating huge arrays, or
 you are otherwise keeping too much memory around.

 For your other question, you can reuse objects similar to MapReduce
 (HadoopRDD does this by actually using Hadoop's Writables, for instance),
 but the general Spark APIs don't support this because mutable objects are
 not friendly to caching or serializing.
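
 As an aside, the usual pattern when Hadoop Writables do get reused (e.g. via
 sequenceFile) is to copy the values into immutable objects before caching; a
 hedged sketch, with an illustrative path:

 import org.apache.hadoop.io.{LongWritable, Text}

 val raw = sc.sequenceFile("hdfs://namenode/data/input.seq",
   classOf[LongWritable], classOf[Text])
 // The same Writable instances are reused for every record, so copy them out
 // before caching or collecting.
 val safe = raw.map { case (k, v) => (k.get, v.toString) }
 safe.cache()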


 On Tue, Jul 8, 2014 at 9:27 AM, Konstantin Kudryavtsev 
 kudryavtsev.konstan...@gmail.com wrote:

 Hi all,

 I faced with the next exception during map step:
 java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead
 limit exceeded)
 java.lang.reflect.Array.newInstance(Array.java:70)
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:325)
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
 com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
 com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34

Re: CoarseGrainedExecutorBackend: Driver Disassociated

2014-07-08 Thread Aaron Davidson
Hmm, looks like the Executor is trying to connect to the driver on
localhost, from this line:
14/07/08 11:07:13 INFO CoarseGrainedExecutorBackend: Connecting to driver:
akka.tcp://spark@localhost:39701/user/CoarseGrainedScheduler

What is your setup? Standalone mode with 4 separate machines? Are you
configuring the driver public dns name somewhere?


On Tue, Jul 8, 2014 at 11:52 AM, Sameer Tilak ssti...@live.com wrote:

 Dear All,

 When I look inside the following directory on my worker node:
 $SPARK_HOME/work/app-20140708110707-0001/3

 I see the following error message:

 log4j:WARN No appenders could be found for logger
 (org.apache.hadoop.conf.Configuration).
 log4j:WARN Please initialize the log4j system properly.
 log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
 more info.
 14/07/08 11:07:11 INFO SparkHadoopUtil: Using Spark's default log4j
 profile: org/apache/spark/log4j-defaults.properties
 14/07/08 11:07:11 INFO SecurityManager: Changing view acls to: p529444
 14/07/08 11:07:11 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(p529444)
 14/07/08 11:07:12 INFO Slf4jLogger: Slf4jLogger started
 14/07/08 11:07:12 INFO Remoting: Starting remoting
 14/07/08 11:07:13 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkexecu...@pzxnvm2022.dcld.pldc.kp.org:34679]
 14/07/08 11:07:13 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://sparkexecu...@pzxnvm2022.x.y.name.org:34679]
 14/07/08 11:07:13 INFO CoarseGrainedExecutorBackend: Connecting to driver:
 akka.tcp://spark@localhost:39701/user/CoarseGrainedScheduler
 14/07/08 11:07:13 INFO WorkerWatcher: Connecting to worker akka.tcp://
 sparkwor...@pzxnvm2022.x.y.name.org:37054/user/Worker
 14/07/08 11:07:13 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
 [akka.tcp://sparkexecu...@pzxnvm2022.dcld.pldc.kp.org:34679] - [akka


 I am not sure what the problem is, but it is preventing me from getting the
 4-node test cluster up and running.




Re: Comparative study

2014-07-08 Thread Aaron Davidson

 Not sure exactly what is happening but perhaps there are ways to
 restructure your program for it to work better. Spark is definitely able to
 handle much, much larger workloads.


+1 @Reynold

Spark can handle big, big data. There are known issues with informing the
user about what went wrong and how to fix it that we're actively working
on, but the first impulse when a job fails should be "what did I do wrong?"
rather than "Spark can't handle this workload." Messaging is a huge part of
making this clear -- getting things like a job hanging or an out-of-memory
error can be very difficult to debug, and improving this is one of our
highest priorities.


On Tue, Jul 8, 2014 at 12:47 PM, Reynold Xin r...@databricks.com wrote:

 Not sure exactly what is happening but perhaps there are ways to
 restructure your program for it to work better. Spark is definitely able to
 handle much, much larger workloads.

 I've personally run a workload that shuffled 300 TB of data. I've also run
 something that shuffled 5 TB/node and stuffed my disks so full that the
 file system was close to breaking.

 We can definitely do a better job in Spark to make it output more
 meaningful diagnosis and more robust with partitions of data that don't fit
 in memory though. A lot of the work in the next few releases will be on
 that.



 On Tue, Jul 8, 2014 at 10:04 AM, Surendranauth Hiraman 
 suren.hira...@velos.io wrote:

 I'll respond for Dan.

 Our test dataset was a total of 10 GB of input data (full production
 dataset for this particular dataflow would be 60 GB roughly).

 I'm not sure what the size of the final output data was but I think it
 was on the order of 20 GBs for the given 10 GB of input data. Also, I can
 say that when we were experimenting with persist(DISK_ONLY), the size of
 all RDDs on disk was around 200 GB, which gives a sense of overall
 transient memory usage with no persistence.

 In terms of our test cluster, we had 15 nodes. Each node had 24 cores and
 2 workers each. Each executor got 14 GB of memory.

 -Suren



 On Tue, Jul 8, 2014 at 12:06 PM, Kevin Markey kevin.mar...@oracle.com
 wrote:

  When you say "large data sets", how large?
 Thanks


 On 07/07/2014 01:39 PM, Daniel Siegmann wrote:

  From a development perspective, I vastly prefer Spark to MapReduce.
 The MapReduce API is very constrained; Spark's API feels much more natural
 to me. Testing and local development is also very easy - creating a local
 Spark context is trivial and it reads local files. For your unit tests you
 can just have them create a local context and execute your flow with some
 test data. Even better, you can do ad-hoc work in the Spark shell and if
 you want that in your production code it will look exactly the same.

  Unfortunately, the picture isn't so rosy when it gets to production.
 In my experience, Spark simply doesn't scale to the volumes that MapReduce
 will handle. Not with a Standalone cluster anyway - maybe Mesos or YARN
 would be better, but I haven't had the opportunity to try them. I find jobs
 tend to just hang forever for no apparent reason on large data sets (but
 smaller than what I push through MapReduce).

  I am hopeful the situation will improve - Spark is developing quickly
 - but if you have large amounts of data you should proceed with caution.

  Keep in mind there are some frameworks for Hadoop which can hide the
 ugly MapReduce with something very similar in form to Spark's API; e.g.
 Apache Crunch. So you might consider those as well.

  (Note: the above is with Spark 1.0.0.)



 On Mon, Jul 7, 2014 at 11:07 AM, santosh.viswanat...@accenture.com
 wrote:

  Hello Experts,



 I am doing some comparative study on the below:



 Spark vs Impala

 Spark vs MapReduce. Is it worth migrating from an existing MR
 implementation to Spark?





 Please share your thoughts and expertise.





 Thanks,
 Santosh

 --

 This message is for the designated recipient only and may contain
 privileged, proprietary, or otherwise confidential information. If you have
 received it in error, please notify the sender immediately and delete the
 original. Any other use of the e-mail by you is prohibited. Where allowed
 by local law, electronic communications with Accenture and its affiliates,
 including e-mail and instant messaging (including content), may be scanned
 by our systems for the purposes of information security and assessment of
 internal compliance with Accenture policy.

 __

 www.accenture.com




 --
  Daniel Siegmann, Software Developer
 Velos
  Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
 E: daniel.siegm...@velos.io W: www.velos.io





 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@v suren.hira...@sociocast.comelos.io
 W: 

Re: CoarseGrainedExecutorBackend: Driver Disassociated

2014-07-08 Thread Aaron Davidson
You actually should avoid setting SPARK_PUBLIC_DNS unless necessary, I
thought you might have preemptively done so. I think the issue is actually
related to your network configuration, as Spark probably failed to find
your driver's ip address. Do you see a warning on the driver that looks
something like "Your hostname, localhost resolves to a loopback address,
but we couldn't find any external IP address"?

Either way, let's try to set SPARK_LOCAL_IP (see
http://spark.apache.org/docs/latest/configuration.html) inside
~/spark/conf/spark-env.sh on your driver machine to an IP address that's
reachable by your executors. Something like

export SPARK_LOCAL_IP=194.168.1.105

You can make sure it was set correctly by running
sc.getConf.get("spark.driver.host"), which should return the driver
hostname, and NOT localhost.

(Note that it's also possible that your /etc/hosts file contains a mapping
from the driver's ip address to localhost, which it should not.)


On Tue, Jul 8, 2014 at 2:23 PM, Sameer Tilak ssti...@live.com wrote:

 Hi Aaron,
 Would really appreciate your help if you can point me to the
 documentation. Is this something that I need to do with /etc/hosts on each
 of the worker machines ? Or do I set  SPARK_PUBLIC_DNS (if yes, what is the
 format?) or something else?

 I have the following set up:

 master node: pzxnvm2018.x.y.org
 worker nodes:  pzxnvm2022.x.y.org pzxnvm2023.x.y.org pzxnvm2024.x.y.org


 From: ilike...@gmail.com
 Date: Tue, 8 Jul 2014 11:59:54 -0700

 Subject: Re: CoarseGrainedExecutorBackend: Driver Disassociated
 To: user@spark.apache.org


 Hmm, looks like the Executor is trying to connect to the driver on
 localhost, from this line:
 14/07/08 11:07:13 INFO CoarseGrainedExecutorBackend: Connecting to driver:
 akka.tcp://spark@localhost:39701/user/CoarseGrainedScheduler

 What is your setup? Standalone mode with 4 separate machines? Are you
 configuring the driver public dns name somewhere?


 On Tue, Jul 8, 2014 at 11:52 AM, Sameer Tilak ssti...@live.com wrote:

 Dear All,

 When I look inside the following directory on my worker node:
 $SPARK_HOME/work/app-20140708110707-0001/3

 I see the following error message:

 log4j:WARN No appenders could be found for logger
 (org.apache.hadoop.conf.Configuration).
 log4j:WARN Please initialize the log4j system properly.
 log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
 more info.
 14/07/08 11:07:11 INFO SparkHadoopUtil: Using Spark's default log4j
 profile: org/apache/spark/log4j-defaults.properties
 14/07/08 11:07:11 INFO SecurityManager: Changing view acls to: p529444
 14/07/08 11:07:11 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(p529444)
 14/07/08 11:07:12 INFO Slf4jLogger: Slf4jLogger started
 14/07/08 11:07:12 INFO Remoting: Starting remoting
 14/07/08 11:07:13 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkexecu...@pzxnvm2022.dcld.pldc.kp.org:34679]
 14/07/08 11:07:13 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://sparkexecu...@pzxnvm2022.x.y.name.org:34679]
 14/07/08 11:07:13 INFO CoarseGrainedExecutorBackend: Connecting to driver:
 akka.tcp://spark@localhost:39701/user/CoarseGrainedScheduler
 14/07/08 11:07:13 INFO WorkerWatcher: Connecting to worker akka.tcp://
 sparkwor...@pzxnvm2022.x.y.name.org:37054/user/Worker
 14/07/08 11:07:13 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
 [akka.tcp://sparkexecu...@pzxnvm2022.dcld.pldc.kp.org:34679] - [akka


 I am not sure what the problem is, but it is preventing me from getting the
 4-node test cluster up and running.





Re: CoarseGrainedExecutorBackend: Driver Disassociated

2014-07-08 Thread Aaron Davidson
By the way, you can run the sc.getConf.get("spark.driver.host") thing
inside spark-shell, whether or not the Executors actually start up
successfully.


On Tue, Jul 8, 2014 at 8:23 PM, Aaron Davidson ilike...@gmail.com wrote:

 You actually should avoid setting SPARK_PUBLIC_DNS unless necessary, I
 thought you might have preemptively done so. I think the issue is actually
 related to your network configuration, as Spark probably failed to find
 your driver's ip address. Do you see a warning on the driver that looks
 something like "Your hostname, localhost resolves to a loopback address,
 but we couldn't find any external IP address"?

 Either way, let's try to set SPARK_LOCAL_IP (see
 http://spark.apache.org/docs/latest/configuration.html) inside
 ~/spark/conf/spark-env.sh on your driver machine to an IP address that's
 reachable by your executors. Something like

 export SPARK_LOCAL_IP=194.168.1.105

 You can make sure it was set correctly by running
 sc.getConf.get("spark.driver.host"), which should return the driver
 hostname, and NOT localhost.

 (Note that it's also possible that your /etc/hosts file contains a mapping
 from the driver's ip address to localhost, which it should not.)


 On Tue, Jul 8, 2014 at 2:23 PM, Sameer Tilak ssti...@live.com wrote:

 Hi Aaron,
 Would really appreciate your help if you can point me to the
 documentation. Is this something that I need to do with /etc/hosts on each
 of the worker machines ? Or do I set  SPARK_PUBLIC_DNS (if yes, what is the
 format?) or something else?

 I have the following set up:

 master node: pzxnvm2018.x.y.org
 worker nodes:  pzxnvm2022.x.y.org pzxnvm2023.x.y.org pzxnvm2024.x.y.org


 From: ilike...@gmail.com
 Date: Tue, 8 Jul 2014 11:59:54 -0700

 Subject: Re: CoarseGrainedExecutorBackend: Driver Disassociated
 To: user@spark.apache.org


 Hmm, looks like the Executor is trying to connect to the driver on
 localhost, from this line:
 14/07/08 11:07:13 INFO CoarseGrainedExecutorBackend: Connecting to
 driver: akka.tcp://spark@localhost:39701/user/CoarseGrainedScheduler

 What is your setup? Standalone mode with 4 separate machines? Are you
 configuring the driver public dns name somewhere?


 On Tue, Jul 8, 2014 at 11:52 AM, Sameer Tilak ssti...@live.com wrote:

 Dear All,

 When I look inside the following directory on my worker node:
 $SPARK_HOME/work/app-20140708110707-0001/3

 I see the following error message:

 log4j:WARN No appenders could be found for logger
 (org.apache.hadoop.conf.Configuration).
 log4j:WARN Please initialize the log4j system properly.
 log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
 more info.
 14/07/08 11:07:11 INFO SparkHadoopUtil: Using Spark's default log4j
 profile: org/apache/spark/log4j-defaults.properties
 14/07/08 11:07:11 INFO SecurityManager: Changing view acls to: p529444
 14/07/08 11:07:11 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(p529444)
 14/07/08 11:07:12 INFO Slf4jLogger: Slf4jLogger started
 14/07/08 11:07:12 INFO Remoting: Starting remoting
 14/07/08 11:07:13 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkexecu...@pzxnvm2022.dcld.pldc.kp.org:34679]
 14/07/08 11:07:13 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://sparkexecu...@pzxnvm2022.x.y.name.org:34679]
 14/07/08 11:07:13 INFO CoarseGrainedExecutorBackend: Connecting to
 driver: akka.tcp://spark@localhost:39701/user/CoarseGrainedScheduler
 14/07/08 11:07:13 INFO WorkerWatcher: Connecting to worker akka.tcp://
 sparkwor...@pzxnvm2022.x.y.name.org:37054/user/Worker
 14/07/08 11:07:13 ERROR CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkexecu...@pzxnvm2022.dcld.pldc.kp.org:34679] - [akka


 I am not sure what the problem is, but it is preventing me from getting the
 4-node test cluster up and running.






Re: Serialization of objects

2014-07-01 Thread Aaron Davidson
If you want to stick with Java serialization and need to serialize a
non-Serializable object, your best choices are probably to either subclass
it with a Serializable one or wrap it in a class of your own which
implements its own writeObject/readObject methods (see here:
http://stackoverflow.com/questions/6163872/how-to-serialize-a-non-serializable-in-java
)

Otherwise you can use Kryo to register custom serializers for other
people's objects.
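
As a hedged sketch of the wrapper approach (ThirdPartyScorer stands in for a
non-serializable class you cannot modify; it is rebuilt from its serializable
constructor arguments after deserialization):

import java.io.{IOException, ObjectInputStream}

class ScorerWrapper(val configPath: String) extends Serializable {
  @transient private var scorer = new ThirdPartyScorer(configPath)

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()                     // restores configPath
    scorer = new ThirdPartyScorer(configPath)  // rebuild the transient field
  }

  def score(a: String, b: String): Double = scorer.score(a, b)
}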


On Mon, Jun 30, 2014 at 1:52 PM, Sameer Tilak ssti...@live.com wrote:

 Hi everyone,
 I was able to solve this issue. For now I changed the library code and
 added the following to the class com.wcohen.ss.BasicStringWrapper:

 public class BasicStringWrapper implements  Serializable

 However, I am still curious to know how to get around the issue when you
 don't have access to the code and you are using a 3rd-party jar.


 --
 From: ssti...@live.com
 To: u...@spark.incubator.apache.org
 Subject: Serialization of objects
 Date: Thu, 26 Jun 2014 09:30:31 -0700


 Hi everyone,

 Aaron, thanks for your help so far. I am trying to serialize objects that
 I instantiate from a 3rd party library namely instances of 
 com.wcohen.ss.Jaccard,
 and com.wcohen.ss.BasicStringWrapper. However, I am having problems with
 serialization. I am (at least trying to) using Kryo for serialization. I
  am still facing the serialization issue. I get 
 org.apache.spark.SparkException:
 Job aborted due to stage failure: Task not serializable:
 java.io.NotSerializableException: com.wcohen.ss.BasicStringWrapper Any
 help with this will be great.
 Scala code:

 package approxstrmatch

 import com.wcohen.ss.BasicStringWrapper;
 import com.wcohen.ss.Jaccard;

 import java.util.Iterator;

 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
 import org.apache.spark.SparkConf

 import org.apache.spark.rdd;
 import org.apache.spark.rdd.RDD;

 import com.esotericsoftware.kryo.Kryo
 import org.apache.spark.serializer.KryoRegistrator

 class MyRegistrator extends KryoRegistrator {
   override def registerClasses(kryo: Kryo) {
     kryo.register(classOf[approxstrmatch.JaccardScore])
     kryo.register(classOf[com.wcohen.ss.BasicStringWrapper])
     kryo.register(classOf[com.wcohen.ss.Jaccard])
   }
 }

 class JaccardScore {

   val mjc = new Jaccard() with Serializable
   val conf = new SparkConf().setMaster("spark://pzxnvm2018:7077").setAppName("ApproxStrMatch")
   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   conf.set("spark.kryo.registrator", "approxstrmatch.MyRegistrator")

   val sc = new SparkContext(conf)

   def calculateScoreSecond(sourcerdd: RDD[String], destrdd: RDD[String]) {
     val jc_ = this.mjc

     var i: Int = 0
     for (sentence <- sourcerdd.toLocalIterator) {
       val str1 = new BasicStringWrapper(sentence)
       var scorevector = destrdd.map(x => jc_.score(str1, new BasicStringWrapper(x)))
       val fileName = new String("/apps/software/scala-approsstrmatch-sentence" + i)
       scorevector.saveAsTextFile(fileName)
       i += 1
     }
   }
 }

 Here is the script:
  val distFile = sc.textFile("hdfs://serverip:54310/data/dummy/sample.txt");
  val srcFile = sc.textFile("hdfs://serverip:54310/data/dummy/test.txt");
  val score = new approxstrmatch.JaccardScore()
  score.calculateScoreSecond(srcFile, distFile)

 O/P:

 14/06/25 12:32:05 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at
 textFile at console:12), which has no missing parents
 14/06/25 12:32:05 INFO DAGScheduler: Submitting 1 missing tasks from Stage
 0 (MappedRDD[3] at textFile at console:12)
 14/06/25 12:32:05 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
 14/06/25 12:32:05 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on
 executor localhost: localhost (PROCESS_LOCAL)
 14/06/25 12:32:05 INFO TaskSetManager: Serialized task 0.0:0 as 1879 bytes
 in 4 ms
 14/06/25 12:32:05 INFO Executor: Running task ID 0
 14/06/25 12:32:05 INFO Executor: Fetching
 http://serverip:47417/jars/approxstrmatch.jar with timestamp 1403724701564
 14/06/25 12:32:05 INFO Utils: Fetching
 http://serverip:47417/jars/approxstrmatch.jar to
 /tmp/fetchFileTemp8194323811657370518.tmp
 14/06/25 12:32:05 INFO Executor: Adding
 file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/approxstrmatch.jar to
 class loader
 14/06/25 12:32:05 INFO Executor: Fetching
 http://serverip:47417/jars/secondstring-20140618.jar with timestamp
 1403724701562
 14/06/25 12:32:05 INFO Utils: Fetching
 http://serverip:47417/jars/secondstring-20140618.jar to
 /tmp/fetchFileTemp8711755318201511766.tmp
 14/06/25 12:32:06 INFO Executor: Adding
 file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/secondstring-20140618.jar
 to class loader
 14/06/25 12:32:06 INFO BlockManager: Found block broadcast_1 locally
 14/06/25 12:32:06 INFO HadoopRDD: Input split:
 hdfs://serverip:54310/data/dummy/test.txt:0+140
 14/06/25 12:32:06 INFO Executor: Serialized size of result for 0 is 717
 

Re: Failed to launch Worker

2014-07-01 Thread Aaron Davidson
Where are you running the spark-class version? Hopefully also on the
workers.

If you're trying to centrally start/stop all workers, you can add a
"slaves" file to the Spark conf/ directory which is just a list of your
hosts, one per line. Then you can just use ./sbin/start-slaves.sh to
start the worker on all of your machines.
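
A hedged sketch of what that looks like (hostnames are made up):

# conf/slaves -- one worker host per line
worker1.example.com
worker2.example.com
worker3.example.com

# then, from the master:
./sbin/start-slaves.sh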

Note that this is already setup correctly if you're using the spark-ec2
scripts.


On Tue, Jul 1, 2014 at 5:53 AM, MEETHU MATHEW meethu2...@yahoo.co.in
wrote:

 Yes.

 Thanks & Regards,
 Meethu M


   On Tuesday, 1 July 2014 6:14 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:


  Is this command working??

 java -cp ::/usr/local/spark-1.0.0/conf:/usr/local/spark-1.0.0/
 assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.2.1.jar
 -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m
 -Xmx512m org.apache.spark.deploy.worker.Worker spark://x.x.x.174:7077

 Thanks
 Best Regards


 On Tue, Jul 1, 2014 at 6:08 PM, MEETHU MATHEW meethu2...@yahoo.co.in
 wrote:


  Hi ,

 I am using Spark Standalone mode with one master and 2 slaves. I am not
 able to start the workers and connect them to the master using

 ./bin/spark-class org.apache.spark.deploy.worker.Worker
 spark://x.x.x.174:7077

 The log says

  Exception in thread "main" org.jboss.netty.channel.ChannelException:
 Failed to bind to: master/x.x.x.174:0
  at
 org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
  ...
  Caused by: java.net.BindException: Cannot assign requested address

 When I try to start the worker from the slaves using the following java
 command, it runs without any exception:

 java -cp
 ::/usr/local/spark-1.0.0/conf:/usr/local/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.2.1.jar
 -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m
 org.apache.spark.deploy.worker.Worker spark://:master:7077




 Thanks & Regards,
 Meethu M







Re: org.jboss.netty.channel.ChannelException: Failed to bind to: master/1xx.xx..xx:0

2014-07-01 Thread Aaron Davidson
In your spark-env.sh, do you happen to set SPARK_PUBLIC_DNS or something of
that kind? This error suggests the worker is trying to bind a server on the
master's IP, which clearly doesn't make sense.



On Mon, Jun 30, 2014 at 11:59 PM, MEETHU MATHEW meethu2...@yahoo.co.in
wrote:

 Hi,

 I did netstat -na | grep 192.168.125.174 and it shows
 192.168.125.174:7077 LISTEN (after starting the master).

 I tried to execute the following script from the slaves manually, but it
 ends up with the same exception and log. This script internally executes
 the java command.
  /usr/local/spark-1.0.0/sbin/start-slave.sh 1 spark://192.168.125.174:7077
 In this case netstat is not showing any connection established to master:7077.

 When we manually execute the java command, the connection to the master is
 established.

 Thanks & Regards,
 Meethu M


   On Monday, 30 June 2014 6:38 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:


  Are you sure you have this IP 192.168.125.174 bound
 on that machine? (netstat -na | grep 192.168.125.174)

 Thanks
 Best Regards


 On Mon, Jun 30, 2014 at 5:34 PM, MEETHU MATHEW meethu2...@yahoo.co.in
 wrote:

 Hi all,

 I reinstalled Spark and rebooted the system, but I am still not able to start
 the workers. It's throwing the following exception:

 Exception in thread "main" org.jboss.netty.channel.ChannelException:
 Failed to bind to: master/192.168.125.174:0

 I suspect the problem is with 192.168.125.174:0. Even though the command
 contains master:7077, why is it showing 0 in the log?

 java -cp
 ::/usr/local/spark-1.0.0/conf:/usr/local/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.2.1.jar
 -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m
 org.apache.spark.deploy.worker.Worker spark://master:7077

 Can somebody tell me a solution?

 Thanks & Regards,
 Meethu M


   On Friday, 27 June 2014 4:28 PM, MEETHU MATHEW meethu2...@yahoo.co.in
 wrote:


  Hi,
 Yes, I tried setting another port also, but the same problem persists.
 "master" is set in /etc/hosts.

 Thanks & Regards,
 Meethu M


   On Friday, 27 June 2014 3:23 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:


 That's strange. Did you try setting the master port to something else (using
 SPARK_MASTER_PORT)?

 Also, you said you are able to start it from the java command line:

 java -cp ::/usr/local/spark-1.0.0/conf:/usr/local/spark-1.0.0/
 assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.2.1.jar
 -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m
 -Xmx512m org.apache.spark.deploy.worker.Worker spark://:*master*:7077

 What is the master IP specified here? Do you have an entry for
 *master* in /etc/hosts?

 Thanks
 Best Regards


 On Fri, Jun 27, 2014 at 3:09 PM, MEETHU MATHEW meethu2...@yahoo.co.in
 wrote:

 Hi Akhil,

 I am running it in a LAN itself. The IP of the master is given correctly.

 Thanks & Regards,
 Meethu M


   On Friday, 27 June 2014 2:51 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:


  why is it binding to port 0? 192.168.125.174:0 :/

 Check the IP address of that master machine (ifconfig); it looks like the IP
 address has changed (hoping you are running these machines on a LAN).

 Thanks
 Best Regards


 On Fri, Jun 27, 2014 at 12:00 PM, MEETHU MATHEW meethu2...@yahoo.co.in
 wrote:

 Hi all,

 My Spark (standalone mode) was running fine till yesterday. But now I am
 getting the following exception when I run start-slaves.sh or
 start-all.sh:

 slave3: failed to launch org.apache.spark.deploy.worker.Worker:
 slave3:   at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
 slave3:   at java.lang.Thread.run(Thread.java:662)

 The log files has the following lines.

 14/06/27 11:06:30 INFO SecurityManager: Using Spark's default log4j
 profile: org/apache/spark/log4j-defaults.properties
 14/06/27 11:06:30 INFO SecurityManager: Changing view acls to: hduser
 14/06/27 11:06:30 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(hduser)
 14/06/27 11:06:30 INFO Slf4jLogger: Slf4jLogger started
 14/06/27 11:06:30 INFO Remoting: Starting remoting
 Exception in thread "main" org.jboss.netty.channel.ChannelException:
 Failed to bind to: master/192.168.125.174:0
  at
 org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
 ...
 Caused by: java.net.BindException: Cannot assign requested address
  ...
 I saw the same error reported before and have tried the following
 solutions.

 I set the variable SPARK_LOCAL_IP and changed SPARK_MASTER_PORT to a
 different number. But nothing is working.

 When I try to start the worker from the respective machines using the
 following java command, it runs without any exception:

 java -cp
 ::/usr/local/spark-1.0.0/conf:/usr/local/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.2.1.jar
 -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m 

Re: Interconnect benchmarking

2014-06-27 Thread Aaron Davidson
A simple throughput test is also repartition()ing a large RDD. This also
stresses the disks, though, so you might try to mount your spark temporary
directory as a ramfs.
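
A hedged sketch of such a test from spark-shell (sizes and partition counts
are illustrative; the generated data is cached first so the timed step is
mostly shuffle):

import scala.util.Random
val parts = 200
val perPart = 1 << 20
val data = sc.parallelize(1 to parts, parts)
  .flatMap(_ => Iterator.fill(perPart)(Random.nextLong()))
  .cache()
data.count()                              // materialize the input in memory
val t0 = System.nanoTime()
data.repartition(parts).count()           // full shuffle of the cached data
println(s"repartition took ${(System.nanoTime() - t0) / 1e9} seconds")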


On Fri, Jun 27, 2014 at 5:57 PM, danilopds danilob...@gmail.com wrote:

 Hi,
 According to the research paper below by Matei Zaharia, Spark's
 creator,
 http://people.csail.mit.edu/matei/papers/2013/sosp_spark_streaming.pdf

 He says on page 10 that:
 Grep is network-bound due to the cost to replicate the input data to
 multiple nodes.

 So,
 I guess that can be a good initial recommendation.

 But I would like to know about other workloads too.
 Best Regards.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Interconnect-benchmarking-tp8467p8470.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Improving Spark multithreaded performance?

2014-06-26 Thread Aaron Davidson
I don't have specific solutions for you, but the general things to try are:

- Decrease task size by broadcasting any non-trivial objects.
- Increase duration of tasks by making them less fine-grained.

How many tasks are you sending? I've seen in the past something like 25
seconds for ~10k total medium-sized tasks.
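
On the first point, a hedged sketch of broadcasting a large object once
instead of letting every task's closure capture it (the lookup table and RDD
here are made up):

import org.apache.spark.rdd.RDD

val bigTable: Map[String, Double] =
  (1 to 1000000).map(i => ("key" + i, i.toDouble)).toMap
val tableBc = sc.broadcast(bigTable)       // shipped to each executor once

def score(records: RDD[String]): RDD[Double] =
  records.map(r => tableBc.value.getOrElse(r, 0.0))  // tasks reference the broadcast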


On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu
wrote:

 I'm working to set up a calculation that involves calling
 mllib's SVMWithSGD.train several thousand times on different permutations
 of the data. I'm trying to run the separate jobs using a threadpool to
 dispatch the different requests to a Spark context connected to a Mesos
 cluster, using coarse-grained scheduling, and a max of 2000 cores on Spark 1.0.
 Total utilization of the system is terrible. Most of the 'aggregate at
 GradientDescent.scala:178' stages (where MLlib spends most of its time) take
 about 3 seconds, but have ~25 seconds of scheduler delay time.
 What kind of things can I do to improve this?

 Kyle



Re: jsonFile function in SQLContext does not work

2014-06-25 Thread Aaron Davidson
Is it possible you have blank lines in your input? Not that this should be
an error condition, but it may be what's causing it.
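
If blank lines do turn out to be the cause, one hedged workaround (assuming
your build has the jsonRDD method that accompanies jsonFile; the path is
illustrative) is to filter them out before parsing:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val raw = sc.textFile("hdfs://namenode:54310/data/input.json")
val nonEmpty = raw.filter(_.trim.nonEmpty)   // drop blank lines up front
val table = sqlContext.jsonRDD(nonEmpty)
table.count()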


On Wed, Jun 25, 2014 at 11:57 AM, durin m...@simon-schaefer.net wrote:

 Hi Zongheng Yang,

 thanks for your response. Reading your answer, I did some more tests and
 realized that analyzing very small parts of the dataset (which is ~130GB in
 ~4.3M lines) works fine.
 The error occurs when I analyze larger parts. Using 5% of the whole data,
 the error is the same as posted before for certain TIDs. However, I get the
 structure determined so far as a result when using 5%.

 The Spark WebUI shows the following:

 Job aborted due to stage failure: Task 6.0:11 failed 4 times, most recent
 failure: Exception failure in TID 108 on host foo.bar.com:
 com.fasterxml.jackson.databind.JsonMappingException: No content to map due
 to end-of-input at [Source: java.io.StringReader@3697781f; line: 1,
 column:
 1]

 com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:164)

 com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3029)

 com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:2971)

 com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2091)

 org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:261)

 org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:261)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$class.foreach(Iterator.scala:727)
 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

 scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
 scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
 org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:823)
 org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:821)
 org.apache.spark.SparkContext$$anonfun$24.apply(SparkContext.scala:1132)
 org.apache.spark.SparkContext$$anonfun$24.apply(SparkContext.scala:1132)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112)
 org.apache.spark.scheduler.Task.run(Task.scala:51)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)

 java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 java.lang.Thread.run(Thread.java:662) Driver stacktrace:



 Is the only possible reason that some of these 4.3 Million JSON-Objects are
 not valid JSON, or could there be another explanation?
 And if it is the reason, is there some way to tell the function to just
 skip
 faulty lines?


 Thanks,
 Durin



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/jsonFile-function-in-SQLContext-does-not-work-tp8273p8278.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Changing log level of spark

2014-06-25 Thread Aaron Davidson
If you're using the spark-ec2 scripts, you may have to change
/root/ephemeral-hdfs/conf/log4j.properties or something like that, as that
is added to the classpath before Spark's own conf.
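
For reference, a minimal sketch of the Spark-style properties file (the
Hadoop copy under ephemeral-hdfs uses log4j.rootLogger rather than
log4j.rootCategory, so check which file actually wins on your classpath):

log4j.rootCategory=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n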


On Wed, Jun 25, 2014 at 6:10 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 I have a log4j.xml in src/main/resources with

 <?xml version="1.0" encoding="UTF-8" ?>
 <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
 [...]
   <root>
     <priority value="warn" />
     <appender-ref ref="Console" />
   </root>
 </log4j:configuration>

 and that is included in the jar I package with `sbt assembly`. That
 works fine for me, at least on the driver.

 Tobias

 On Wed, Jun 25, 2014 at 2:25 PM, Philip Limbeck philiplimb...@gmail.com
 wrote:
  Hi!
 
  According to
 
 https://spark.apache.org/docs/0.9.0/configuration.html#configuring-logging
 ,
  changing log-level is just a matter of creating a log4j.properties
 (which is
  in the classpath of spark) and changing log level there for the root
 logger.
  I did this steps on every node in the cluster (master and worker nodes).
  However, after restart there is still no debug output as desired, but
 only
  the default info log level.



Re: Shark vs Impala

2014-06-23 Thread Aaron Davidson
Note that regarding a long load time, data format means a whole lot in
terms of query performance. If you load all your data into compressed,
columnar Parquet files on local hardware, Spark SQL would also perform far,
far better than it would reading from gzipped S3 files. You must also be
careful about your queries; certain queries can be answered much more
efficiently due to specific optimizations implemented in the query engine.
For instance, Parquet keeps statistics, so you could theoretically do a
count(*) over petabytes of data in less than a second, blowing away any
competition that resorts to actually reading data.
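
As a hedged sketch of that conversion step with the Spark 1.0-era SQL API
(paths, bucket names, and the schema are all illustrative):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

case class Event(id: Long, country: String, value: Double)
val events = sc.textFile("s3n://my-bucket/events.csv")
  .map(_.split(","))
  .map(r => Event(r(0).toLong, r(1), r(2).toDouble))
events.saveAsParquetFile("hdfs:///warehouse/events.parquet")

val parquetEvents = sqlContext.parquetFile("hdfs:///warehouse/events.parquet")
parquetEvents.registerAsTable("events")
sqlContext.sql("SELECT COUNT(*) FROM events").collect()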



On Sun, Jun 22, 2014 at 6:24 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 In this benchmark, the problem wasn’t that Shark could not run without
 enough memory; Shark spills some of the data to disk and can run just fine.
 The issue was that the in-memory form of the RDDs was larger than the
 cluster’s memory, although the raw Parquet / ORC files did fit in memory,
 so Cloudera did not want to run an “RDD” number where some of the RDD is
 not in memory. But the wording “could not complete” is confusing — the
 queries complete just fine.

 We do plan to update the AMPLab benchmark with Spark SQL as well, and
 expand it to include more of TPC-DS.

 Matei

 On Jun 22, 2014, at 9:53 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 600s for Spark vs 5s for Redshift...The numbers look much different from
 the amplab benchmark...

 https://amplab.cs.berkeley.edu/benchmark/

 Is it like SSDs or something that's helping Redshift, or is the whole data
 in memory when you run the query? Could you publish the query?

 Also, after Spark SQL, are we planning to add Spark SQL runtimes to the
 AMPLab benchmark as well?



 On Sun, Jun 22, 2014 at 9:13 AM, Toby Douglass t...@avocet.io wrote:

 I've just benchmarked Spark and Impala.  Same data (in s3), same query,
 same cluster.

 Impala has a long load time, since it cannot load directly from s3.  I
 have to create a Hive table on s3, then insert from that to an Impala
 table.  This takes a long time; Spark took about 600s for the query, Impala
 250s, but Impala required 6k seconds to load data from s3.  If you're going
 to go the long-initial-load-then-quick-queries route, go for Redshift.  On
 equivalent hardware, that took about 4k seconds to load, but then queries
 are like 5s each.






Re: DAGScheduler: Failed to run foreach

2014-06-23 Thread Aaron Davidson
Please note that this:

for (sentence <- sourcerdd) {
   ...
}

is actually Scala syntactic sugar which is converted into

sourcerdd.foreach { sentence => ... }

What this means is that this will actually run on the cluster, which is
probably not what you want if you're trying to print them.

Try this instead:

for (sentence <- sourcerdd.toLocalIterator) {
  ...
}

(By the way, the reason this was throwing a NotSerializableException was
because you were trying to pass printScoreCanndedString as part of the
job's closure. In Java, class methods have an implicit reference to this,
so it tried to serialize the class CalculateScore, which is presumably not
marked as Serializable.)
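
A hedged sketch of the two usual fixes, with names mirroring the snippet
above (bodies elided or stubbed):

import org.apache.spark.rdd.RDD

// Option 1: make the enclosing class serializable so its methods can be
// shipped inside closures.
class CalculateScore extends Serializable {
  // ... same fields and methods as before ...
}

// Option 2: move the per-record logic into a standalone object, which has no
// enclosing instance to drag into the closure (the body here is a placeholder).
object Scoring {
  def scoreAgainst(sourceStr: String, rdd: RDD[String]): RDD[Double] =
    rdd.map(_ => 0.0)
}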


On Mon, Jun 23, 2014 at 5:45 PM, Sameer Tilak ssti...@live.com wrote:

 The subject should be: org.apache.spark.SparkException: Job aborted due
 to stage failure: Task not serializable: java.io.NotSerializableException:
  and not DAGScheduler: Failed to run foreach

 If I call printScoreCanndedString with a hard-coded string and identical
 2nd parameter, it works fine. However for my application that is not
 sufficient.
 --
 From: ssti...@live.com
 To: u...@spark.incubator.apache.org
 Subject: DAGScheduler: Failed to run foreach
 Date: Mon, 23 Jun 2014 17:05:03 -0700


 Hi All,

 I am using Spark for text analysis. I have a source file that has a few
 thousand sentences and a dataset of tens of millions of statements. I want
 to compare each statement from the sourceFile with each statement from the
 dataset and generate a score. I am having the following problem. I would
 really appreciate help.

 Here is what I do within spark-shell

 // Source file with few thousand sentences
  val srcFile = sc.textFile("hdfs://serverip/data/dummy/src.txt");

 // Dataset with tens of millions of statements.

  val destFile = sc.textFile("hdfs://serverip/data/dummy/sample.txt");

 // Initialize the score variable.

  val score = new mypackage.Score()

 // Generate score.

 score.calculateScore(srcFile, destFile);

 Here is my snippet from my scala class (Score.scala)


   def calculateScore(sourcerdd: RDD[String], destrdd: RDD[String]) {
     for (sentence <- sourcerdd) {
       println("Source String is: " + sentence + " Data Type is: " +
         sentence.getClass.getSimpleName)
       printScoreCanndedString(sentence, destrdd)
     }
   }

   def printScoreCanndedString(sourceStr: String, rdd: RDD[String]): RDD[Double] = {
     // Do the analysis here.
   }

 The print statement displays the data correctly along with the data type
 String, as expected. However, I get the following error message when it
 tries to execute the printScoreCanndedString method. Any help with this will
 be great.


 14/06/23 16:45:04 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 14/06/23 16:45:04 WARN LoadSnappy: Snappy native library not loaded
 14/06/23 16:45:04 INFO FileInputFormat: Total input paths to process : 1
 14/06/23 16:45:04 INFO SparkContext: Starting job: foreach at
 calculateScore.scala:51
 14/06/23 16:45:04 INFO DAGScheduler: Got job 0 (foreach at
 CalculateScore.scala:51) with 2 output partitions (allowLocal=false)
 14/06/23 16:45:04 INFO DAGScheduler: Final stage: Stage 0(foreach
 at CalculateScore.scala:51)
 14/06/23 16:45:04 INFO DAGScheduler: Parents of final stage: List()
 14/06/23 16:45:04 INFO DAGScheduler: Missing parents: List()
 14/06/23 16:45:04 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at
 textFile at console:12), which has no missing parents
 14/06/23 16:45:04 INFO DAGScheduler: Failed to run foreach
 at CalculateScore.scala:51
 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 not serializable: java.io.NotSerializableException: approxstrmatch.
 CalculateScore
  at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
  at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
  at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
  at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
  at
 org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at 

Re: spark with docker: errors with akka, NAT?

2014-06-17 Thread Aaron Davidson
I remember having to do a similar thing in the spark docker scripts for
testing purposes. Were you able to modify the /etc/hosts directly? I
remember issues with that as docker apparently mounts it as part of its
read-only filesystem.


On Tue, Jun 17, 2014 at 4:36 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 It was a DNS issue. AKKA apparently uses the hostname of the endpoints and
 hence they need to be resolvable. In my case the hostname of the docker
 container was a randomly generated string and was not resolvable. I added a
 workaround (entry in etc/hosts file of spark master) for now. If anyone can
 point to a more elegant solution, that would be awesome!


 On Tue, Jun 17, 2014 at 3:48 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 I am using cutting edge code from git but doing my own sbt assembly.


 On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher 
 schum...@icsi.berkeley.edu wrote:


 Hi,

 are you using the amplab/spark-1.0.0 images from the global registry?

 Andre

 On 06/17/2014 01:36 AM, Mohit Jaggi wrote:
  Hi Folks,
 
  I am having trouble getting spark driver running in docker. If I run a
  pyspark example on my mac it works but the same example on a docker
 image
  (Via boot2docker) fails with following logs. I am pointing the spark
 driver
  (which is running the example) to a spark cluster (driver is not part
 of
  the cluster). I guess this has something to do with docker's networking
  stack (it may be getting NAT'd) but I am not sure why (if at all) the
  spark-worker or spark-master is trying to create a new TCP connection
 to
  the driver, instead of responding on the connection initiated by the
 driver.
 
  I would appreciate any help in figuring this out.
 
  Thanks,
 
  Mohit.
 
  logs
 
  Spark Executor Command: java -cp
 
 ::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar
  -Xms2g -Xmx2g -Xms512M -Xmx512M
  org.apache.spark.executor.CoarseGrainedExecutorBackend
  akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler 1
  cobalt 24 akka.tcp://sparkWorker@:33952/user/Worker
  app-20140616152201-0021
 
  
 
 
  log4j:WARN No appenders could be found for logger
  (org.apache.hadoop.conf.Configuration).
 
  log4j:WARN Please initialize the log4j system properly.
 
  log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
 for
  more info.
 
  14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j
  profile: org/apache/spark/log4j-defaults.properties
 
  14/06/16 15:22:05 INFO SecurityManager: Changing view acls to:
 ayasdi,root
 
  14/06/16 15:22:05 INFO SecurityManager: SecurityManager: authentication
  disabled; ui acls disabled; users with view permissions: Set(xxx, xxx)
 
  14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started
 
  14/06/16 15:22:05 INFO Remoting: Starting remoting
 
  14/06/16 15:22:06 INFO Remoting: Remoting started; listening on
 addresses
  :[akka.tcp://sparkExecutor@:33536]
 
  14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses:
  [akka.tcp://sparkExecutor@:33536]
 
  14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to
 driver:
  akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler
 
  14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker
  akka.tcp://sparkWorker@:33952/user/Worker
 
  14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable
 remote
  address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated
 for
  6 ms, all messages to this address will be delivered to dead
 letters.
 
  14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver
 Disassociated
  [akka.tcp://sparkExecutor@:33536] -
 [akka.tcp://spark@fc31887475e3:43921]
  disassociated! Shutting down.
 






Re: Executors not utilized properly.

2014-06-17 Thread Aaron Davidson
repartition() is actually just an alias of coalesce(), but with the
shuffle flag set to true. This shuffle is probably what you're seeing as
taking longer, but it is required when you go from a smaller number of
partitions to a larger one.

When actually decreasing the number of partitions, coalesce(shuffle =
false) will be fully pipelined, but is limited in how it can redistribute
data, as it can only combine whole partitions into larger partitions. For
example, if you have an rdd with 101 partitions, and you do
rdd.coalesce(100, shuffle = false), then the resultant rdd will have 99 of
the original partitions, and 1 partition will just be 2 original partitions
combined. This can lead to increased data skew, but requires no effort to
create.

On the other hand, if you do rdd.coalesce(100, shuffle = true), then all of
the data will actually be reshuffled into 100 new evenly-sized partitions,
eliminating any data skew at the cost of actually moving all data around.
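
A hedged sketch of the difference (the sizes are arbitrary):

val rdd = sc.parallelize(1 to 1000000, 101)

// Merges whole partitions: no shuffle, cheap, but possibly skewed.
val merged = rdd.coalesce(100, shuffle = false)

// Equivalent to repartition(100): full shuffle into evenly sized partitions.
val rebalanced = rdd.coalesce(100, shuffle = true)
val alsoRebalanced = rdd.repartition(100)

println(merged.partitions.length + " " + rebalanced.partitions.length)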


On Tue, Jun 17, 2014 at 4:52 PM, abhiguruvayya sharath.abhis...@gmail.com
wrote:

 I found the main reason to be that I was using coalesce instead of
 repartition. coalesce was shrinking the partitioning, so there were too few
 tasks to be executed by all of the executors. Can you help me
 understand when to use coalesce and when to use repartition? In my
 application coalesce is being processed faster than repartition, which is
 unusual.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744p7787.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: spark with docker: errors with akka, NAT?

2014-06-17 Thread Aaron Davidson
Yup, alright, same solution then :)


On Tue, Jun 17, 2014 at 7:39 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 I used --privileged to start the container and then unmounted /etc/hosts.
 Then I created a new /etc/hosts file


 On Tue, Jun 17, 2014 at 4:58 PM, Aaron Davidson ilike...@gmail.com
 wrote:

 I remember having to do a similar thing in the spark docker scripts for
 testing purposes. Were you able to modify the /etc/hosts directly? I
 remember issues with that as docker apparently mounts it as part of its
 read-only filesystem.


 On Tue, Jun 17, 2014 at 4:36 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 It was a DNS issue. AKKA apparently uses the hostname of the endpoints
 and hence they need to be resolvable. In my case the hostname of the docker
 container was a randomly generated string and was not resolvable. I added a
 workaround (entry in etc/hosts file of spark master) for now. If anyone can
 point to a more elegant solution, that would be awesome!


 On Tue, Jun 17, 2014 at 3:48 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 I am using cutting edge code from git but doing my own sbt assembly.


 On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher 
 schum...@icsi.berkeley.edu wrote:


 Hi,

 are you using the amplab/spark-1.0.0 images from the global registry?

 Andre

 On 06/17/2014 01:36 AM, Mohit Jaggi wrote:
  Hi Folks,
 
  I am having trouble getting spark driver running in docker. If I run
 a
  pyspark example on my mac it works but the same example on a docker
 image
  (Via boot2docker) fails with following logs. I am pointing the spark
 driver
  (which is running the example) to a spark cluster (driver is not
 part of
  the cluster). I guess this has something to do with docker's
 networking
  stack (it may be getting NAT'd) but I am not sure why (if at all) the
  spark-worker or spark-master is trying to create a new TCP
 connection to
  the driver, instead of responding on the connection initiated by the
 driver.
 
  I would appreciate any help in figuring this out.
 
  Thanks,
 
  Mohit.
 
  logs
 
  Spark Executor Command: java -cp
 
 ::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar
  -Xms2g -Xmx2g -Xms512M -Xmx512M
  org.apache.spark.executor.CoarseGrainedExecutorBackend
  akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler
 1
  cobalt 24 akka.tcp://sparkWorker@:33952/user/Worker
  app-20140616152201-0021
 
  
 
 
  log4j:WARN No appenders could be found for logger
  (org.apache.hadoop.conf.Configuration).
 
  log4j:WARN Please initialize the log4j system properly.
 
  log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
 for
  more info.
 
  14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j
  profile: org/apache/spark/log4j-defaults.properties
 
  14/06/16 15:22:05 INFO SecurityManager: Changing view acls to:
 ayasdi,root
 
  14/06/16 15:22:05 INFO SecurityManager: SecurityManager:
 authentication
  disabled; ui acls disabled; users with view permissions: Set(xxx,
 xxx)
 
  14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started
 
  14/06/16 15:22:05 INFO Remoting: Starting remoting
 
  14/06/16 15:22:06 INFO Remoting: Remoting started; listening on
 addresses
  :[akka.tcp://sparkExecutor@:33536]
 
  14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses:
  [akka.tcp://sparkExecutor@:33536]
 
  14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to
 driver:
  akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler
 
  14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker
  akka.tcp://sparkWorker@:33952/user/Worker
 
  14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable
 remote
  address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated
 for
  6 ms, all messages to this address will be delivered to dead
 letters.
 
  14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver
 Disassociated
  [akka.tcp://sparkExecutor@:33536] -
 [akka.tcp://spark@fc31887475e3:43921]
  disassociated! Shutting down.
 








Re: long GC pause during file.cache()

2014-06-15 Thread Aaron Davidson
Note also that Java does not work well with very large JVMs due to this
exact issue. There are two commonly used workarounds:

1) Spawn multiple (smaller) executors on the same machine. This can be done
by creating multiple Workers (via SPARK_WORKER_INSTANCES in standalone
mode[1]).
2) Use Tachyon for off-heap caching of RDDs, allowing Spark executors to be
smaller and avoid GC pauses

[1] See standalone documentation here:
http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts
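
For option (1), a hedged spark-env.sh sketch; the numbers are purely
illustrative for one large machine and should be tuned:

export SPARK_WORKER_INSTANCES=4
export SPARK_WORKER_MEMORY=40g
export SPARK_WORKER_CORES=8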


On Sun, Jun 15, 2014 at 3:50 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

  Yes, I think in the spark-env.sh.template, it is listed in the comments
 (didn’t check….)

 Best,

 --
 Nan Zhu

 On Sunday, June 15, 2014 at 5:21 PM, Surendranauth Hiraman wrote:

 Is SPARK_DAEMON_JAVA_OPTS valid in 1.0.0?



 On Sun, Jun 15, 2014 at 4:59 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

  SPARK_JAVA_OPTS is deprecated in 1.0, though it works fine if you
 don’t mind the WARNING in the logs

 you can set spark.executor.extraJavaOptions in your SparkConf object

 Best,

 --
 Nan Zhu

 On Sunday, June 15, 2014 at 12:13 PM, Hao Wang wrote:

 Hi, Wei

 You may try to set JVM opts in *spark-env.sh* as
 follows to prevent or mitigate GC pauses:

 export SPARK_JAVA_OPTS=-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC
 -Xmx2g -XX:MaxPermSize=256m

 There are more options you could add, please just Google :)


 Regards,
 Wang Hao(王灏)

 CloudTeam | School of Software Engineering
 Shanghai Jiao Tong University
 Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
 Email:wh.s...@gmail.com


 On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan w...@us.ibm.com wrote:

 Hi,

   I have a single node (192G RAM) stand-alone spark, with memory
 configuration like this in spark-env.sh

 SPARK_WORKER_MEMORY=180g
 SPARK_MEM=180g


  In spark-shell I have a program like this:

 val file = sc.textFile("/localpath") // file size is 40G
 file.cache()

 val output = file.map(line => extract something from line)

 output.saveAsTextFile(...)


 When I run this program again and again, or keep trying file.unpersist()
 -- file.cache() -- output.saveAsTextFile(), the run time varies a lot,
 from 1 min to 3 min to 50+ min. Whenever the run-time is more than 1 min,
 from the stage monitoring GUI I observe big GC pause (some can be 10+ min).
 Of course when run-time is normal, say ~1 min, no significant GC is
 observed. The behavior seems somewhat random.

 Is there any JVM tuning I should do to prevent this long GC pause from
 happening?



 I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something
 like this:

 root 10994  1.7  0.6 196378000 1361496 pts/51 Sl+ 22:06   0:12
 /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp
 ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar
 -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g
 org.apache.spark.deploy.SparkSubmit spark-shell --class
 org.apache.spark.repl.Main

 Best regards,
 Wei

 -
 Wei Tan, PhD
 Research Staff Member
 IBM T. J. Watson Research Center
 http://researcher.ibm.com/person/us-wtan






 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io





Re: How to specify executor memory in EC2 ?

2014-06-12 Thread Aaron Davidson
The scripts for Spark 1.0 actually specify this property in
/root/spark/conf/spark-defaults.conf

I didn't know that this would override the --executor-memory flag, though,
that's pretty odd.
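
For reference, a sketch of the relevant line in that file (the value is
illustrative; spark-defaults.conf takes whitespace-separated key/value pairs):

# /root/spark/conf/spark-defaults.conf
spark.executor.memory   2g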


On Thu, Jun 12, 2014 at 6:02 PM, Aliaksei Litouka 
aliaksei.lito...@gmail.com wrote:

 Yes, I am launching a cluster with the spark_ec2 script. I checked
 /root/spark/conf/spark-env.sh on the master node and on slaves and it looks
 like this:

 #!/usr/bin/env bash
 export SPARK_LOCAL_DIRS=/mnt/spark
 # Standalone cluster options
 export SPARK_MASTER_OPTS=
 export SPARK_WORKER_INSTANCES=1
 export SPARK_WORKER_CORES=1
 export HADOOP_HOME=/root/ephemeral-hdfs
 export SPARK_MASTER_IP=ec2-54-89-95-238.compute-1.amazonaws.com
 export MASTER=`cat /root/spark-ec2/cluster-url`
 export SPARK_SUBMIT_LIBRARY_PATH=$SPARK_SUBMIT_LIBRARY_PATH:/root/ephemeral-hdfs/lib/native/
 export SPARK_SUBMIT_CLASSPATH=$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:/root/ephemeral-hdfs/conf
 # Bind Spark's web UIs to this machine's public EC2 hostname:
 export SPARK_PUBLIC_DNS=`wget -q -O - http://169.254.169.254/latest/meta-data/public-hostname`
 # Set a high ulimit for large shuffles
 ulimit -n 100


 None of these variables seem to be related to memory size. Let me know if
 I am missing something.


 On Thu, Jun 12, 2014 at 7:17 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 Are you launching this using our EC2 scripts? Or have you set up a
 cluster by hand?

 Matei

 On Jun 12, 2014, at 2:32 PM, Aliaksei Litouka aliaksei.lito...@gmail.com
 wrote:

 spark-env.sh doesn't seem to contain any settings related to memory size
 :( I will continue searching for a solution and will post it if I find it :)
 Thank you, anyway


 On Wed, Jun 11, 2014 at 12:19 AM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 It might be that conf/spark-env.sh on EC2 is configured to set it to
 512, and is overriding the application’s settings. Take a look in there and
 delete that line if possible.

 Matei

 On Jun 10, 2014, at 2:38 PM, Aliaksei Litouka 
 aliaksei.lito...@gmail.com wrote:

  I am testing my application in EC2 cluster of m3.medium machines. By
 default, only 512 MB of memory on each machine is used. I want to increase
 this amount and I'm trying to do it by passing --executor-memory 2G option
 to the spark-submit script, but it doesn't seem to work - each machine uses
 only 512 MB instead of 2 gigabytes. What am I doing wrong? How do I
 increase the amount of memory?







Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-09 Thread Aaron Davidson
It is not a very good idea to save the results in the exact same place as
the data. Any failures during the job could lead to corrupted data, because
recomputing the lost partitions would involve reading the original
(now-nonexistent) data.

As such, the only safe way to do this would be to do as you said, and
only delete the input data once the entire output has been successfully
created.


On Sun, Jun 8, 2014 at 10:32 PM, innowireless TaeYun Kim 
taeyun@innowireless.co.kr wrote:

 Without (C), what is the best practice to implement the following scenario?

 1. rdd = sc.textFile(FileA)
 2. rdd = rdd.map(...)  // actually modifying the rdd
 3. rdd.saveAsTextFile(FileA)

 Since the rdd transformation is 'lazy', rdd will not materialize until
 saveAsTextFile(), so FileA must still exist, but it must be deleted before
 saveAsTextFile().

 What I can think is:

 3. rdd.saveAsTextFile(TempFile)
 4. delete FileA
 5. rename TempFile to FileA

 This is not very convenient...
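
 A minimal Scala sketch of steps 3-5 above using the Hadoop FileSystem API
 (the paths and the placeholder transformation are hypothetical, and `sc` is
 assumed to be an existing SparkContext):

 import org.apache.hadoop.fs.{FileSystem, Path}

 val src = new Path("/data/FileA")        // hypothetical input path
 val tmp = new Path("/data/FileA_tmp")    // hypothetical temporary output path
 val fs  = src.getFileSystem(sc.hadoopConfiguration)

 val rdd = sc.textFile(src.toString).map(line => line.trim)  // placeholder transformation
 rdd.saveAsTextFile(tmp.toString)   // 3. write to the temp location first
 fs.delete(src, true)               // 4. delete the original only after the write succeeded
 fs.rename(tmp, src)                // 5. move the new output into place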

 Thanks.

 -Original Message-
 From: Patrick Wendell [mailto:pwend...@gmail.com]
 Sent: Tuesday, June 03, 2014 11:40 AM
 To: user@spark.apache.org
 Subject: Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing
 file

 (A) Semantics in Spark 0.9 and earlier: Spark will ignore Hadoop's output
 format check and overwrite files in the destination directory.
 But it won't clobber the directory entirely. I.e. if the directory already
 had part1, part2, part3, part4 and you write a new job outputting only
 two files (part1, part2) then it would leave the other two files
 intact, confusingly.

 (B) Semantics in Spark 1.0 and later: Spark runs the Hadoop OutputFormat check,
 which
 means the directory must not already exist or an exception is thrown.

 (C) Semantics proposed by Nicholas Chammas in this thread (AFAIK):
 Spark will delete/clobber an existing destination directory if it exists,
 then fully over-write it with new data.

 I'm fine to add a flag that allows (B) for backwards-compatibility reasons,
 but my point was I'd prefer not to have (C) even though I see some cases
 where it would be useful.

 - Patrick

 On Mon, Jun 2, 2014 at 4:25 PM, Sean Owen so...@cloudera.com wrote:
  Is there a third way? Unless I miss something. Hadoop's OutputFormat
  wants the target dir to not exist no matter what, so it's just a
  question of whether Spark deletes it for you or errors.
 
  On Tue, Jun 3, 2014 at 12:22 AM, Patrick Wendell pwend...@gmail.com
 wrote:
  We can just add back a flag to make it backwards compatible - it was
  just missed during the original PR.
 
  Adding a *third* set of clobber semantics, I'm slightly -1 on that
  for the following reasons:
 
  1. It's scary to have Spark recursively deleting user files; it could
  easily lead to users deleting data by mistake if they don't
  understand the exact semantics.
  2. It would introduce a third set of semantics here for saveAsXX...
  3. It's trivial for users to implement this with two lines of code
  (if the output dir exists, delete it) before calling saveAsHadoopFile
  (see the sketch below).
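
  A minimal Scala sketch of that pre-delete (the output path is hypothetical,
  and `rdd`/`sc` are assumed to already exist):

  import org.apache.hadoop.fs.{FileSystem, Path}

  val out = new Path("hdfs:///results/run1")        // hypothetical output directory
  val fs  = out.getFileSystem(sc.hadoopConfiguration)
  if (fs.exists(out)) fs.delete(out, true)          // clobber any previous output ourselves
  rdd.saveAsTextFile(out.toString)                  // Hadoop's existence check now passes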
 
  - Patrick
 




Re: How to enable fault-tolerance?

2014-06-09 Thread Aaron Davidson
Looks like your problem is local mode:
https://github.com/apache/spark/blob/640f9a0efefd42cff86aecd4878a3a57f5ae85fa/core/src/main/scala/org/apache/spark/SparkContext.scala#L1430

For some reason, someone decided not to do retries when running in local
mode. Not exactly sure why; feel free to submit a JIRA on this.
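
As a stopgap while testing locally, the local-with-retries master URL (which,
as far as I know, takes a max-failures count as its second argument) gives you
retries; a minimal sketch with illustrative numbers:

import org.apache.spark.{SparkConf, SparkContext}

// "local[4,3]": 4 worker threads, and each task may fail up to 3 times
// before the job is aborted.
val conf = new SparkConf().setAppName("local-retries-sketch").setMaster("local[4,3]")
val sc = new SparkContext(conf)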


On Mon, Jun 9, 2014 at 8:59 AM, Peng Cheng pc...@uow.edu.au wrote:

 I speculate that Spark will only retry on exceptions that are registered
 with
 TaskSetScheduler, so a definitely-will-fail task will fail quickly without
 taking more resources. However I haven't found any documentation or web
 page
 on it



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-enable-fault-tolerance-tp7250p7255.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Aaron Davidson
+1 please re-add this feature


On Mon, Jun 2, 2014 at 12:44 PM, Patrick Wendell pwend...@gmail.com wrote:

 Thanks for pointing that out. I've assigned you to SPARK-1677 (I think
 I accidentally assigned myself way back when I created it). This
 should be an easy fix.

 On Mon, Jun 2, 2014 at 12:19 PM, Nan Zhu zhunanmcg...@gmail.com wrote:
  Hi, Patrick,
 
  I think https://issues.apache.org/jira/browse/SPARK-1677 is talking
 about
  the same thing?
 
  How about assigning it to me?
 
  I think I missed the configuration part in my previous commit, though I
  declared that in the PR description
 
  Best,
 
  --
  Nan Zhu
 
  On Monday, June 2, 2014 at 3:03 PM, Patrick Wendell wrote:
 
  Hey There,
 
  The issue was that the old behavior could cause users to silently
  overwrite data, which is pretty bad, so to be conservative we decided
  to enforce the same checks that Hadoop does.
 
  This was documented by this JIRA:
  https://issues.apache.org/jira/browse/SPARK-1100
 
 https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1
 
  However, it would be very easy to add an option that allows preserving
  the old behavior. Is anyone here interested in contributing that? I
  created a JIRA for it:
 
  https://issues.apache.org/jira/browse/SPARK-1993
 
  - Patrick
 
  On Mon, Jun 2, 2014 at 9:22 AM, Pierre Borckmans
  pierre.borckm...@realimpactanalytics.com wrote:
 
  Indeed, the behavior has changed for good or for bad. I mean, I agree
 with
  the danger you mention but I'm not sure it's happening like that. Isn't
  there a mechanism for overwrite in Hadoop that automatically removes part
  files, then writes a _temporary folder and then only the part files along
  with the _success folder.
 
  In any case this change of behavior should be documented IMO.
 
  Cheers
  Pierre
 
  Message sent from a mobile device - excuse typos and abbreviations
 
  Le 2 juin 2014 à 17:42, Nicholas Chammas nicholas.cham...@gmail.com a
  écrit :
 
  What I've found using saveAsTextFile() against S3 (prior to Spark 1.0.0) is
  that files get overwritten automatically. There is one danger to this, though.
  If I save to a directory that already has 20 part- files, but this time
  around I'm only saving 15 part- files, then there will be 5 leftover
 part-
  files from the previous set mixed in with the 15 newer files. This is
  potentially dangerous.
 
  I haven't checked to see if this behavior has changed in 1.0.0. Are you
  saying it has, Pierre?
 
  On Mon, Jun 2, 2014 at 9:41 AM, Pierre B
  [pierre.borckm...@realimpactanalytics.com](mailto:
 pierre.borckm...@realimpactanalytics.com)
  wrote:
 
 
  Hi Michaël,
 
  Thanks for this. We could indeed do that.
 
  But I guess the question is more about the change of behaviour from 0.9.1
  to
  1.0.0.
  We never had to care about that in previous versions.
 
  Does that mean we have to manually remove existing files or is there a way
  to automatically overwrite when using saveAsTextFile?
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-tp6696p6700.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 



Re: hadoopRDD stalls reading entire directory

2014-06-02 Thread Aaron Davidson
You may have to do sudo jps, because it should definitely list your
processes.

What does hivecluster2:8080 look like? My guess is it says there are 2
applications registered, and one has taken all the executors. There must be
two applications running, as those are the only things that keep open those
4040/4041 ports.


On Mon, Jun 2, 2014 at 11:32 AM, Russell Jurney russell.jur...@gmail.com
wrote:

 If it matters, I have servers running at
 http://hivecluster2:4040/stages/ and http://hivecluster2:4041/stages/

 When I run rdd.first, I see an item at
 http://hivecluster2:4041/stages/ but no tasks are running. Stage ID 1,
 first at console:46, Tasks: Succeeded/Total 0/16.

 On Mon, Jun 2, 2014 at 10:09 AM, Russell Jurney
 russell.jur...@gmail.com wrote:
  Looks like just worker and master processes are running:
 
  [hivedata@hivecluster2 ~]$ jps
 
  10425 Jps
 
  [hivedata@hivecluster2 ~]$ ps aux|grep spark
 
  hivedata 10424  0.0  0.0 103248   820 pts/3S+   10:05   0:00 grep
 spark
 
  root 10918  0.5  1.4 4752880 230512 ?  Sl   May27  41:43 java -cp
 
 :/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/conf:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/core/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/repl/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/examples/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/bagel/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/mllib/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/streaming/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/*:/etc/hadoop/conf:/opt/cloudera/parcels/CDH/lib/hadoop/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-hdfs/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-yarn/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-mapreduce/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/scala-library.jar:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/scala-compiler.jar:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/jline.jar
  -Dspark.akka.logLifecycleEvents=true
 
 -Djava.library.path=/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib:/opt/cloudera/parcels/CDH/lib/hadoop/lib/native
  -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip hivecluster2
  --port 7077 --webui-port 18080
 
  root 12715  0.0  0.0 148028   656 ?SMay27   0:00 sudo
  /opt/cloudera/parcels/SPARK/lib/spark/bin/spark-class
  org.apache.spark.deploy.worker.Worker spark://hivecluster2:7077
 
  root 12716  0.3  1.1 4155884 191340 ?  Sl   May27  30:21 java -cp
 
 :/opt/cloudera/parcels/SPARK/lib/spark/conf:/opt/cloudera/parcels/SPARK/lib/spark/core/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/repl/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/examples/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/bagel/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/mllib/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/streaming/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/lib/*:/etc/hadoop/conf:/opt/cloudera/parcels/CDH/lib/hadoop/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-hdfs/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-yarn/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-mapreduce/*:/opt/cloudera/parcels/SPARK/lib/spark/lib/scala-library.jar:/opt/cloudera/parcels/SPARK/lib/spark/lib/scala-compiler.jar:/opt/cloudera/parcels/SPARK/lib/spark/lib/jline.jar
  -Dspark.akka.logLifecycleEvents=true
 
 -Djava.library.path=/opt/cloudera/parcels/SPARK/lib/spark/lib:/opt/cloudera/parcels/CDH/lib/hadoop/lib/native
  -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker
  spark://hivecluster2:7077
 
 
 
 
  On Sun, Jun 1, 2014 at 7:41 PM, Aaron Davidson ilike...@gmail.com
 wrote:
 
  Sounds like you have two shells running, and the first one is taking all
  your resources. Do a jps and kill the other guy, then try again.
 
  By the way, you can look at http://localhost:8080 (replace localhost
 with
  the server your Spark Master is running on) to see what applications are
  currently started, and what resource allocations they have.
 
 
  On Sun, Jun 1, 2014 at 6:47 PM, Russell Jurney 
 russell.jur...@gmail.com
  wrote:
 
  Thanks again. Run results here:
  https://gist.github.com/rjurney/dc0efae486ba7d55b7d5
 
  This time I get a port already in use exception on 4040, but it isn't
  fatal. Then when I run rdd.first, I get this over and over:
 
  14/06/01 18:35:40 WARN scheduler.TaskSchedulerImpl: Initial job has not
  accepted any resources; check your cluster UI to ensure that workers
 are
  registered and have sufficient memory
 
 
 
 
 
 
 
  On Sun, Jun 1, 2014 at 3:09 PM, Aaron Davidson ilike...@gmail.com
  wrote:
 
  You can avoid that by using the constructor that takes a SparkConf, a
 la
 
  val conf = new SparkConf()
  conf.setJars(Seq("avro.jar", ...))
  val sc = new SparkContext(conf)
 
 
  On Sun, Jun 1, 2014 at 2:32 PM, Russell Jurney
  russell.jur

Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Aaron Davidson
Yes.


On Mon, Jun 2, 2014 at 1:23 PM, Nicholas Chammas nicholas.cham...@gmail.com
 wrote:

 So in summary:

- As of Spark 1.0.0, saveAsTextFile() will no longer clobber by
default.
- There is an open JIRA issue to add an option to allow clobbering.
- Even when clobbering, part- files may be left over from previous
saves, which is dangerous.

 Is this correct?


 On Mon, Jun 2, 2014 at 4:17 PM, Aaron Davidson ilike...@gmail.com wrote:

 +1 please re-add this feature


 On Mon, Jun 2, 2014 at 12:44 PM, Patrick Wendell pwend...@gmail.com
 wrote:

 Thanks for pointing that out. I've assigned you to SPARK-1677 (I think
 I accidentally assigned myself way back when I created it). This
 should be an easy fix.

 On Mon, Jun 2, 2014 at 12:19 PM, Nan Zhu zhunanmcg...@gmail.com wrote:
  Hi, Patrick,
 
  I think https://issues.apache.org/jira/browse/SPARK-1677 is talking
 about
  the same thing?
 
  How about assigning it to me?
 
  I think I missed the configuration part in my previous commit, though I
  declared that in the PR description
 
  Best,
 
  --
  Nan Zhu
 
  On Monday, June 2, 2014 at 3:03 PM, Patrick Wendell wrote:
 
  Hey There,
 
  The issue was that the old behavior could cause users to silently
  overwrite data, which is pretty bad, so to be conservative we decided
  to enforce the same checks that Hadoop does.
 
  This was documented by this JIRA:
  https://issues.apache.org/jira/browse/SPARK-1100
 
 https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1
 
  However, it would be very easy to add an option that allows preserving
  the old behavior. Is anyone here interested in contributing that? I
  created a JIRA for it:
 
  https://issues.apache.org/jira/browse/SPARK-1993
 
  - Patrick
 
  On Mon, Jun 2, 2014 at 9:22 AM, Pierre Borckmans
  pierre.borckm...@realimpactanalytics.com wrote:
 
  Indeed, the behavior has changed for good or for bad. I mean, I agree
 with
  the danger you mention but I'm not sure it's happening like that. Isn't
  there a mechanism for overwrite in Hadoop that automatically removes
 part
  files, then writes a _temporary folder and then only the part files
 along
  with the _success folder.
 
  In any case this change of behavior should be documented IMO.
 
  Cheers
  Pierre
 
  Message sent from a mobile device - excuse typos and abbreviations
 
  Le 2 juin 2014 à 17:42, Nicholas Chammas nicholas.cham...@gmail.com
 a
  écrit :
 
  What I've found using saveAsTextFile() against S3 (prior to Spark 1.0.0) is
  that files get overwritten automatically. There is one danger to this, though.
  If I save to a directory that already has 20 part- files, but this time
  around I'm only saving 15 part- files, then there will be 5 leftover
 part-
  files from the previous set mixed in with the 15 newer files. This is
  potentially dangerous.
 
  I haven't checked to see if this behavior has changed in 1.0.0. Are you
  saying it has, Pierre?
 
  On Mon, Jun 2, 2014 at 9:41 AM, Pierre B
  [pierre.borckm...@realimpactanalytics.com](mailto:
 pierre.borckm...@realimpactanalytics.com)
  wrote:
 
 
  Hi Michaël,
 
  Thanks for this. We could indeed do that.
 
  But I guess the question is more about the change of behaviour from
 0.9.1
  to
  1.0.0.
  We never had to care about that in previous versions.
 
  Does that mean we have to manually remove existing files or is there a way
  to automatically overwrite when using saveAsTextFile?
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-tp6696p6700.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
 






Re: Using Spark on Data size larger than Memory size

2014-06-01 Thread Aaron Davidson
There is no fundamental issue if you're running on data that is larger than
cluster memory size. Many operations can stream data through, and thus
memory usage is independent of input data size. Certain operations require
an entire *partition* (not dataset) to fit in memory, but there are not
many instances of this left (sorting comes to mind, and this is being
worked on).

In general, one problem with Spark today is that you *can* OOM under
certain configurations, and it's possible you'll need to change from the
default configuration if you're doing very memory-intensive jobs.
However, there are very few cases where Spark would simply fail as a matter
of course -- for instance, you can always increase the number of
partitions to decrease the size of any given one, or repartition data to
eliminate skew.
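
A minimal Scala sketch of those two knobs (the path and partition counts are
illustrative, and `sc` is assumed to be an existing SparkContext):

// Ask for more input partitions up front, or reshuffle into even more, smaller
// ones later; smaller partitions mean less memory pressure per task and less skew.
val raw = sc.textFile("hdfs:///big/dataset", 2000)   // hypothetical path, illustrative count
val rebalanced = raw.repartition(4000)               // illustrative count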

Regarding impact on performance, as Mayur said, there may absolutely be an
impact depending on your jobs. If you're doing a join on a very large
amount of data with few partitions, then we'll have to spill to disk. If
you can't cache your working set of data in memory, you will also see a
performance degradation. Spark enables the use of memory to make things
fast, but if you just don't have enough memory, it won't be terribly fast.


On Sat, May 31, 2014 at 12:14 AM, Mayur Rustagi mayur.rust...@gmail.com
wrote:

 Clearly there will be an impact on performance, but frankly it depends on what
 you are trying to achieve with the dataset.

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Sat, May 31, 2014 at 11:45 AM, Vibhor Banga vibhorba...@gmail.com
 wrote:

 Some inputs will be really helpful.

 Thanks,
 -Vibhor


 On Fri, May 30, 2014 at 7:51 PM, Vibhor Banga vibhorba...@gmail.com
 wrote:

 Hi all,

 I am planning to use spark with HBase, where I generate RDD by reading
 data from HBase Table.

 I want to know that in the case when the size of HBase Table grows
 larger than the size of RAM available in the cluster, will the application
 fail, or will there be an impact in performance ?

 Any thoughts in this direction will be helpful and are welcome.

 Thanks,
 -Vibhor




 --
 Vibhor Banga
 Software Development Engineer
 Flipkart Internet Pvt. Ltd., Bangalore





Re: Akka disassociation on Java SE Embedded

2014-06-01 Thread Aaron Davidson
Thanks for the update! I've also run into the block manager timeout issue;
it might be a good idea to increase the default significantly (it would
probably time out immediately if the TCP connection itself dropped anyway).
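
For reference, in the Spark 0.9/1.0 era that timeout was typically raised via a
system property; a sketch of the kind of setting Chanwit describes below (the
property name and value are assumptions, so verify them against your Spark
version's configuration docs):

# conf/spark-env.sh -- illustrative only; confirm the property for your version
export SPARK_JAVA_OPTS="-Dspark.storage.blockManagerSlaveTimeoutMs=300000"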


On Sun, Jun 1, 2014 at 9:48 AM, Chanwit Kaewkasi chan...@gmail.com wrote:

 Hi all,

 This is what I found:

 1. Like Aaron suggested, an executor will be killed silently when the
 OS's memory is running out.
 I've seen this often enough to conclude it's real. Adding swap and
 increasing the JVM heap solved the problem, but you will encounter OS
 paging out and full GC.

 2. OS paging out and full GC are not likely to affect my benchmark
 much while processing data from HDFS. But Akka processes get randomly
 killed during the network-related stage (for example, sorting). I've
 found that an Akka process cannot fetch the result fast enough.
 Increasing the block manager timeout helped a lot. I've doubled the
 value many times as the network of our ARM cluster is quite slow.

 3. We'd like to collect times spent for all stages of our benchmark.
 So we always re-run when some tasks failed. Failure happened a lot but
 it's understandable as Spark is designed on top of Akka's let-it-crash
 philosophy. To make the benchmark run more perfectly (without a task
 failure), I called .cache() before calling the transformation of the
 next stage. And it helped a lot.

 Combining the above and other tuning, we can now boost the performance of
 our ARM cluster to 2.8 times faster than in our first report.

 Best regards,

 -chanwit

 --
 Chanwit Kaewkasi
 linkedin.com/in/chanwit


 On Wed, May 28, 2014 at 1:13 AM, Chanwit Kaewkasi chan...@gmail.com
 wrote:
  May be that's explaining mine too.
  Thank you very much, Aaron !!
 
  Best regards,
 
  -chanwit
 
  --
  Chanwit Kaewkasi
  linkedin.com/in/chanwit
 
 
  On Wed, May 28, 2014 at 12:47 AM, Aaron Davidson ilike...@gmail.com
 wrote:
  Spark should effectively turn Akka's failure detector off, because we
  historically had problems with GCs and other issues causing
 disassociations.
  The only thing that should cause these messages nowadays is if the TCP
  connection (which Akka sustains between Actor Systems on different
 machines)
  actually drops. TCP connections are pretty resilient, so one common
 cause of
  this is actual Executor failure -- recently, I have experienced a
  similar-sounding problem due to my machine's OOM killer terminating my
  Executors, such that they didn't produce any error output.
 
 
  On Thu, May 22, 2014 at 9:19 AM, Chanwit Kaewkasi chan...@gmail.com
 wrote:
 
  Hi all,
 
  On an ARM cluster, I have been testing a wordcount program with JRE 7
  and everything is OK. But when changing to the embedded version of
  Java SE (Oracle's eJRE), the same program cannot complete all
  computing stages.
 
  It is failed by many Akka's disassociation.
 
  - I've been trying to increase Akka's timeout but still stuck. I am
  not sure what is the right way to do so? (I suspected that GC pausing
  the world is causing this).
 
  - Another question is that how could I properly turn on Akka's logging
  to see what's the root cause of this disassociation problem? (If my
  guess about GC is wrong).
 
  Best regards,
 
  -chanwit
 
  --
  Chanwit Kaewkasi
  linkedin.com/in/chanwit
 
 



Re: hadoopRDD stalls reading entire directory

2014-06-01 Thread Aaron Davidson
Gotcha. The easiest way to get your dependencies to your Executors would
probably be to construct your SparkContext with all necessary jars passed
in (as the jars parameter), or inside a SparkConf with setJars(). Avro is
a necessary jar, but it's possible your application also needs to
distribute other ones to the cluster.

An easy way to make sure all your dependencies get shipped to the cluster
is to create an assembly jar of your application, and then you just need to
tell Spark about that jar, which includes all your application's transitive
dependencies. Maven and sbt both have pretty straightforward ways of
producing assembly jars.


On Sat, May 31, 2014 at 11:23 PM, Russell Jurney russell.jur...@gmail.com
wrote:

 Thanks for the fast reply.

 I am running CDH 4.4 with the Cloudera Parcel of Spark 0.9.0, in
 standalone mode.


 On Saturday, May 31, 2014, Aaron Davidson ilike...@gmail.com wrote:

 First issue was because your cluster was configured incorrectly. You
 could probably read 1 file because that was done on the driver node, but
 when it tried to run a job on the cluster, it failed.

 Second issue, it seems that the jar containing avro is not getting
 propagated to the Executors. What version of Spark are you running on? What
 deployment mode (YARN, standalone, Mesos)?


 On Sat, May 31, 2014 at 9:37 PM, Russell Jurney russell.jur...@gmail.com
  wrote:

 Now I get this:

 scala rdd.first

 14/05/31 21:36:28 INFO spark.SparkContext: Starting job: first at
 console:41

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Got job 4 (first at
 console:41) with 1 output partitions (allowLocal=true)

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Final stage: Stage 4
 (first at console:41)

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Parents of final stage:
 List()

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Missing parents: List()

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Computing the requested
 partition locally

 14/05/31 21:36:28 INFO rdd.HadoopRDD: Input split:
 hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-0.avro:0+3864

 14/05/31 21:36:28 INFO spark.SparkContext: Job finished: first at
 console:41, took 0.037371256 s

 14/05/31 21:36:28 INFO spark.SparkContext: Starting job: first at
 console:41

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Got job 5 (first at
 console:41) with 16 output partitions (allowLocal=true)

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Final stage: Stage 5
 (first at console:41)

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Parents of final stage:
 List()

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Missing parents: List()

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Submitting Stage 5
 (HadoopRDD[0] at hadoopRDD at console:37), which has no missing parents

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Submitting 16 missing
 tasks from Stage 5 (HadoopRDD[0] at hadoopRDD at console:37)

 14/05/31 21:36:28 INFO scheduler.TaskSchedulerImpl: Adding task set 5.0
 with 16 tasks

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:0 as
 TID 92 on executor 2: hivecluster3 (NODE_LOCAL)

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:0 as
 1294 bytes in 1 ms

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:3 as
 TID 93 on executor 1: hivecluster5.labs.lan (NODE_LOCAL)

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:3 as
 1294 bytes in 0 ms

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:1 as
 TID 94 on executor 4: hivecluster4 (NODE_LOCAL)

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:1 as
 1294 bytes in 1 ms

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:2 as
 TID 95 on executor 0: hivecluster6.labs.lan (NODE_LOCAL)

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:2 as
 1294 bytes in 0 ms

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:4 as
 TID 96 on executor 3: hivecluster1.labs.lan (NODE_LOCAL)

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:4 as
 1294 bytes in 0 ms

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:6 as
 TID 97 on executor 2: hivecluster3 (NODE_LOCAL)

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:6 as
 1294 bytes in 0 ms

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:5 as
 TID 98 on executor 1: hivecluster5.labs.lan (NODE_LOCAL)

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:5 as
 1294 bytes in 0 ms

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:8 as
 TID 99 on executor 4: hivecluster4 (NODE_LOCAL)

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:8 as
 1294 bytes in 0 ms

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:7 as
 TID 100 on executor 0: hivecluster6.labs.lan (NODE_LOCAL)

 14/05/31 21:36:28 INFO scheduler.TaskSetManager

Re: hadoopRDD stalls reading entire directory

2014-06-01 Thread Aaron Davidson
You can avoid that by using the constructor that takes a SparkConf, a la

val conf = new SparkConf()
conf.setJars(Seq("avro.jar", ...))
val sc = new SparkContext(conf)


On Sun, Jun 1, 2014 at 2:32 PM, Russell Jurney russell.jur...@gmail.com
wrote:

 Followup question: the docs to make a new SparkContext require that I know
 where $SPARK_HOME is. However, I have no idea. Any idea where that might be?


 On Sun, Jun 1, 2014 at 10:28 AM, Aaron Davidson ilike...@gmail.com
 wrote:

 Gotcha. The easiest way to get your dependencies to your Executors would
 probably be to construct your SparkContext with all necessary jars passed
 in (as the jars parameter), or inside a SparkConf with setJars(). Avro is
 a necessary jar, but it's possible your application also needs to
 distribute other ones to the cluster.

 An easy way to make sure all your dependencies get shipped to the cluster
 is to create an assembly jar of your application, and then you just need to
 tell Spark about that jar, which includes all your application's transitive
 dependencies. Maven and sbt both have pretty straightforward ways of
 producing assembly jars.


 On Sat, May 31, 2014 at 11:23 PM, Russell Jurney 
 russell.jur...@gmail.com wrote:

 Thanks for the fast reply.

 I am running CDH 4.4 with the Cloudera Parcel of Spark 0.9.0, in
 standalone mode.


 On Saturday, May 31, 2014, Aaron Davidson ilike...@gmail.com wrote:

 First issue was because your cluster was configured incorrectly. You
 could probably read 1 file because that was done on the driver node, but
 when it tried to run a job on the cluster, it failed.

 Second issue, it seems that the jar containing avro is not getting
 propagated to the Executors. What version of Spark are you running on? What
 deployment mode (YARN, standalone, Mesos)?


 On Sat, May 31, 2014 at 9:37 PM, Russell Jurney 
 russell.jur...@gmail.com wrote:

 Now I get this:

 scala rdd.first

 14/05/31 21:36:28 INFO spark.SparkContext: Starting job: first at
 console:41

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Got job 4 (first at
 console:41) with 1 output partitions (allowLocal=true)

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Final stage: Stage 4
 (first at console:41)

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Parents of final stage:
 List()

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Missing parents: List()

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Computing the requested
 partition locally

 14/05/31 21:36:28 INFO rdd.HadoopRDD: Input split:
 hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-0.avro:0+3864

 14/05/31 21:36:28 INFO spark.SparkContext: Job finished: first at
 console:41, took 0.037371256 s

 14/05/31 21:36:28 INFO spark.SparkContext: Starting job: first at
 console:41

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Got job 5 (first at
 console:41) with 16 output partitions (allowLocal=true)

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Final stage: Stage 5
 (first at console:41)

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Parents of final stage:
 List()

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Missing parents: List()

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Submitting Stage 5
 (HadoopRDD[0] at hadoopRDD at console:37), which has no missing parents

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Submitting 16 missing
 tasks from Stage 5 (HadoopRDD[0] at hadoopRDD at console:37)

 14/05/31 21:36:28 INFO scheduler.TaskSchedulerImpl: Adding task set 5.0
 with 16 tasks

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:0 as
 TID 92 on executor 2: hivecluster3 (NODE_LOCAL)

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:0
 as 1294 bytes in 1 ms

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:3 as
 TID 93 on executor 1: hivecluster5.labs.lan (NODE_LOCAL)

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:3
 as 1294 bytes in 0 ms

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:1 as
 TID 94 on executor 4: hivecluster4 (NODE_LOCAL)

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:1
 as 1294 bytes in 1 ms

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:2 as
 TID 95 on executor 0: hivecluster6.labs.lan (NODE_LOCAL)

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:2
 as 1294 bytes in 0 ms

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:4 as
 TID 96 on executor 3: hivecluster1.labs.lan (NODE_LOCAL)

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:4
 as 1294 bytes in 0 ms

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:6 as
 TID 97 on executor 2: hivecluster3 (NODE_LOCAL)

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:6
 as 1294 bytes in 0 ms

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:5 as
 TID 98 on executor 1: hivecluster5.labs.lan (NODE_LOCAL)

 14/05/31 21

Re: hadoopRDD stalls reading entire directory

2014-06-01 Thread Aaron Davidson
Sounds like you have two shells running, and the first one is taking all
your resources. Do a jps and kill the other guy, then try again.

By the way, you can look at http://localhost:8080 (replace localhost with
the server your Spark Master is running on) to see what applications are
currently started, and what resource allocations they have.


On Sun, Jun 1, 2014 at 6:47 PM, Russell Jurney russell.jur...@gmail.com
wrote:

 Thanks again. Run results here:
 https://gist.github.com/rjurney/dc0efae486ba7d55b7d5

 This time I get a port already in use exception on 4040, but it isn't
 fatal. Then when I run rdd.first, I get this over and over:

 14/06/01 18:35:40 WARN scheduler.TaskSchedulerImpl: Initial job has not 
 accepted any resources; check your cluster UI to ensure that workers are 
 registered and have sufficient memory





 On Sun, Jun 1, 2014 at 3:09 PM, Aaron Davidson ilike...@gmail.com wrote:

 You can avoid that by using the constructor that takes a SparkConf, a la

 val conf = new SparkConf()
 conf.setJars(Seq("avro.jar", ...))
 val sc = new SparkContext(conf)


 On Sun, Jun 1, 2014 at 2:32 PM, Russell Jurney russell.jur...@gmail.com
 wrote:

 Followup question: the docs to make a new SparkContext require that I
 know where $SPARK_HOME is. However, I have no idea. Any idea where that
 might be?


 On Sun, Jun 1, 2014 at 10:28 AM, Aaron Davidson ilike...@gmail.com
 wrote:

 Gotcha. The easiest way to get your dependencies to your Executors
 would probably be to construct your SparkContext with all necessary jars
 passed in (as the jars parameter), or inside a SparkConf with setJars().
 Avro is a necessary jar, but it's possible your application also needs to
 distribute other ones to the cluster.

 An easy way to make sure all your dependencies get shipped to the
 cluster is to create an assembly jar of your application, and then you just
 need to tell Spark about that jar, which includes all your application's
 transitive dependencies. Maven and sbt both have pretty straightforward
 ways of producing assembly jars.


 On Sat, May 31, 2014 at 11:23 PM, Russell Jurney 
 russell.jur...@gmail.com wrote:

 Thanks for the fast reply.

 I am running CDH 4.4 with the Cloudera Parcel of Spark 0.9.0, in
 standalone mode.


 On Saturday, May 31, 2014, Aaron Davidson ilike...@gmail.com wrote:

 First issue was because your cluster was configured incorrectly. You
 could probably read 1 file because that was done on the driver node, but
 when it tried to run a job on the cluster, it failed.

 Second issue, it seems that the jar containing avro is not getting
 propagated to the Executors. What version of Spark are you running on? 
 What
 deployment mode (YARN, standalone, Mesos)?


 On Sat, May 31, 2014 at 9:37 PM, Russell Jurney 
 russell.jur...@gmail.com wrote:

 Now I get this:

 scala rdd.first

 14/05/31 21:36:28 INFO spark.SparkContext: Starting job: first at
 console:41

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Got job 4 (first at
 console:41) with 1 output partitions (allowLocal=true)

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Final stage: Stage 4
 (first at console:41)

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Parents of final
 stage: List()

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Missing parents: List()

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Computing the
 requested partition locally

 14/05/31 21:36:28 INFO rdd.HadoopRDD: Input split:
 hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-0.avro:0+3864

 14/05/31 21:36:28 INFO spark.SparkContext: Job finished: first at
 console:41, took 0.037371256 s

 14/05/31 21:36:28 INFO spark.SparkContext: Starting job: first at
 console:41

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Got job 5 (first at
 console:41) with 16 output partitions (allowLocal=true)

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Final stage: Stage 5
 (first at console:41)

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Parents of final
 stage: List()

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Missing parents: List()

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Submitting Stage 5
 (HadoopRDD[0] at hadoopRDD at console:37), which has no missing parents

 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Submitting 16 missing
 tasks from Stage 5 (HadoopRDD[0] at hadoopRDD at console:37)

 14/05/31 21:36:28 INFO scheduler.TaskSchedulerImpl: Adding task set
 5.0 with 16 tasks

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:0
 as TID 92 on executor 2: hivecluster3 (NODE_LOCAL)

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task
 5.0:0 as 1294 bytes in 1 ms

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:3
 as TID 93 on executor 1: hivecluster5.labs.lan (NODE_LOCAL)

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task
 5.0:3 as 1294 bytes in 0 ms

 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:1
 as TID 94 on executor 4: hivecluster4

Re: Can anyone help me set memory for standalone cluster?

2014-06-01 Thread Aaron Davidson
In addition to setting the Standalone memory, you'll also need to tell your
SparkContext to claim the extra resources. Set spark.executor.memory to
1600m as well. This should be a system property set in SPARK_JAVA_OPTS in
conf/spark-env.sh (in 0.9.1, which you appear to be using) -- e.g.,
export SPARK_JAVA_OPTS="-Dspark.executor.memory=1600m"
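
Equivalently, a minimal sketch that sets it on the SparkConf directly (the app
name is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("kafka-wordcount-sketch")    // hypothetical name
  .set("spark.executor.memory", "1600m")   // should not exceed SPARK_WORKER_MEMORY
val sc = new SparkContext(conf)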


On Sun, Jun 1, 2014 at 7:36 PM, Yunmeng Ban banyunm...@gmail.com wrote:

 Hi,

 I'm running the example of JavaKafkaWordCount in a standalone cluster. I
 want to set 1600MB memory for each slave node. I wrote in the
 spark/conf/spark-env.sh

 SPARK_WORKER_MEMORY=1600m

 But the logs on slave nodes looks this:
 Spark Executor Command: /usr/java/latest/bin/java -cp
 :/~path/spark/conf:/~path/spark/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar
 -Xms512M -Xmx512M
 org.apache.spark.executor.CoarseGrainedExecutorBackend

 The memory seems to be the default number, not 1600M.
 I don't know how to make SPARK_WORKER_MEMORY work.
 Can anyone help me?
 Many thanks in advance.

 Yunmeng



Re: Spark 1.0.0 - Java 8

2014-05-30 Thread Aaron Davidson
Also, the Spark examples can run out of the box on a single machine, as
well as a cluster. See the Master URLs heading here:
http://spark.apache.org/docs/latest/submitting-applications.html#master-urls
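
For example, a single-machine run of the bundled SparkPi example looks something
like this with spark-submit (the examples jar path is an assumption about the
binary distribution layout, so adjust it to your install):

# Run the SparkPi example on 4 local threads -- no cluster required
./bin/spark-submit --master "local[4]" \
  --class org.apache.spark.examples.SparkPi \
  lib/spark-examples-*.jar 10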


On Fri, May 30, 2014 at 9:24 AM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 With respect to virtual hosts, my team uses Vagrant/Virtualbox. We have 3
 CentOS VMs with 4 GB RAM each - 2 worker nodes and a master node.

 Everything works fine, though if you are using MapR, you have to make sure
 they are all on the same subnet.

 -Suren



 On Fri, May 30, 2014 at 12:20 PM, Upender Nimbekar upent...@gmail.com
 wrote:

 Great news! I've been awaiting this release to start doing some coding
 with Spark using Java 8. Can I run the Spark 1.0 examples on a virtual host
 with 16 GB RAM and a fairly decent amount of hard disk? Or do I really need
 to use a cluster of machines?
 Second, are there any good examples of using MLlib on Spark? Please shoot
 me in the right direction.

 Thanks
 Upender

 On Fri, May 30, 2014 at 6:12 AM, Patrick Wendell pwend...@gmail.com
 wrote:

 I'm thrilled to announce the availability of Spark 1.0.0! Spark 1.0.0
 is a milestone release as the first in the 1.0 line of releases,
 providing API stability for Spark's core interfaces.

 Spark 1.0.0 is Spark's largest release ever, with contributions from
 117 developers. I'd like to thank everyone involved in this release -
 it was truly a community effort with fixes, features, and
 optimizations contributed from dozens of organizations.

 This release expands Spark's standard libraries, introducing a new SQL
 package (SparkSQL) which lets users integrate SQL queries into
 existing Spark workflows. MLlib, Spark's machine learning library, is
 expanded with sparse vector support and several new algorithms. The
 GraphX and Streaming libraries also introduce new features and
 optimizations. Spark's core engine adds support for secured YARN
 clusters, a unified tool for submitting Spark applications, and
 several performance and stability improvements. Finally, Spark adds
 support for Java 8 lambda syntax and improves coverage of the Java and
 Python API's.

 Those features only scratch the surface - check out the release notes
 here:
 http://spark.apache.org/releases/spark-release-1-0-0.html

 Note that since release artifacts were posted recently, certain
 mirrors may not have working downloads for a few hours.

 - Patrick





 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io




Re: access hdfs file name in map()

2014-05-29 Thread Aaron Davidson
Currently there is not a way to do this using textFile(). However, you
could pretty straightforwardly define your own subclass of HadoopRDD [1] in
order to get access to this information (likely using
mapPartitionsWithIndex to look up the InputSplit for a particular
partition).

Note that sc.textFile() is just a convenience function to construct a new
HadoopRDD [2].

[1] HadoopRDD:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L93
[2] sc.textFile():
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L456
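
If the files are small enough to read whole, a simpler alternative (a different
approach than subclassing HadoopRDD) is sc.wholeTextFiles(), available from
Spark 1.0; a minimal sketch reusing the glob path from the question below:

// Each record is (fullFilePath, fileContent), so the file name travels with the data.
// Best for many small files, since each whole file becomes a single record.
val perFile = sc.wholeTextFiles("hdfs://test/path/*")
val rows = perFile.flatMap { case (fileName, content) =>
  content.split("\n").map(_.split(",")).map(p => (fileName, p(0), p(2)))
}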


On Thu, May 29, 2014 at 7:49 PM, Xu (Simon) Chen xche...@gmail.com wrote:

 Hello,

 A quick question about using spark to parse text-format CSV files stored
 on hdfs.

 I have something very simple:
 sc.textFile("hdfs://test/path/*").map(line => line.split(",")).map(p =>
 ("XXX", p(0), p(2)))

 Here, I want to replace XXX with a string, which is the current csv
 filename for the line. This is needed since some information may be encoded
 in the file name, like date.

 In hive, I am able to define an external table and use INPUT__FILE__NAME
 as a column in queries. I wonder if spark has something similar.

 Thanks!
 -Simon


