Re: Autoscaling of Spark YARN cluster

2015-12-14 Thread Mingyu Kim
Cool. Using Ambari to monitor and scale up/down the cluster sounds
promising. Thanks for the pointer!

Mingyu

From:  Deepak Sharma 
Date:  Monday, December 14, 2015 at 1:53 AM
To:  cs user 
Cc:  Mingyu Kim , "user@spark.apache.org"

Subject:  Re: Autoscaling of Spark YARN cluster

An approach I can think of is using the Ambari Metrics Service (AMS).
Using these metrics, you can decide whether the cluster is running low on
resources.
If yes, call the Ambari management API to add a node to the cluster.
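
A rough sketch of that flow is below (the Ambari host/port, credentials and
the exact v1 REST path are assumptions, and in practice further calls are
needed to install and start the host components on the new node):

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import java.util.Base64

// Hedged sketch: register an already-provisioned host (with the Ambari agent
// running) with the cluster via Ambari's REST API.
def addHostToCluster(ambariHost: String, cluster: String, newHost: String,
                     user: String, password: String): Int = {
  val url  = new URL(s"http://$ambariHost:8080/api/v1/clusters/$cluster/hosts/$newHost")
  val conn = url.openConnection().asInstanceOf[HttpURLConnection]
  val auth = Base64.getEncoder.encodeToString(
    s"$user:$password".getBytes(StandardCharsets.UTF_8))
  conn.setRequestMethod("POST")
  conn.setRequestProperty("Authorization", s"Basic $auth")
  conn.setRequestProperty("X-Requested-By", "ambari")  // header Ambari expects
  conn.setDoOutput(true)
  conn.getOutputStream.close()                          // empty request body
  conn.getResponseCode                                  // expect 201 on success
}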

Thanks
Deepak

On Mon, Dec 14, 2015 at 2:48 PM, cs user  wrote:
> Hi Mingyu, 
> 
> I'd be interested in hearing about anything else you find which might meet
> your needs for this.
> 
> One way this could perhaps be done is with Ambari, which comes with a
> nice API you can use to add additional nodes into a cluster:
> 
> https://github.com/apache/ambari/blob/trunk/ambari-server/docs/api/v1/index.md
> 
> Once the node has been built and the Ambari agent installed, you can then call
> back to the management node via the API, tell it what you want the new node to
> be, and it will connect, configure the new node and add it to the cluster.
> 
> You could create a host group within the cluster blueprint with the minimal
> components needed for it to operate as a YARN node.
> 
> As for the decision to scale, that is outside the remit of Ambari. I guess
> you could look into using AWS autoscaling, or into a product called Scalr,
> which has an open-source version. We are using it to install an Ambari
> cluster, with Chef configuring the nodes up until the point it hands over
> to Ambari.
> 
> Scalr allows you to write custom scaling metrics, which you could use to query
> values such as the number of applications queued and the resources available,
> and add nodes when required.
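
A hedged sketch of such a custom scaling check against the YARN
ResourceManager's REST metrics (the RM host/port, the thresholds and the
quick-and-dirty JSON handling are all assumptions):

import scala.io.Source

// Pull the ResourceManager's cluster metrics and decide whether to add nodes.
// A real policy would parse the JSON properly instead of string matching.
def clusterNeedsMoreNodes(rmHost: String): Boolean = {
  val json = Source.fromURL(s"http://$rmHost:8088/ws/v1/cluster/metrics").mkString
  def metric(name: String): Long = {
    val pattern = ("\"" + name + "\":(\\d+)").r
    pattern.findFirstMatchIn(json).map(_.group(1).toLong).getOrElse(0L)
  }
  // Scale out if applications are queued while little memory is free.
  metric("appsPending") > 0 && metric("availableMB") < 8192
}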
> 
> Cheers!
> 
> On Mon, Dec 14, 2015 at 8:57 AM, Mingyu Kim  wrote:
>> Hi all,
>> 
>> Has anyone tried out autoscaling a Spark YARN cluster on a public cloud (e.g.
>> EC2) based on workload? To be clear, I'm interested in scaling the cluster
>> itself up and down by adding and removing YARN nodes based on the cluster
>> resource utilization (e.g. # of applications queued, # of resources
>> available), as opposed to scaling resources assigned to Spark applications,
>> which is natively supported by Spark's dynamic resource scheduling. I've
>> found that Cloudbreak
>> <http://sequenceiq.com/cloudbreak-docs/latest/periscope/#how-it-works>
>> has a similar feature, but it's in "technical preview", and I didn't find
>> much else from my search.
>> 
>> This might be a general YARN question, but wanted to check if there's a
>> solution popular in the Spark community. Any sharing of experience around
>> autoscaling will be helpful!
>> 
>> Thanks,
>> Mingyu
> 



-- 
Thanks
Deepak
www.bigdatabig.com 
www.keosha.net 






Autoscaling of Spark YARN cluster

2015-12-14 Thread Mingyu Kim
Hi all,

Has anyone tried out autoscaling a Spark YARN cluster on a public cloud (e.g.
EC2) based on workload? To be clear, I'm interested in scaling the cluster
itself up and down by adding and removing YARN nodes based on the cluster
resource utilization (e.g. # of applications queued, # of resources
available), as opposed to scaling resources assigned to Spark applications,
which is natively supported by Spark's dynamic resource scheduling. I've
found that Cloudbreak
<http://sequenceiq.com/cloudbreak-docs/latest/periscope/#how-it-works>
has a similar feature, but it's in "technical preview", and I didn't find much
else from my search.

This might be a general YARN question, but wanted to check if there's a
solution popular in the Spark community. Any sharing of experience around
autoscaling will be helpful!

Thanks,
Mingyu






Re: compatibility issue with Jersey2

2015-10-13 Thread Mingyu Kim
Hi all,

I filed https://issues.apache.org/jira/browse/SPARK-11081. Since Jersey's
surface area is relatively small and it seems to be used only for the Spark UI
and the JSON API, shading the dependency might make sense, similar to what's
done for the Jetty dependencies in
https://issues.apache.org/jira/browse/SPARK-3996. Would this be reasonable?
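
For what it's worth, the same relocation trick can also be applied on the
application side as a stopgap: shade the app's own Jersey 2 packages so they
cannot collide with the Jersey 1 classes Spark ships. A hedged sbt-assembly
sketch (the plugin setup, package pattern and target prefix are assumptions,
not a tested recipe):

// In the application's build.sbt, with the sbt-assembly plugin enabled:
// relocate Jersey 2 classes into a private namespace inside the assembly jar,
// analogous to how SPARK-3996 shades Jetty inside Spark itself.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("org.glassfish.jersey.**" -> "shadedjersey2.@1").inAll
)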

Mingyu







On 10/7/15, 11:26 AM, "Marcelo Vanzin"  wrote:

>Seems like you might be running into
>https://issues.apache.org/jira/browse/SPARK-10910. I've been busy with
>other things but plan to take a look at that one when I find time...
>right now I don't really have a solution, other than making sure your
>application's jars do not include those classes the exception is
>complaining about.
>
>On Wed, Oct 7, 2015 at 10:23 AM, Gary Ogden  wrote:
>> What you suggested seems to have worked for unit tests. But now it throws
>> this at run time on mesos with spark-submit:
>>
>> Exception in thread "main" java.lang.LinkageError: loader constraint
>> violation: when resolving method
>> "org.slf4j.impl.StaticLoggerBinder.getLoggerFactory()Lorg/slf4j/ILoggerFactory;"
>> the class loader (instance of
>> org/apache/spark/util/ChildFirstURLClassLoader) of the current class,
>> org/slf4j/LoggerFactory, and the class loader (instance of
>> sun/misc/Launcher$AppClassLoader) for resolved class,
>> org/slf4j/impl/StaticLoggerBinder, have different Class objects for the type
>> LoggerFactory; used in the signature
>>  at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:336)
>>  at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:284)
>>  at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:305)
>>  at com.company.spark.utils.SparkJob.(SparkJob.java:41)
>>  at java.lang.Class.forName0(Native Method)
>>  at java.lang.Class.forName(Unknown Source)
>>  at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:634)
>>  at 
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>>  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>>  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>>  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>> On 6 October 2015 at 16:20, Marcelo Vanzin  wrote:
>>>
>>> On Tue, Oct 6, 2015 at 12:04 PM, Gary Ogden  wrote:
>>> > But we run unit tests differently in our build environment, which is
>>> > throwing the error. It's setup like this:
>>> >
>>> > I suspect this is what you were referring to when you said I have a
>>> > problem?
>>>
>>> Yes, that is what I was referring to. But, in your test environment,
>>> you might be able to work around the problem by setting
>>> "spark.ui.enabled=false"; that should disable all the code that uses
>>> Jersey, so you can use your newer version in your unit tests.
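
A minimal sketch of that test-only setting (the master and app name here are
placeholders for whatever the unit tests use):

import org.apache.spark.{SparkConf, SparkContext}

// Unit-test SparkContext with the web UI disabled, so Spark's bundled
// Jersey 1.x classes are never initialized and the test can use Jersey 2.
val conf = new SparkConf()
  .setMaster("local[2]")               // placeholder for the test environment
  .setAppName("jersey2-compat-test")   // placeholder app name
  .set("spark.ui.enabled", "false")
val sc = new SparkContext(conf)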
>>>
>>>
>>> --
>>> Marcelo
>>
>>
>
>
>
>-- 
>Marcelo
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>



Re: Which OutputCommitter to use for S3?

2015-02-23 Thread Mingyu Kim
Cool, we will start from there. Thanks Aaron and Josh!

Darin, it's likely because the DirectOutputCommitter is compiled with
Hadoop 1 classes and you're running it with Hadoop 2.
org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it
became an interface in Hadoop 2.

Mingyu





On 2/23/15, 11:52 AM, "Darin McBeath"  wrote:

>Aaron.  Thanks for the class. Since I'm currently writing Java-based
>Spark applications, I tried converting your class to Java (it seemed
>pretty straightforward).
>
>I set up the use of the class as follows:
>
>SparkConf conf = new SparkConf()
>.set("spark.hadoop.mapred.output.committer.class",
>"com.elsevier.common.DirectOutputCommitter");
>
>And I then try to save a file to S3 (which I believe should use the old
>Hadoop APIs).
>
>JavaPairRDD<Text, Text> newBaselineRDDWritable =
>reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
>newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
>Text.class, Text.class, SequenceFileOutputFormat.class,
>org.apache.hadoop.io.compress.GzipCodec.class);
>
>But, I get the following error message.
>
>Exception in thread "main" java.lang.IncompatibleClassChangeError: Found
>class org.apache.hadoop.mapred.JobContext, but interface was expected
>at 
>com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.
>java:68)
>at 
>org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
>at 
>org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions
>.scala:1075)
>at 
>org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
>ala:940)
>at 
>org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
>ala:902)
>at 
>org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:7
>71)
>at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)
>
>In my class, JobContext is the interface type
>org.apache.hadoop.mapred.JobContext.
>
>Is there something obvious that I might be doing wrong (or messed up in
>the translation from Scala to Java) or something I should look into?  I'm
>using Spark 1.2 with Hadoop 2.4.
>
>
>Thanks.
>
>Darin.
>
>
>
>
>
>From: Aaron Davidson 
>To: Andrew Ash 
>Cc: Josh Rosen ; Mingyu Kim ;
>"user@spark.apache.org" ; Aaron Davidson
>
>Sent: Saturday, February 21, 2015 7:01 PM
>Subject: Re: Which OutputCommitter to use for S3?
>
>
>
>Here is the class:
>https://gist.github.com/aarondav/c513916e72101bbe14ec
>
>You can use it by setting "mapred.output.committer.class" in the Hadoop
>configuration (or "spark.hadoop.mapred.output.committer.class" in the
>Spark configuration). Note that this only works for the old Hadoop APIs;
>I believe the new Hadoop APIs strongly tie the committer to the output format
>(so FileOutputFormat always uses FileOutputCommitter), which makes this fix
>more difficult to apply.
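
In case the gist link rots, here is a minimal sketch of such a committer for
the old mapred API (this is not the gist's class itself, and it skips the
_SUCCESS handling mentioned elsewhere in this thread):

import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

// No-op ("direct") committer sketch: tasks write straight to the final S3
// location and nothing is moved or renamed at commit time.
class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
}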
>
>
>
>
>On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash  wrote:
>
>Josh is that class something you guys would consider open sourcing, or
>would you rather the community step up and create an OutputCommitter
>implementation optimized for S3?
>>
>>
>>On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen  wrote:
>>
>>We (Databricks) use our own DirectOutputCommitter implementation, which
>>is a couple tens of lines of Scala code.  The class would almost
>>entirely be a no-op except we took some care to properly handle the
>>_SUCCESS file.
>>>
>>>
>>>On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim  wrote:
>>>
>>>I didn't get any response. It'd be really appreciated if anyone using a
>>>special OutputCommitter for S3 can comment on this!
>>>>
>>>>
>>>>Thanks,
>>>>Mingyu
>>>>
>>>>
>>>>From: Mingyu Kim 
>>>>Date: Monday, February 16, 2015 at 1:15 AM
>>>>To: "user@spark.apache.org" 
>>>>Subject: Which OutputCommitter to use for S3?
>>>>
>>>>
>>>>
>>>>Hi all,
>>>>
>>>>
>>>>The default OutputCommitter used by RDD, which is FileOutputCommitter,
>>>>seems to require moving files at the commit step, which is not a
>>>>constant operation

Re: Which OutputCommitter to use for S3?

2015-02-20 Thread Mingyu Kim
I didn’t get any response. It’d be really appreciated if anyone using a special 
OutputCommitter for S3 can comment on this!

Thanks,
Mingyu

From: Mingyu Kim <m...@palantir.com>
Date: Monday, February 16, 2015 at 1:15 AM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: Which OutputCommitter to use for S3?

Hi all,

The default OutputCommitter used by RDD, which is FileOutputCommitter, seems to
require moving files at the commit step, which is not a constant operation in
S3, as discussed in
http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3c543e33fa.2000...@entropy.be%3E.
People seem to develop their own NullOutputCommitter implementation or use
DirectFileOutputCommitter (as mentioned in SPARK-3595), but I wanted to check
if there is a de facto standard, publicly available OutputCommitter to use for
S3 in conjunction with Spark.

Thanks,
Mingyu


Which OutputCommitter to use for S3?

2015-02-16 Thread Mingyu Kim
Hi all,

The default OutputCommitter used by RDD, which is FileOutputCommitter, seems to 
require moving files at the commit step, which is not a constant operation in 
S3, as discussed in 
http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3c543e33fa.2000...@entropy.be%3E.
 People seem to develop their own NullOutputCommitter implementation or use 
DirectFileOutputCommitter (as mentioned in 
SPARK-3595), but I wanted to 
check if there is a de facto standard, publicly available OutputCommitter to 
use for S3 in conjunction with Spark.

Thanks,
Mingyu


Re: Larger heap leads to perf degradation due to GC

2014-10-06 Thread Mingyu Kim
Ok, cool. This seems to be a general issue in JVMs with very large heaps. I
agree that the best workaround would be to keep the heap size below 32GB.
Thanks guys!

Mingyu

From:  Arun Ahuja 
Date:  Monday, October 6, 2014 at 7:50 AM
To:  Andrew Ash 
Cc:  Mingyu Kim , "user@spark.apache.org"
, Dennis Lawler 
Subject:  Re: Larger heap leads to perf degradation due to GC

We have used the strategy that you suggested, Andrew - using many workers
per machine and keeping the heaps small (< 20gb).

Using a large heap resulted in workers hanging or not responding (leading to
timeouts). The same dataset/job for us will fail (most often due to Akka
disassociation or fetch failure errors) with 10 cores / 100 executors and 60 GB
per executor, while it succeeds with 1 core / 1000 executors / 6 GB per executor.

When the job does succeed with more cores per executor and a larger heap, it
is usually much slower than with the smaller executors (the same 8-10 min job
taking 15-20 min to complete).

The unfortunate downside of this has been that we have had some large broadcast
variables which may not fit into memory (and are unnecessarily duplicated) when
using the smaller executors.

Most of this is anecdotal but for the most part we have had more success and
consistency with more executors with smaller memory requirements.

On Sun, Oct 5, 2014 at 7:20 PM, Andrew Ash  wrote:
> Hi Mingyu, 
> 
> Maybe we should be limiting our heaps to 32GB max and running multiple workers
> per machine to avoid large GC issues.
> 
> For a 128GB memory, 32 core machine, this could look like:
> 
> SPARK_WORKER_INSTANCES=4
> SPARK_WORKER_MEMORY=32g
> SPARK_WORKER_CORES=8
> 
> Are people running with large (32GB+) executor heaps in production?  I'd be
> curious to hear if so.
> 
> Cheers!
> Andrew
> 
> On Thu, Oct 2, 2014 at 1:30 PM, Mingyu Kim  wrote:
>> This issue definitely needs more investigation, but I just wanted to quickly
>> check if anyone has run into this problem or has general guidance around it.
>> We've seen a performance degradation with a large heap on a simple map task
>> (i.e. no shuffle). We've seen the slowness starting at around a 50GB heap
>> (i.e. spark.executor.memory=50g), and when we checked the CPU usage, there
>> were just a lot of GCs going on.
>> 
>> Has anyone seen a similar problem?
>> 
>> Thanks,
>> Mingyu
> 







Larger heap leads to perf degradation due to GC

2014-10-02 Thread Mingyu Kim
This issue definitely needs more investigation, but I just wanted to quickly
check if anyone has run into this problem or has general guidance around it.
We've seen a performance degradation with a large heap on a simple map task
(i.e. no shuffle). We've seen the slowness starting at around a 50GB heap
(i.e. spark.executor.memory=50g), and when we checked the CPU usage, there
were just a lot of GCs going on.
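
One quick way to confirm that GC is the culprit is to enable GC logging on the
executors; a minimal sketch (the flag set is illustrative, any standard HotSpot
GC-logging flags work):

import org.apache.spark.SparkConf

// Pass GC-logging flags to the executors to see how much time is actually
// spent in GC at a given heap size.
val conf = new SparkConf()
  .set("spark.executor.memory", "50g")
  .set("spark.executor.extraJavaOptions",
       "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")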

Has anyone seen a similar problem?

Thanks,
Mingyu






Re: How does Spark speculation prevent duplicated work?

2014-07-16 Thread Mingyu Kim
That makes sense. Thanks everyone for the explanations!

Mingyu

From:  Matei Zaharia 
Reply-To:  "user@spark.apache.org" 
Date:  Tuesday, July 15, 2014 at 3:00 PM
To:  "user@spark.apache.org" 
Subject:  Re: How does Spark speculation prevent duplicated work?

Yeah, this is handled by the "commit" call of the FileOutputFormat. In
general Hadoop OutputFormats have a concept called "committing" the output,
which you should do only once per partition. In the file ones it does an
atomic rename to make sure that the final output is a complete file.

Matei

On Jul 15, 2014, at 2:49 PM, Tathagata Das 
wrote:

> The way HDFS file writing works at a high level is that each attempt to
> write a partition to a file starts writing to a unique temporary file (say,
> something like targetDirectory/_temp/part-X_attempt-). If the writing
> into the file successfully completes, then the temporary file is moved to the
> final location (say, targetDirectory/part-X). If, due to speculative
> execution, the file already exists in the final intended location, then the
> move is avoided. Or it's overwritten; I forget the implementation. Either way,
> all attempts to write the same partition will always write the same data to the
> temp file (assuming the Spark transformation generating the data is
> deterministic and idempotent). And once one attempt is successful, the final
> file will have the same data. Hence, writing to HDFS / S3 is idempotent.
> 
> Now this logic is already implemented within Hadoop's MapReduce logic, and
> Spark just uses it directly.
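
A rough sketch of that temp-file-plus-rename pattern (the paths and naming
below are illustrative, not the exact scheme FileOutputCommitter uses):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Commit one attempt's output: rename the temp file into place only if no
// earlier attempt has already committed; otherwise discard the duplicate.
def commitAttempt(fs: FileSystem, partition: Int, attempt: Int): Unit = {
  val tempPath  = new Path(s"/output/_temporary/part-$partition-attempt-$attempt")
  val finalPath = new Path(s"/output/part-$partition")
  if (!fs.exists(finalPath)) {
    fs.rename(tempPath, finalPath)   // atomic on HDFS, so only one attempt wins
  } else {
    fs.delete(tempPath, false)       // a later duplicate attempt is discarded
  }
}

// e.g. commitAttempt(FileSystem.get(new Configuration()), partition = 3, attempt = 0)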
> 
> TD
> 
> 
> On Tue, Jul 15, 2014 at 2:33 PM, Mingyu Kim  wrote:
>> Thanks for the explanation, guys.
>> 
>> I looked into the saveAsHadoopFile implementation a little bit. If you see
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
>> at line 843, the HDFS
>> write happens at per-partition processing, not at the result handling, so I
>> have a feeling that it might be writing multiple times. This may be fine if
>> both tasks for the same partition complete because it will simply overwrite
>> the output partition with the same content, but this could be an issue if one
>> of the tasks completes and the other is in the middle of writing the
>> partition by the time the entire stage completes. Can someone explain this?
>> 
>> Bertrand, I'm slightly confused about your comment. So, is it the case that
>> HDFS will handle the writes as a temp file write followed by an atomic move,
>> so the concern I had above is handled at the HDFS level?
>> 
>> Mingyu
>> 
>> From: Bertrand Dechoux 
>> Reply-To: "user@spark.apache.org" 
>> Date: Tuesday, July 15, 2014 at 1:22 PM
>> To: "user@spark.apache.org" 
>> Subject: Re: How does Spark speculation prevent duplicated work?
>> 
>> I haven't looked at the implementation, but what you would do with any
>> filesystem is write to a file inside the workspace directory of the task. And
>> then only the attempt of the task that should be kept will perform a move to
>> the final path. The other attempts are simply discarded. For most filesystem
>> (and that's the case for HDFS), a 'move' is a very simple and fast action
>> because only the "full path/name" of the file change but not its content or
>> where this content is physically stored.
>> 
>> Speculative execution happens in Hadoop MapReduce, and Spark has the same
>> concept. As long as you apply functions with no side effects (i.e. the only
>> impact is the returned results), you just need to not take into account
>> results from additional attempts of the same task/operator.
>> 
>> Bertrand Dechoux
>> 
>> 
>> On Tue, Jul 15, 2014 at 9:34 PM, Andrew Ash  wrote:
>>> Hi Nan, 
>>> 
>>> Great digging in -- that makes sense to me for when a job is producing some
>>> output handled by Spark like a .count or .distinct or similar.
>>> 
>>> For the other part of the question, I'm also interested in side effects like
>>> an HDFS disk write.  If one task is writing to an HDFS path and another task
>>> starts up, wouldn't it also att

Re: How does Spark speculation prevent duplicated work?

2014-07-15 Thread Mingyu Kim
Thanks for the explanation, guys.

I looked into the saveAsHadoopFile implementation a little bit. If you see
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
at line 843, the HDFS write happens at
per-partition processing, not at the result handling, so I have a feeling
that it might be writing multiple times. This may be fine if both tasks for
the same partition complete because it will simply overwrite the output
partition with the same content, but this could be an issue if one of the
tasks completes and the other is in the middle of writing the partition by
the time the entire stage completes. Can someone explain this?

Bertrand, I'm slightly confused about your comment. So, is it the case that
HDFS will handle the writes as a temp file write followed by an atomic move,
so the concern I had above is handled at the HDFS level?

Mingyu

From:  Bertrand Dechoux 
Reply-To:  "user@spark.apache.org" 
Date:  Tuesday, July 15, 2014 at 1:22 PM
To:  "user@spark.apache.org" 
Subject:  Re: How does Spark speculation prevent duplicated work?

I haven't looked at the implementation, but what you would do with any
filesystem is write to a file inside the workspace directory of the task.
And then only the attempt of the task that should be kept will perform a
move to the final path. The other attempts are simply discarded. For most
filesystem (and that's the case for HDFS), a 'move' is a very simple and
fast action because only the "full path/name" of the file change but not its
content or where this content is physically stored.

Speculative execution happens in Hadoop MapReduce, and Spark has the same
concept. As long as you apply functions with no side effects (i.e. the only
impact is the returned results), you just need to not take into account
results from additional attempts of the same task/operator.

Bertrand Dechoux


On Tue, Jul 15, 2014 at 9:34 PM, Andrew Ash  wrote:
> Hi Nan, 
> 
> Great digging in -- that makes sense to me for when a job is producing some
> output handled by Spark like a .count or .distinct or similar.
> 
> For the other part of the question, I'm also interested in side effects like
> an HDFS disk write.  If one task is writing to an HDFS path and another task
> starts up, wouldn't it also attempt to write to the same path?  How is that
> de-conflicted?
> 
> 
> On Tue, Jul 15, 2014 at 3:02 PM, Nan Zhu  wrote:
>> Hi, Mingyuan,  
>> 
>> According to my understanding,
>> 
>> Spark processes the result generated from each partition by passing them to
>> resultHandler (SparkContext.scala L1056)
>> 
>> This resultHandler usually just puts the result in a driver-side array, the
>> length of which is always partitions.size
>> 
>> this design effectively ensures that the actions are idempotent
>> 
>> e.g. the count is implemented as
>> 
>> def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
>> 
>> even the task in the partition is duplicately executed, the result put in the
>> array is the same
>> 
>> 
>> 
>> At the same time, I think the Spark implementation ensures that the operation
>> applied on the return value of SparkContext.runJob will not be triggered when
>> the duplicate tasks are finished
>> 
>> Because, 
>> 
>> 
>> when a task is finished, the code execution path is
>> TaskSetManager.handleSuccessfulTask -> DAGScheduler.taskEnded
>> 
>> in taskEnded, it will trigger the CompletionEvent message handler, where
>> DAGScheduler will check if (!job.finished(rt.outputid)) and rt.outputid is
>> the partitionid
>> 
>> so even the duplicate task invokes a CompletionEvent message, it will find
>> job.finished(rt.outputId) has been true eventually
>> 
>> 
>> Maybe I was wrong… just went through the code roughly; welcome to correct me
>> 
>> Best,
>> 
>> 
>> -- 
>> Nan Zhu
>> 
>> On Tuesday, July 15, 2014 at 1:55 PM, Mingyu Kim wrote:
>>> 
>>> Hi all,
>>> 
>>> I was curious about the details of Spark speculation. So, my understanding
>>> is that, when "speculated" tasks are newly scheduled on other machines, the
>>> original tasks are still running until the entire stage completes. This
>>> seems to leave some room for duplicated work because some spark actions are
>>> not idempotent. For example, it may be counting a partition twice in case of
>>> RDD.count or may be writing a partition to HDFS twice in case of
>>> RDD.save*(). How does it prevent this kind of duplicated work?
>>> 
>>> Mingyu
>>> 
>> 
> 







How does Spark speculation prevent duplicated work?

2014-07-15 Thread Mingyu Kim
Hi all,

I was curious about the details of Spark speculation. So, my understanding
is that, when "speculated" tasks are newly scheduled on other machines, the
original tasks are still running until the entire stage completes. This
seems to leave some room for duplicated work because some spark actions are
not idempotent. For example, it may be counting a partition twice in case of
RDD.count or may be writing a partition to HDFS twice in case of
RDD.save*(). How does it prevent this kind of duplicated work?

Mingyu






JavaRDD.mapToPair throws NPE

2014-06-24 Thread Mingyu Kim
Hi all,

I'm trying to use JavaRDD.mapToPair(), but it fails with an NPE on the
executor. The PairFunction used in the call is null for some reason. Any
comments/help would be appreciated!

My setup is:
* Java 7
* Spark 1.0.0
* Hadoop 2.0.0-mr1-cdh4.6.0
Here's the code snippet:

> import org.apache.spark.SparkConf;
> 
> import org.apache.spark.api.java.JavaPairRDD;
> 
> import org.apache.spark.api.java.JavaRDD;
> 
> import org.apache.spark.api.java.JavaSparkContext;
> 
> import org.apache.spark.api.java.function.PairFunction;
> 
> 
> 
> import scala.Tuple2;
> 
> 
> 
> public class Test {
> 
> public static void main(String[] args) {
> 
> SparkConf conf = new SparkConf()
> 
> .setMaster("spark://mymaster")
> 
> .setAppName("MyApp")
> 
> .setSparkHome("/my/spark/home");
> 
> 
> 
> JavaSparkContext sc = new JavaSparkContext(conf);
> 
> sc.addJar("/path/to/jar"); // ship the jar of this class
> 
> JavaRDD<String> rdd = sc.textFile("/path/to/nums.csv"); // nums.csv
> simply has one integer per line
> 
> JavaPairRDD<Integer, Integer> pairRdd = rdd.mapToPair(new
> MyPairFunction());
> 
> 
> 
> System.out.println(pairRdd.collect());
> 
> }
> 
> 
> 
> private static final class MyPairFunction implements PairFunction<String, Integer, Integer> {
> 
> private static final long serialVersionUID = 1L;
> 
> 
> 
> @Override
> 
> public Tuple2<Integer, Integer> call(String s) throws Exception {
> 
> return new Tuple2<Integer, Integer>(Integer.parseInt(s),
> Integer.parseInt(s));
> 
> }
> 
> }
> 
> }
> 
> 
Here's the stack trace:
> 
> Exception in thread "main" 14/06/24 14:39:01 INFO scheduler.TaskSchedulerImpl:
> Removed TaskSet 0.0, whose tasks have all completed, from pool
> 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0
> failed 4 times, most recent failure: Exception failure in TID 6 on host
> 10.160.24.216: java.lang.NullPointerException
> 
> 
> org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaP
> airRDD.scala:750)
> 
> 
> org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaP
> airRDD.scala:750)
> 
> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> 
> scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 
> 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> 
> 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> 
> 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> 
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> 
> scala.collection.AbstractIterator.to(Iterator.scala:1157)
> 
> 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> 
> scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> 
> 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> 
> scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> 
> org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
> 
> org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
> 
> 
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
> 
> 
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
> 
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> 
> org.apache.spark.scheduler.Task.run(Task.scala:51)
> 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
> 
> 
> 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 
> java.lang.Thread.run(Thread.java:722)
> 
> Driver stacktrace:
> 
> at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGSchedule
> r$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> 
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGSchedul
> er.scala:1017)
> 
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGSchedul
> er.scala:1015)
> 
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> 
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> 
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(D
> AGScheduler.scala:633)
> 
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(D
> AGScheduler.scala:633)
> 
> at scala.Option.foreach(Option.scala:236)
> 
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala
> :633)
> 
> at 
>

Re: 1.0.1 release plan

2014-06-20 Thread Mingyu Kim
Cool. Thanks for the note. Looking forward to it.

Mingyu

From:  Andrew Ash 
Reply-To:  "user@spark.apache.org" 
Date:  Friday, June 20, 2014 at 9:54 AM
To:  "user@spark.apache.org" 
Subject:  Re: 1.0.1 release plan

Sounds good.  Mingyu and I are waiting on 1.0.1 to get the fix for the below
issues without running a patched version of Spark:

https://issues.apache.org/jira/browse/SPARK-1935 --
commons-codec version conflicts for client applications
https://issues.apache.org/jira/browse/SPARK-2043 --
correctness issue with spilling


On Fri, Jun 20, 2014 at 1:04 AM, Patrick Wendell  wrote:
> Hey There,
> 
> I'd like to start voting on this release shortly because there are a
> few important fixes that have queued up. We're just waiting to fix an
> akka issue. I'd guess we'll cut a vote in the next few days.
> 
> - Patrick
> 
> On Thu, Jun 19, 2014 at 10:47 AM, Mingyu Kim  wrote:
>> > Hi all,
>> >
>> > Is there any plan for 1.0.1 release?
>> >
>> > Mingyu







1.0.1 release plan

2014-06-19 Thread Mingyu Kim
Hi all,

Is there any plan for 1.0.1 release?

Mingyu






Re: Union of 2 RDD's only returns the first one

2014-04-30 Thread Mingyu Kim
I agree with you in general that, as an API user, I shouldn’t be relying on
the implementation code. However, without looking at the code, there is no way
for me to find out even whether map() keeps the row order. Without that
knowledge, I’d need to do a “sort” every time I need certain things in a
certain order (and sort is really expensive). On the other hand, if I can
assume that, say, “filter” or “map” doesn’t shuffle the rows around, I can do
the sort once and assume that the order is retained throughout such
operations, saving a lot of time on unnecessary sorts.

Mingyu

From:  Mark Hamstra 
Reply-To:  "user@spark.apache.org" 
Date:  Wednesday, April 30, 2014 at 11:36 AM
To:  "user@spark.apache.org" 
Subject:  Re: Union of 2 RDD's only returns the first one

Which is what you shouldn't be doing as an API user, since that
implementation code might change.  The documentation doesn't mention a row
ordering guarantee, so none should be assumed.

It is hard enough for us to correctly document all of the things that the
API does do.  We really shouldn't be forced into the expectation that we
will also fully document everything that the API doesn't do.


On Wed, Apr 30, 2014 at 11:13 AM, Mingyu Kim  wrote:
> Okay, that makes sense. It’d be great if this can be better documented at
> some point, because the only way to find out about the resulting RDD row
> order is by looking at the code.
> 
> Thanks for the discussion!
> 
> Mingyu
> 
> 
> 
> 
> On 4/29/14, 11:59 PM, "Patrick Wendell"  wrote:
> 
>> >I don't think we guarantee anywhere that union(A, B) will behave by
>> >concatenating the partitions, it just happens to be an artifact of the
>> >current implementation.
>> >
>> >rdd1 = [1,2,3]
>> >rdd2 = [1,4,5]
>> >
>> >rdd1.union(rdd2) = [1,2,3,1,4,5] // how it is now
>> >rdd1.union(rdd2) = [1,4,5,1,2,3] // some day it could be like this, it
>> >wouldn't violate the contract of union
>> >
>> >AFAIK the only guarantee is that the resulting RDD will contain all elements.
>> >
>> >- Patrick
>> >
>> >On Tue, Apr 29, 2014 at 11:26 PM, Mingyu Kim  wrote:
>>> >> Yes, that’s what I meant. Sure, the numbers might not be actually
>>> >>sorted,
>>> >> but the order of rows semantically are kept throughout non-shuffling
>>> >> transforms. I’m on board with you on union as well.
>>> >>
>>> >> Back to the original question, then, why is it important to coalesce to
>>> >>a
>>> >> single partition? When you union two RDDs, for example, rdd1 = [“a, b,
>>> >> c”], rdd2 = [“1, 2, 3”, “4, 5, 6”], then
>>> >> rdd1.union(rdd2).saveAsTextFile(…) should’ve resulted in a file with
>>> >>three
>>> >> lines “a, b, c”, “1, 2, 3”, and “4, 5, 6” because the partitions from
>>> >>the
>>> >> two RDDs are concatenated.
>>> >>
>>> >> Mingyu
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> On 4/29/14, 10:55 PM, "Patrick Wendell"  wrote:
>>> >>
>>>> >>>If you call map() on an RDD it will retain the ordering it had before,
>>>> >>>but that is not necessarily a correct sort order for the new RDD.
>>>> >>>
>>>> >>>var rdd = sc.parallelize([2, 1, 3]);
>>>> >>>var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>>>> >>>var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]
>>>> >>>
>>>> >>>Note that mapped is no longer sorted.
>>>> >>>
>>>> >>>When you union two RDD's together it will effectively concatenate the
>>>> >>>two orderings, which is also not a valid sorted order on the new RDD:
>>>> >>>
>>>> >>>rdd1 = [1,2,3]
>>>> >>>rdd2 = [1,4,5]
>>>> >>>
>>>> >>>rdd1.union(rdd2) = [1,2,3,1,4,5]
>>>> >>>
>>>> >>>On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim  wrote:
>>>>> >>>> Thanks for the quick response!
>>>>> >>>>
>>>>> >>>> To better understand it, the reason sorted RDD has a well-defined
>>>>> >>>>ordering
>>>>> >>>> is because sortedRDD.getPartitions() returns the partitions in the
>>>>> >>>>right
>>>>> >>>> o

Re: Union of 2 RDD's only returns the first one

2014-04-30 Thread Mingyu Kim
Okay, that makes sense. It’d be great if this can be better documented at
some point, because the only way to find out about the resulting RDD row
order is by looking at the code.

Thanks for the discussion!

Mingyu




On 4/29/14, 11:59 PM, "Patrick Wendell"  wrote:

>I don't think we guarantee anywhere that union(A, B) will behave by
>concatenating the partitions, it just happens to be an artifact of the
>current implementation.
>
>rdd1 = [1,2,3]
>rdd2 = [1,4,5]
>
>rdd1.union(rdd2) = [1,2,3,1,4,5] // how it is now
>rdd1.union(rdd2) = [1,4,5,1,2,3] // some day it could be like this, it
>wouldn't violate the contract of union
>
>AFAIK the only guarantee is that the resulting RDD will contain all elements.
>
>- Patrick
>
>On Tue, Apr 29, 2014 at 11:26 PM, Mingyu Kim  wrote:
>> Yes, that’s what I meant. Sure, the numbers might not be actually
>>sorted,
>> but the order of rows semantically are kept throughout non-shuffling
>> transforms. I’m on board with you on union as well.
>>
>> Back to the original question, then, why is it important to coalesce to
>>a
>> single partition? When you union two RDDs, for example, rdd1 = [“a, b,
>> c”], rdd2 = [“1, 2, 3”, “4, 5, 6”], then
>> rdd1.union(rdd2).saveAsTextFile(…) should’ve resulted in a file with
>>three
>> lines “a, b, c”, “1, 2, 3”, and “4, 5, 6” because the partitions from
>>the
>> two RDDs are concatenated.
>>
>> Mingyu
>>
>>
>>
>>
>> On 4/29/14, 10:55 PM, "Patrick Wendell"  wrote:
>>
>>>If you call map() on an RDD it will retain the ordering it had before,
>>>but that is not necessarily a correct sort order for the new RDD.
>>>
>>>var rdd = sc.parallelize([2, 1, 3]);
>>>var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>>>var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]
>>>
>>>Note that mapped is no longer sorted.
>>>
>>>When you union two RDD's together it will effectively concatenate the
>>>two orderings, which is also not a valid sorted order on the new RDD:
>>>
>>>rdd1 = [1,2,3]
>>>rdd2 = [1,4,5]
>>>
>>>rdd1.union(rdd2) = [1,2,3,1,4,5]
>>>
>>>On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim  wrote:
>>>> Thanks for the quick response!
>>>>
>>>> To better understand it, the reason sorted RDD has a well-defined
>>>>ordering
>>>> is because sortedRDD.getPartitions() returns the partitions in the
>>>>right
>>>> order and each partition internally is properly sorted. So, if you
>>>>have
>>>>
>>>> var rdd = sc.parallelize([2, 1, 3]);
>>>> var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>>>> var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]
>>>>
>>>> Since mapValues doesn’t change the order of partitions nor the
>>>> order of rows within the partitions, I think “mapped” should have the
>>>> exact same order as “sorted”. Sure, if a transform involves shuffling,
>>>>the
>>>> order will change. Am I mistaken? Is there an extra detail in
>>>>sortedRDD
>>>> that guarantees a well-defined ordering?
>>>>
>>>> If it’s true that the order of partitions returned by
>>>>RDD.getPartitions()
>>>> and the row orders within the partitions determine the row order, I’m
>>>>not
>>>> sure why union doesn’t respect the order because union operation
>>>>simply
>>>> concatenates the two lists of partitions from the two RDDs.
>>>>
>>>> Mingyu
>>>>
>>>>
>>>>
>>>>
>>>> On 4/29/14, 10:25 PM, "Patrick Wendell"  wrote:
>>>>
>>>>>You are right, once you sort() the RDD, then yes it has a well defined
>>>>>ordering.
>>>>>
>>>>>But that ordering is lost as soon as you transform the RDD, including
>>>>>if you union it with another RDD.
>>>>>
>>>>>On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim 
>>>>>wrote:
>>>>>> Hi Patrick,
>>>>>>
>>>>>> I'm a little confused about your comment that RDDs are not ordered.
>>>>>>As
>>>>>>far
>>>>>> as I know, RDDs keep list of partitions that are ordered and this is
>>>>>>why I
>>>>>> can call RDD.take() and get the same

Re: Union of 2 RDD's only returns the first one

2014-04-29 Thread Mingyu Kim
Yes, that’s what I meant. Sure, the numbers might not be actually sorted,
but the order of rows semantically are kept throughout non-shuffling
transforms. I’m on board with you on union as well.

Back to the original question, then, why is it important to coalesce to a
single partition? When you union two RDDs, for example, rdd1 = [“a, b,
c”], rdd2 = [“1, 2, 3”, “4, 5, 6”], then
rdd1.union(rdd2).saveAsTextFile(…) should’ve resulted in a file with three
lines “a, b, c”, “1, 2, 3”, and “4, 5, 6” because the partitions from the
two RDDs are concatenated.

Mingyu




On 4/29/14, 10:55 PM, "Patrick Wendell"  wrote:

>If you call map() on an RDD it will retain the ordering it had before,
>but that is not necessarily a correct sort order for the new RDD.
>
>var rdd = sc.parallelize([2, 1, 3]);
>var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]
>
>Note that mapped is no longer sorted.
>
>When you union two RDD's together it will effectively concatenate the
>two orderings, which is also not a valid sorted order on the new RDD:
>
>rdd1 = [1,2,3]
>rdd2 = [1,4,5]
>
>rdd1.union(rdd2) = [1,2,3,1,4,5]
>
>On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim  wrote:
>> Thanks for the quick response!
>>
>> To better understand it, the reason sorted RDD has a well-defined
>>ordering
>> is because sortedRDD.getPartitions() returns the partitions in the right
>> order and each partition internally is properly sorted. So, if you have
>>
>> var rdd = sc.parallelize([2, 1, 3]);
>> var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>> var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]
>>
>> Since mapValues doesn’t change the order of partitions nor the
>> order of rows within the partitions, I think “mapped” should have the
>> exact same order as “sorted”. Sure, if a transform involves shuffling,
>>the
>> order will change. Am I mistaken? Is there an extra detail in sortedRDD
>> that guarantees a well-defined ordering?
>>
>> If it’s true that the order of partitions returned by
>>RDD.getPartitions()
>> and the row orders within the partitions determine the row order, I’m
>>not
>> sure why union doesn’t respect the order because union operation simply
>> concatenates the two lists of partitions from the two RDDs.
>>
>> Mingyu
>>
>>
>>
>>
>> On 4/29/14, 10:25 PM, "Patrick Wendell"  wrote:
>>
>>>You are right, once you sort() the RDD, then yes it has a well defined
>>>ordering.
>>>
>>>But that ordering is lost as soon as you transform the RDD, including
>>>if you union it with another RDD.
>>>
>>>On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim  wrote:
>>>> Hi Patrick,
>>>>
>>>> I'm a little confused about your comment that RDDs are not ordered. As
>>>>far
>>>> as I know, RDDs keep list of partitions that are ordered and this is
>>>>why I
>>>> can call RDD.take() and get the same first k rows every time I call it
>>>>and
>>>> RDD.take() returns the same entries as RDD.map(…).take() because map
>>>> preserves the partition order. RDD order is also what allows me to get
>>>>the
>>>> top k out of RDD by doing RDD.sort().take().
>>>>
>>>> Am I misunderstanding it? Or, is it just when RDD is written to disk
>>>>that
>>>> the order is not well preserved? Thanks in advance!
>>>>
>>>> Mingyu
>>>>
>>>>
>>>>
>>>>
>>>> On 1/22/14, 4:46 PM, "Patrick Wendell"  wrote:
>>>>
>>>>>Ah somehow after all this time I've never seen that!
>>>>>
>>>>>On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia
>>>>>
>>>>>wrote:
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell
>>>>>>
>>>>>> wrote:
>>>>>>>
>>>>>>> What is the ++ operator here? Is this something you defined?
>>>>>>
>>>>>>
>>>>>> No, it's an alias for union defined in RDD.scala:
>>>>>>
>>>>>> def ++(other: RDD[T]): RDD[T] = this.union(other)
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Another issue is that RDD's are not ordered, so when y

Re: Union of 2 RDD's only returns the first one

2014-04-29 Thread Mingyu Kim
Thanks for the quick response!

To better understand it, the reason sorted RDD has a well-defined ordering
is because sortedRDD.getPartitions() returns the partitions in the right
order and each partition internally is properly sorted. So, if you have

var rdd = sc.parallelize([2, 1, 3]);
var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]

Since mapValues doesn’t change the order of partitions nor the
order of rows within the partitions, I think “mapped” should have the
exact same order as “sorted”. Sure, if a transform involves shuffling, the
order will change. Am I mistaken? Is there an extra detail in sortedRDD
that guarantees a well-defined ordering?

If it’s true that the order of partitions returned by RDD.getPartitions()
and the row orders within the partitions determine the row order, I’m not
sure why union doesn’t respect the order because union operation simply
concatenates the two lists of partitions from the two RDDs.

Mingyu




On 4/29/14, 10:25 PM, "Patrick Wendell"  wrote:

>You are right, once you sort() the RDD, then yes it has a well defined
>ordering.
>
>But that ordering is lost as soon as you transform the RDD, including
>if you union it with another RDD.
>
>On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim  wrote:
>> Hi Patrick,
>>
>> I¹m a little confused about your comment that RDDs are not ordered. As
>>far
>> as I know, RDDs keep list of partitions that are ordered and this is
>>why I
>> can call RDD.take() and get the same first k rows every time I call it
>>and
>> RDD.take() returns the same entries as RDD.map(…).take() because map
>> preserves the partition order. RDD order is also what allows me to get
>>the
>> top k out of RDD by doing RDD.sort().take().
>>
>> Am I misunderstanding it? Or, is it just when RDD is written to disk
>>that
>> the order is not well preserved? Thanks in advance!
>>
>> Mingyu
>>
>>
>>
>>
>> On 1/22/14, 4:46 PM, "Patrick Wendell"  wrote:
>>
>>>Ah somehow after all this time I've never seen that!
>>>
>>>On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia
>>>
>>>wrote:
>>>>
>>>>
>>>>
>>>> On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell 
>>>> wrote:
>>>>>
>>>>> What is the ++ operator here? Is this something you defined?
>>>>
>>>>
>>>> No, it's an alias for union defined in RDD.scala:
>>>>
>>>> def ++(other: RDD[T]): RDD[T] = this.union(other)
>>>>
>>>>>
>>>>>
>>>>> Another issue is that RDD's are not ordered, so when you union two
>>>>> together it doesn't have a well defined ordering.
>>>>>
>>>>> If you do want to do this you could coalesce into one partition, then
>>>>> call MapPartitions and return an iterator that first adds your header
>>>>> and then the rest of the file, then call saveAsTextFile. Keep in mind
>>>>> this will only work if you coalesce into a single partition.
>>>>
>>>>
>>>> Thanks! I'll give this a try.
>>>>
>>>>>
>>>>>
>>>>> myRdd.coalesce(1)
>>>>> .map(_.mkString(","))
>>>>> .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
>>>>> .saveAsTextFile("out.csv")
>>>>>
>>>>> - Patrick
>>>>>
>>>>> On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
>>>>>  wrote:
>>>>> > Hi,
>>>>> >
>>>>> > I'm trying to find a way to create a csv header when using
>>>>> > saveAsTextFile,
>>>>> > and I came up with this:
>>>>> >
>>>>> > (sc.makeRDD(Array("col1,col2,col3"), 1) ++
>>>>> > myRdd.coalesce(1).map(_.mkString(",")))
>>>>> >   .saveAsTextFile("out.csv")
>>>>> >
>>>>> > But it only saves the header part. Why is that the union method
>>>>>does
>>>>>not
>>>>> > return both RDD's?
>>>>
>>>>




Re: Union of 2 RDD's only returns the first one

2014-04-29 Thread Mingyu Kim
Hi Patrick,

I'm a little confused about your comment that RDDs are not ordered. As far
as I know, RDDs keep list of partitions that are ordered and this is why I
can call RDD.take() and get the same first k rows every time I call it and
RDD.take() returns the same entries as RDD.map(…).take() because map
preserves the partition order. RDD order is also what allows me to get the
top k out of RDD by doing RDD.sort().take().

Am I misunderstanding it? Or, is it just when RDD is written to disk that
the order is not well preserved? Thanks in advance!

Mingyu




On 1/22/14, 4:46 PM, "Patrick Wendell"  wrote:

>Ah somehow after all this time I've never seen that!
>
>On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia 
>wrote:
>>
>>
>>
>> On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell 
>> wrote:
>>>
>>> What is the ++ operator here? Is this something you defined?
>>
>>
>> No, it's an alias for union defined in RDD.scala:
>>
>> def ++(other: RDD[T]): RDD[T] = this.union(other)
>>
>>>
>>>
>>> Another issue is that RDD's are not ordered, so when you union two
>>> together it doesn't have a well defined ordering.
>>>
>>> If you do want to do this you could coalesce into one partition, then
>>> call MapPartitions and return an iterator that first adds your header
>>> and then the rest of the file, then call saveAsTextFile. Keep in mind
>>> this will only work if you coalesce into a single partition.
>>
>>
>> Thanks! I'll give this a try.
>>
>>>
>>>
>>> myRdd.coalesce(1)
>>> .map(_.mkString(","))
>>> .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
>>> .saveAsTextFile("out.csv")
>>>
>>> - Patrick
>>>
>>> On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
>>>  wrote:
>>> > Hi,
>>> >
>>> > I'm trying to find a way to create a csv header when using
>>> > saveAsTextFile,
>>> > and I came up with this:
>>> >
>>> > (sc.makeRDD(Array("col1,col2,col3"), 1) ++
>>> > myRdd.coalesce(1).map(_.mkString(",")))
>>> >   .saveAsTextFile("out.csv")
>>> >
>>> > But it only saves the header part. Why is that the union method does
>>>not
>>> > return both RDD's?
>>
>>




Spark reads partitions in a wrong order

2014-04-25 Thread Mingyu Kim
If the underlying file system returns files in a non-alphabetical order to
java.io.File.listFiles(), Spark reads the partitions out of order. Here's an
example.

val sc = new SparkContext("local[3]", "test")
val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))
rdd1.saveAsTextFile("file://path/to/file")
val rdd2 = sc.textFile("file://path/to/file")
rdd2.collect()

rdd1 is saved to file://path/to/file in three partitions (i.e.
/path/to/file/part-0, /path/to/file/part-1,
/path/to/file/part-2). Since File.listFiles(), which is used in
org.apache.hadoop.fs.RawLocalFileSystem.listStatus(), returns the partitions
out of order, rdd2 has its rows in a different order from that of rdd1.
Note that File.listFiles() explicitly says that it doesn't guarantee the
order.

The behavior of RawLocalFileSystem is fine for MapReduce jobs because they
don't care about order, but for Spark, which has a notion of row order,
this looks like a bug. The correct fix would be to sort the files after
calling File.listFiles().

This may be possible to fix somewhere by creating a wrapper
org.apache.hadoop.fs.FileSystem class that sorts the file list before
returning. Was this considered in the original design? Is this a bug?
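
A hedged sketch of that wrapper idea (how the class would be registered with
Hadoop/Spark is left out):

import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}

// Local FileSystem whose directory listings come back sorted by file name, so
// part-0, part-1, part-2 are always read in order regardless of what the
// underlying File.listFiles() returns.
class SortedLocalFileSystem extends RawLocalFileSystem {
  override def listStatus(path: Path): Array[FileStatus] =
    super.listStatus(path).sortBy(_.getPath.getName)
}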

Mingyu



