Spark master OOMs with exception stack trace stored in JobProgressListener (SPARK-4906)

2014-12-19 Thread Mingyu Kim
Hi,

I just filed a bug 
SPARK-4906, regarding Spark 
master OOMs. If I understand correctly, the UI state for all running 
applications is kept in memory, retained by JobProgressListener, and when there 
are a lot of exception stack traces, this UI state can take up a significant 
amount of heap. This seems especially bad for long-running applications.

Can you correct me if I’m misunderstanding anything? If my understanding is 
correct, is there any work being done to make sure the UI state doesn’t grow 
indefinitely over time? Would it make sense to spill some of this state to disk, 
or to build on what spark.eventLog is doing, so the Spark master doesn’t need to 
keep everything in memory?
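
As a possible stopgap (an assumption on my part, not something discussed in this 
thread), the amount of completed-job/stage state that JobProgressListener retains 
per application can be capped with the existing retention settings, e.g.:

import org.apache.spark.{SparkConf, SparkContext}

// Cap how many completed jobs/stages (including their failure details and
// stack traces) the UI listener keeps around; the defaults are larger.
val conf = new SparkConf()
  .set("spark.ui.retainedJobs", "200")
  .set("spark.ui.retainedStages", "200")
val sc = new SparkContext(conf)

This bounds the retained history, but it is only a mitigation; the question about 
spilling state or reusing the event log still stands.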

Thanks,
Mingyu


Re: Streaming partitions to driver for use in .toLocalIterator

2015-02-18 Thread Mingyu Kim
Another alternative would be to compress the partition in memory in a
streaming fashion instead of calling .toArray on the iterator. Would that be
an easier mitigation for the problem? Or is it hard to compress the rows
one by one with the compression codec Spark currently uses, without
materializing the full partition in memory?
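
For illustration, a rough sketch of the executor-side idea (assuming snappy-java
on the classpath and an RDD[String] named rdd; a real change would use Spark's
configured serializer and compression codec instead):

import java.io.ByteArrayOutputStream
import org.xerial.snappy.SnappyOutputStream

// Write rows into a compressing stream one at a time instead of materializing
// them with toArray; only the compressed bytes are buffered per partition.
val compressedPartitions = rdd.mapPartitions { rows =>
  val buffer = new ByteArrayOutputStream()
  val out = new SnappyOutputStream(buffer)
  rows.foreach { row =>
    out.write(row.getBytes("UTF-8"))
    out.write('\n')
  }
  out.close()
  Iterator.single(buffer.toByteArray)
}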

Mingyu





On 2/18/15, 1:01 PM, "Imran Rashid"  wrote:

>This would be pretty tricky to do -- the issue is that right now
>sparkContext.runJob has you pass in a function from a partition to *one*
>result object that gets serialized and sent back: Iterator[T] => U, and
>that idea is baked pretty deep into a lot of the internals, DAGScheduler,
>Task, Executors, etc.
>
>Maybe another possibility worth considering: should we make it easy to go
>from N partitions to 2N partitions (or any other multiple obviously)
>without requiring a shuffle?  for that matter, you should also be able to
>go from 2N to N without a shuffle as well.  That change is also somewhat
>involved, though.
>
>Both are in theory possible, but I imagine they'd need really compelling
>use cases.
>
>An alternative would be to write your RDD to some other data store (eg,
>hdfs) which has better support for reading data in a streaming fashion,
>though you would probably be unhappy with the overhead.
>
>
>
>On Wed, Feb 18, 2015 at 9:09 AM, Andrew Ash  wrote:
>
>> Hi Spark devs,
>>
>> I'm creating a streaming export functionality for RDDs and am having
>>some
>> trouble with large partitions.  The RDD.toLocalIterator() call pulls
>>over a
>> partition at a time to the driver, and then streams the RDD out from
>>that
>> partition before pulling in the next one.  When you have large
>>partitions
>> though, you can OOM the driver, especially when multiple of these
>>exports
>> are happening in the same SparkContext.
>>
>> One idea I had was to repartition the RDD so partitions are smaller, but
>> it's hard to know a priori what the partition count should be, and I'd
>>like
>> to avoid paying the shuffle cost if possible -- I think repartition to a
>> higher partition count forces a shuffle.
>>
>> Is it feasible to rework this so the executor -> driver transfer in
>> .toLocalIterator is a steady stream rather than a partition at a time?
>>
>> Thanks!
>> Andrew
>>





The default CDH4 build uses avro-mapred hadoop1

2015-02-20 Thread Mingyu Kim
Hi all,

Related to https://issues.apache.org/jira/browse/SPARK-3039, the default CDH4 
build, which is built with "mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests 
clean package", pulls in avro-mapred hadoop1, as opposed to avro-mapred 
hadoop2. This results in the same error as mentioned in the linked bug (pasted 
below).

The right solution would be to create a hadoop-2.0 profile that sets 
avro.mapred.classifier to hadoop2, and to build the CDH4 package with the 
"-Phadoop-2.0" option.

What do people think?

Mingyu

——

java.lang.IncompatibleClassChangeError: Found interface 
org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
   at 
org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47)
   at 
org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
   at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
   at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
   at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
   at org.apache.spark.scheduler.Task.run(Task.scala:56)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)



Re: The default CDH4 build uses avro-mapred hadoop1

2015-02-20 Thread Mingyu Kim
Thanks for the explanation.

To be clear, I meant to speak for any Hadoop 2 releases before 2.2 that
have profiles in Spark. I referred to CDH4, since that's the only Hadoop
2.0/2.1 version Spark ships a prebuilt package for.

I understand the hesitation to make a code change if Spark doesn't plan
to support Hadoop 2.0/2.1 in general. (Please note, this is not specific
to CDH4.) If so, can I propose alternative options until Spark moves to
supporting only hadoop2?

- Build the CDH4 package with "-Davro.mapred.classifier=hadoop2", and
update http://spark.apache.org/docs/latest/building-spark.html for all
"2.0.*" examples.
- Build the CDH4 package as is, but note the known issues clearly on the
"download" page.
- Simply do not ship a CDH4 prebuilt package, and let people figure it out
themselves. Preferably, note in the documentation that
"-Davro.mapred.classifier=hadoop2" should be used for all Hadoop "2.0.*"
builds.

Please let me know what you think!

Mingyu





On 2/20/15, 2:34 AM, "Sean Owen"  wrote:

>True, although a number of other little issues make me, personally,
>not want to continue down this road:
>
>- There are already a lot of build profiles to try to cover Hadoop
>versions
>- I don't think it's quite right to have vendor-specific builds in
>Spark to begin with
>- We should be moving to only support Hadoop 2 soon IMHO anyway
>- CDH4 is EOL in a few months I think
>
>On Fri, Feb 20, 2015 at 8:30 AM, Mingyu Kim  wrote:
>> Hi all,
>>
>> Related to https://issues.apache.org/jira/browse/SPARK-3039, the default
>>CDH4 build, which is built with "mvn
>>-Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests clean package", pulls in
>>avro-mapred hadoop1, as opposed to avro-mapred hadoop2. This ends up in
>>the same error as mentioned in the linked bug. (pasted below).
>>
>> The right solution would be to create a hadoop-2.0 profile that sets
>>avro.mapred.classifier to hadoop2, and to build CDH4 build with
>>"-Phadoop-2.0" option.
>>
>> What do people think?
>>
>> Mingyu
>>
>> ——
>>
>> java.lang.IncompatibleClassChangeError: Found interface
>>org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
>>at 
>>org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyIn
>>putFormat.java:47)
>>at 
>>org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
>>at 
>>org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
>>at 
>>org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
>>at 
>>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>at 
>>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>at 
>>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
>>at 
>>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>at 
>>org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>at 
>>org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>at 
>>java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java
>>:1145)
>>at 
>>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.jav
>>a:615)
>>at java.lang.Thread.run(Thread.java:745)
>>





Task result is serialized twice by serializer and closure serializer

2015-03-04 Thread Mingyu Kim
Hi all,

It looks like the result of a task is serialized twice: once by the serializer (i.e. 
Java/Kryo depending on configuration) and once again by the closure serializer 
(i.e. Java). To link the actual code,

The first one: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L213
The second one: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L226

This serializes the “value”, i.e. the result of the task run, twice, which 
affects things like collect(), takeSample(), and toLocalIterator(). Would it 
make sense to simply serialize the DirectTaskResult once using the regular 
“serializer” (as opposed to the closure serializer)? Would it cause problems when 
the Accumulator values are not Kryo-serializable?
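
For illustration, here is a small self-contained sketch of the pattern (a 
paraphrase of the two call sites linked above, not the actual Executor code; 
ResultEnvelope and javaSerialize are made-up stand-ins for DirectTaskResult and 
the closure serializer):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for DirectTaskResult: the already-serialized result bytes plus
// accumulator updates, which get Java-serialized again before being sent back.
case class ResultEnvelope(valueBytes: Array[Byte], accumUpdates: Map[Long, Any])

def javaSerialize(obj: AnyRef): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(obj)
  oos.close()
  bos.toByteArray
}

val value = (1 to 100000).toArray                            // pretend task result
val pass1 = javaSerialize(value)                             // in Spark: the configured serializer (Java/Kryo)
val pass2 = javaSerialize(ResultEnvelope(pass1, Map.empty))  // in Spark: the closure serializer (Java)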

Alternatively, if we can assume that Accumulator values are small, we can 
closure-serialize those, put the serialized byte array in DirectTaskResult with 
the raw task result “value”, and serialize DirectTaskResult.

What do people think?

Thanks,
Mingyu


Re: Task result is serialized twice by serializer and closure serializer

2015-03-04 Thread Mingyu Kim
The concern is really just the runtime overhead and memory footprint of
Java-serializing an already-serialized byte array again. We originally
noticed this when we were using RDD.toLocalIterator() which serializes the
entire 64MB partition. We worked around this issue by kryo-serializing and
snappy-compressing the partition on the executor side before returning it
back to the driver, but this operation just felt redundant.

Your explanation about reporting the time taken makes it clearer why it's
designed this way. Since the byte array for the serialized task result
shouldn't account for the majority of the memory footprint anyway, I'm okay
with leaving it as is, then.

Thanks,
Mingyu





On 3/4/15, 5:07 PM, "Patrick Wendell"  wrote:

>Hey Mingyu,
>
>I think it's broken out separately so we can record the time taken to
>serialize the result. Once we've serialized it once, the second
>serialization should be really simple since it's just wrapping
>something that has already been turned into a byte buffer. Do you see
>a specific issue with serializing it twice?
>
>I think you need to have two steps if you want to record the time
>taken to serialize the result, since that needs to be sent back to the
>driver when the task completes.
>
>- Patrick
>
>On Wed, Mar 4, 2015 at 4:01 PM, Mingyu Kim  wrote:
>> Hi all,
>>
>> It looks like the result of task is serialized twice, once by
>>serializer (I.e. Java/Kryo depending on configuration) and once again by
>>closure serializer (I.e. Java). To link the actual code,
>>
>> The first one:
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L213
>> The second one:
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L226
>>
>> This serializes the "value", which is the result of task run twice,
>>which affects things like collect(), takeSample(), and
>>toLocalIterator(). Would it make sense to simply serialize the
>>DirectTaskResult once using the regular "serializer" (as opposed to
>>closure serializer)? Would it cause problems when the Accumulator values
>>are not Kryo-serializable?
>>
>> Alternatively, if we can assume that Accumator values are small, we can
>>closure-serialize those, put the serialized byte array in
>>DirectTaskResult with the raw task result "value", and serialize
>>DirectTaskResult.
>>
>> What do people think?
>>
>> Thanks,
>> Mingyu





Re: Task result is serialized twice by serializer and closure serializer

2015-03-04 Thread Mingyu Kim
Yep, that makes sense. Thanks for the clarification!

Mingyu





On 3/4/15, 8:05 PM, "Patrick Wendell"  wrote:

>Yeah, it will result in a second serialized copy of the array (costing
>some memory). But the computational overhead should be very small. The
>absolute worst case here will be when doing a collect() or something
>similar that just bundles the entire partition.
>
>- Patrick
>
>On Wed, Mar 4, 2015 at 5:47 PM, Mingyu Kim  wrote:
>> The concern is really just the runtime overhead and memory footprint of
>> Java-serializing an already-serialized byte array again. We originally
>> noticed this when we were using RDD.toLocalIterator() which serializes
>>the
>> entire 64MB partition. We worked around this issue by kryo-serializing
>>and
>> snappy-compressing the partition on the executor side before returning
>>it
>> back to the driver, but this operation just felt redundant.
>>
>> Your explanation about reporting the time taken makes it clearer why
>>it's
>> designed this way. Since the byte array for the serialized task result
>> shouldn't account for the majority of memory footprint anyways, I'm okay
>> with leaving it as is, then.
>>
>> Thanks,
>> Mingyu
>>
>>
>>
>>
>>
>> On 3/4/15, 5:07 PM, "Patrick Wendell"  wrote:
>>
>>>Hey Mingyu,
>>>
>>>I think it's broken out separately so we can record the time taken to
>>>serialize the result. Once we've serialized it once, the second
>>>serialization should be really simple since it's just wrapping
>>>something that has already been turned into a byte buffer. Do you see
>>>a specific issue with serializing it twice?
>>>
>>>I think you need to have two steps if you want to record the time
>>>taken to serialize the result, since that needs to be sent back to the
>>>driver when the task completes.
>>>
>>>- Patrick
>>>
>>>On Wed, Mar 4, 2015 at 4:01 PM, Mingyu Kim  wrote:
>>>> Hi all,
>>>>
>>>> It looks like the result of task is serialized twice, once by
>>>>serializer (I.e. Java/Kryo depending on configuration) and once again
>>>>by
>>>>closure serializer (I.e. Java). To link the actual code,
>>>>
>>>> The first one:
>>>>https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L213
>>>> The second one:
>>>>https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L226
>>>>
>>>> This serializes the "value", which is the result of task run twice,
>>>>which affects things like collect(), takeSample(), and
>>>>toLocalIterator(). Would it make sense to simply serialize the
>>>>DirectTaskResult once using the regular "serializer" (as opposed to
>>>>closure serializer)? Would it cause problems when the Accumulator
>>>>values
>>>>are not Kryo-serializable?
>>>>
>>>> Alternatively, if we can assume that Accumator values are small, we
>>>>can
>>>>closure-serialize those, put the serialized byte array in
>>>>DirectTaskResult with the raw task result "value", and serialize
>>>>DirectTaskResult.
>>>>
>>>> What do people think?
>>>>
>>>> Thanks,
>>>> Mingyu
>>



toLocalIterator creates as many jobs as # of partitions, and it ends up spamming Spark UI

2015-03-13 Thread Mingyu Kim
Hi all,

RDD.toLocalIterator() creates as many jobs as there are partitions, and it spams 
the Spark UI, especially when the method is used on an RDD with hundreds or 
thousands of partitions.

Does anyone have a way to work around this issue? What do people think about 
introducing a SparkContext local property (analogous to “spark.scheduler.pool” 
set as a thread-local property) that determines whether the job info should be 
shown on the Spark UI?
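
For reference, a minimal sketch of the existing thread-local mechanism this would 
be analogous to (rdd and process are placeholders, and the UI-hiding property 
named in the comment is hypothetical, not an existing Spark setting):

// Local properties set on the calling thread are inherited by jobs it submits.
sc.setLocalProperty("spark.scheduler.pool", "background")
try {
  rdd.toLocalIterator.foreach(process)   // each partition fetch runs as its own job
} finally {
  sc.setLocalProperty("spark.scheduler.pool", null)
}
// A hypothetical analogue, e.g. sc.setLocalProperty("spark.ui.showJobs", "false"),
// could mark jobs submitted from this thread as hidden from the UI.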

Thanks,
Mingyu


Re: [discuss] new Java friendly InputSource API

2015-04-23 Thread Mingyu Kim
Hi Reynold,

You mentioned that the new API allows arbitrary code to be run on the
driver side, but it's not very clear to me how this is different from what
the Hadoop API provides. In your example of using broadcast, did you mean
broadcasting something in InputSource.getPartitions() and having
InputPartitions use the broadcast variables? Isn't that already possible
with Hadoop's InputFormat.getSplits()?

Thanks,
Mingyu





On 4/21/15, 4:33 PM, "Soren Macbeth"  wrote:

>I'm also super interested in this. Flambo (our clojure DSL) wraps the java
>api and it would be great to have this.
>
>On Tue, Apr 21, 2015 at 4:10 PM, Reynold Xin  wrote:
>
>> It can reuse. That's a good point and we should document it in the API
>> contract.
>>
>>
>> On Tue, Apr 21, 2015 at 4:06 PM, Punyashloka Biswal <
>> punya.bis...@gmail.com>
>> wrote:
>>
>> > Reynold, thanks for this! At Palantir we're heavy users of the Java
>>APIs
>> > and appreciate being able to stop hacking around with fake ClassTags
>>:)
>> >
>> > Regarding this specific proposal, is the contract of RecordReader#get
>> > intended to be that it returns a fresh object each time? Or is it
>>allowed
>> > to mutate a fixed object and return a pointer to it each time?
>> >
>> > Put another way, is a caller supposed to clone the output of get() if
>> they
>> > want to use it later?
>> >
>> > Punya
>> >
>> > On Tue, Apr 21, 2015 at 4:35 PM Reynold Xin 
>>wrote:
>> >
>> >> I created a pull request last night for a new InputSource API that is
>> >> essentially a stripped down version of the RDD API for providing data
>> into
>> >> Spark. Would be great to hear the community's feedback.
>> >>
>> >> Spark currently has two de facto input source API:
>> >> 1. RDD
>> >> 2. Hadoop MapReduce InputFormat
>> >>
>> >> Neither of the above is ideal:
>> >>
>> >> 1. RDD: It is hard for Java developers to implement RDD, given the
>> >> implicit
>> >> class tags. In addition, the RDD API depends on Scala's runtime
>>library,
>> >> which does not preserve binary compatibility across Scala versions.
>>If a
>> >> developer chooses Java to implement an input source, it would be
>>great
>> if
>> >> that input source can be binary compatible in years to come.
>> >>
>> >> 2. Hadoop InputFormat: The Hadoop InputFormat API is overly
>>restrictive.
>> >> For example, it forces key-value semantics, and does not support
>>running
>> >> arbitrary code on the driver side (an example of why this is useful
>>is
>> >> broadcast). In addition, it is somewhat awkward to tell developers
>>that
>> in
>> >> order to implement an input source for Spark, they should learn the
>> Hadoop
>> >> MapReduce API first.
>> >>
>> >>
>> >> My patch creates a new InputSource interface, described by:
>> >>
>> >> - an array of InputPartition that specifies the data partitioning
>> >> - a RecordReader that specifies how data on each partition can be
>>read
>> >>
>> >> This interface is similar to Hadoop's InputFormat, except that there
>>is
>> no
>> >> explicit key/value separation.
>> >>
>> >>
>> >> JIRA ticket: https://issues.apache.org/jira/browse/SPARK-7025
>> >> Pull request: https://github.com/apache/spark/pull/5603
>> >>
>> >
>>





Re: [discuss] new Java friendly InputSource API

2015-04-24 Thread Mingyu Kim
I see. So, the difference is that the InputSource is instantiated on the driver 
side and gets sent to the executors, whereas Hadoop’s InputFormats are 
instantiated via reflection on the executors. That makes sense. Thanks for the 
clarification!

Mingyu

From: Reynold Xin <r...@databricks.com>
Date: Thursday, April 23, 2015 at 11:09 AM
To: Mingyu Kim <m...@palantir.com>
Cc: Soren Macbeth <so...@yieldbot.com>, Punyashloka Biswal <punya.bis...@gmail.com>, 
"dev@spark.apache.org" <dev@spark.apache.org>
Subject: Re: [discuss] new Java friendly InputSource API
Subject: Re: [discuss] new Java friendly InputSource API

In the ctor of InputSource (I'm also considering adding an explicit initialize 
call), the implementation of InputSource can execute arbitrary code. The state 
in it will also be serialized and passed onto the executors.

Yes - technically you can hijack getSplits in Hadoop InputFormat to do the same 
thing, and then put a reference of the state into every Split. But that's kind 
of awkward. Hadoop relies on the giant Configuration object to pass state over.



On Thu, Apr 23, 2015 at 11:02 AM, Mingyu Kim <m...@palantir.com> wrote:
Hi Reynold,

You mentioned that the new API allows arbitrary code to be run on the
driver side, but it's not very clear to me how this is different from what
Hadoop API provides. In your example of using broadcast, did you mean
broadcasting something in InputSource.getPartitions() and having
InputPartitions use the broadcast variables? Isn't that already possible
with Hadoop's InputFormat.getSplits()?

Thanks,
Mingyu





On 4/21/15, 4:33 PM, "Soren Macbeth" <so...@yieldbot.com> wrote:

>I'm also super interested in this. Flambo (our clojure DSL) wraps the java
>api and it would be great to have this.
>
>On Tue, Apr 21, 2015 at 4:10 PM, Reynold Xin <r...@databricks.com> wrote:
>
>> It can reuse. That's a good point and we should document it in the API
>> contract.
>>
>>
>> On Tue, Apr 21, 2015 at 4:06 PM, Punyashloka Biswal <
>> punya.bis...@gmail.com>
>> wrote:
>>
>> > Reynold, thanks for this! At Palantir we're heavy users of the Java
>>APIs
>> > and appreciate being able to stop hacking around with fake ClassTags
>>:)
>> >
>> > Regarding this specific proposal, is the contract of RecordReader#get
>> > intended to be that it returns a fresh object each time? Or is it
>>allowed
>> > to mutate a fixed object and return a pointer to it each time?
>> >
>> > Put another way, is a caller supposed to clone the output of get() if
>> they
>> > want to use it later?
>> >
>> > Punya
>> >
>> > On Tue, Apr 21, 2015 at 4:35 PM Reynold Xin <r...@databricks.com>
>>wrote:
>> >
>> >> I created a pull request last night for a new InputSource API that is
>> >> essentially a stripped down version of the RDD API for providing data
>> into
>> >> Spark. Would be great to hear the community's feedback.
>> >>
>> >> Spark currently has two de facto input source API:
>> >> 1. RDD
>> >> 2. Hadoop MapReduce InputFormat
>> >>
>> >> Neither of the above is ideal:
>> >>
>> >> 1. RDD: It is hard for Java developers to implement RDD, given the
>> >> implicit
>> >> class tags. In addition, the RDD API depends on Scala's runtime
>>library,
>> >> which does not preserve binary compatibility across Scala versions.
>>If a
>> >> developer chooses Java to implement an input source, it would be
>>great
>> if
>> >> that input source can be binary compatible in years to come.
>> >>
>> >> 2. Hadoop InputFormat: The Hadoop InputFormat API is overly
>>restrictive.
>> >> For example, it forces key-value semantics, and does not support
>>running
>> >> arbitrary code on the driver side (an example of why this is useful
>>is
>> >> broadcast). In addition, it is somewhat awkward to tell developers
>>that
>> in
>> >> order to implement an input source for Spark, they should learn the
>> Hadoop
>> >> MapReduce API first.
>> >>
>> >>
>> >> My patch creates a new InputSource interface, described by:
>> >>
>> >> - an array of InputPartition that specifies the data partitioning
>> >> - a RecordReader that specifies how data on each partition can be
>>read
>> >>
>> >> This interface is similar to Hadoop's InputFormat, except that there
>>is
>> no
>> >> explicit key/value separation.
>> >>
>> >>
>> >> JIRA ticket: https://issues.apache.org/jira/browse/SPARK-7025
>> >> Pull request: https://github.com/apache/spark/pull/5603
>> >>
>> >
>>




Re: And.eval short circuiting

2015-09-17 Thread Mingyu Kim
That sounds good. I think the optimizer should not change the behavior of 
execution, and reordering the filters can easily result in errors, as exemplified 
below. I agree that, for correctness, the optimizer should not reorder the 
filters. Please correct me if I have an incorrect assumption about the 
guarantees of the optimizer.

Is there a bug filed that tracks the change you suggested below, btw? I’d like 
to follow the issue, if there’s one.

Thanks,
Mingyu

From:  Reynold Xin
Date:  Wednesday, September 16, 2015 at 1:17 PM
To:  Zack Sampson
Cc:  "dev@spark.apache.org", Mingyu Kim, Peter Faiman, Matt Cheah, Michael 
Armbrust
Subject:  Re: And.eval short circuiting

This is "expected" in the sense that DataFrame operations can get re-ordered 
under the hood by the optimizer. For example, if the optimizer deems it is 
cheaper to apply the 2nd filter first, it might re-arrange the filters. In 
reality, it doesn't do that. I think this is too confusing and violates 
principle of least astonishment, so we should fix it. 

I discussed more with Michael offline, and think we can add a rule for the 
physical filter operator to replace the general AND/OR/equality/etc with a 
special version that treats null as false. This rule needs to be carefully 
written because it should only apply to subtrees of AND/OR/equality/etc (e.g. 
it shouldn't rewrite children of isnull).


On Tue, Sep 15, 2015 at 1:09 PM, Zack Sampson  wrote:
I see. We're having problems with code like this (forgive my noob scala):
val df = Seq(("moose","ice"), (null,"fire")).toDF("animals", "elements")
df
  .filter($"animals".rlike(".*"))
  .filter(callUDF({ (value: String) => value.length > 2 }, BooleanType, $"animals"))
  .collect()
This code throws an NPE because:
* Catalyst combines the filters with an AND
* the first filter returns null on the first (null) input
* the second filter tries to read the length of that null

This feels weird. Reading that code, I wouldn't expect null to be passed to the 
second filter. Even weirder is that if you call collect() after the first 
filter you won't see nulls, and if you write the data to disk and reread it, 
the NPE won't happen.

It's bewildering! Is this the intended behavior?
From: Reynold Xin [r...@databricks.com]
Sent: Monday, September 14, 2015 10:14 PM
To: Zack Sampson
Cc: dev@spark.apache.org
Subject: Re: And.eval short circuiting

rxin=# select null and true;
 ?column? 
--
 
(1 row)

rxin=# select null and false;
 ?column? 
--
 f
(1 row)


null and false should return false.


On Mon, Sep 14, 2015 at 9:12 PM, Zack Sampson  wrote:
It seems like And.eval can avoid calculating right.eval if left.eval returns 
null. Is there a reason it's written like it is? 

override def eval(input: Row): Any = {
  val l = left.eval(input)
  if (l == false) {
    false
  } else {
    val r = right.eval(input)
    if (r == false) {
      false
    } else {
      if (l != null && r != null) {
        true
      } else {
        null
      }
    }
  }
}
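
For comparison, a minimal sketch of the filter-only variant proposed above (an
illustration of the idea, not the actual Catalyst change): it treats null as
false, so the right child is never evaluated when the left side is null or false.

override def eval(input: Row): Any = {
  val l = left.eval(input)
  if (l == null || l == false) {
    false   // null-as-false: only safe under a filter, as discussed above
  } else {
    val r = right.eval(input)
    if (r == null) false else r
  }
}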







Re: And.eval short circuiting

2015-09-18 Thread Mingyu Kim
I filed SPARK-10703. Thanks!

Mingyu

From:  Reynold Xin
Date:  Thursday, September 17, 2015 at 11:22 PM
To:  Mingyu Kim
Cc:  Zack Sampson, "dev@spark.apache.org", Peter Faiman, Matt Cheah, Michael 
Armbrust
Subject:  Re: And.eval short circuiting

Please file a ticket and cc me. Thanks. 


On Thu, Sep 17, 2015 at 11:20 PM, Mingyu Kim  wrote:
That sounds good. I think the optimizer should not change the behavior of 
execution and reordering the filters can easily result in errors as exemplified 
below. I agree that the optimizer should not reorder the filters for 
correctness. Please correct me if I have an incorrect assumption about the 
guarantees of the optimizer.

Is there a bug filed that tracks the change you suggested below, btw? I’d like 
to follow the issue, if there’s one.

Thanks,
Mingyu

From: Reynold Xin
Date: Wednesday, September 16, 2015 at 1:17 PM
To: Zack Sampson
Cc: "dev@spark.apache.org", Mingyu Kim, Peter Faiman, Matt Cheah, Michael 
Armbrust 

Subject: Re: And.eval short circuiting

This is "expected" in the sense that DataFrame operations can get re-ordered 
under the hood by the optimizer. For example, if the optimizer deems it is 
cheaper to apply the 2nd filter first, it might re-arrange the filters. In 
reality, it doesn't do that. I think this is too confusing and violates 
principle of least astonishment, so we should fix it. 

I discussed more with Michael offline, and think we can add a rule for the 
physical filter operator to replace the general AND/OR/equality/etc with a 
special version that treats null as false. This rule needs to be carefully 
written because it should only apply to subtrees of AND/OR/equality/etc (e.g. 
it shouldn't rewrite children of isnull).


On Tue, Sep 15, 2015 at 1:09 PM, Zack Sampson  wrote:
I see. We're having problems with code like this (forgive my noob scala):
val df = Seq(("moose","ice"), (null,"fire")).toDF("animals", "elements")
df
  .filter($"animals".rlike(".*"))
  .filter(callUDF({ (value: String) => value.length > 2 }, BooleanType, $"animals"))
  .collect()
This code throws an NPE because:
* Catalyst combines the filters with an AND
* the first filter returns null on the first (null) input
* the second filter tries to read the length of that null

This feels weird. Reading that code, I wouldn't expect null to be passed to the 
second filter. Even weirder is that if you call collect() after the first 
filter you won't see nulls, and if you write the data to disk and reread it, 
the NPE won't happen.

It's bewildering! Is this the intended behavior?
From: Reynold Xin [r...@databricks.com]
Sent: Monday, September 14, 2015 10:14 PM
To: Zack Sampson
Cc: dev@spark.apache.org
Subject: Re: And.eval short circuiting

rxin=# select null and true;
 ?column? 
--
 
(1 row)

rxin=# select null and false;
 ?column? 
--
 f
(1 row)


null and false should return false.


On Mon, Sep 14, 2015 at 9:12 PM, Zack Sampson  wrote:
It seems like And.eval can avoid calculating right.eval if left.eval returns 
null. Is there a reason it's written like it is? 

override def eval(input: Row): Any = {
  val l = left.eval(input)
  if (l == false) {
    false
  } else {
    val r = right.eval(input)
    if (r == false) {
      false
    } else {
      if (l != null && r != null) {
        true
      } else {
        null
      }
    }
  }
}








Re: Spark 1.6.1

2016-02-02 Thread Mingyu Kim
Hi all,

Is there an estimated timeline for the 1.6.1 release? Just wanted to check how
the release is coming along. Thanks!

Mingyu

From:  Romi Kuntsman 
Date:  Tuesday, February 2, 2016 at 3:16 AM
To:  Michael Armbrust 
Cc:  Hamel Kothari , Ted Yu ,
"dev@spark.apache.org" 
Subject:  Re: Spark 1.6.1

Hi Michael,
What about the memory leak bug?
https://issues.apache.org/jira/browse/SPARK-11293

Even after the memory rewrite in 1.6.0, it still happens in some cases.
Will it be fixed for 1.6.1?
Thanks,

Romi Kuntsman, Big Data Engineer
http://www.totango.com


On Mon, Feb 1, 2016 at 9:59 PM, Michael Armbrust 
wrote:
> We typically do not allow changes to the classpath in maintenance releases.
> 
> On Mon, Feb 1, 2016 at 8:16 AM, Hamel Kothari  wrote:
>> I noticed that the Jackson dependency was bumped to 2.5 in master for
>> something spark-streaming related. Is there any reason that this upgrade
>> can't be included with 1.6.1?
>> 
>> According to later comments on this thread:
>> https://issues.apache.org/jira/browse/SPARK-8332
>> and my
>> personal experience using Spark with Jackson 2.5 hasn't caused any
>> issues but it does have some useful new features. It should be fully
>> backwards compatible according to the Jackson folks.
>> 
>> On Mon, Feb 1, 2016 at 10:29 AM Ted Yu  wrote:
>>> SPARK-12624 has been resolved.
>>> According to Wenchen, SPARK-12783 is fixed in 1.6.0 release.
>>> 
>>> Are there other blockers for Spark 1.6.1 ?
>>> 
>>> Thanks
>>> 
>>> On Wed, Jan 13, 2016 at 5:39 PM, Michael Armbrust 
>>> wrote:
 Hey All, 
 
 While I'm not aware of any critical issues with 1.6.0, there are several
 corner cases that users are hitting with the Dataset API that are fixed in
 branch-1.6.  As such I'm considering a 1.6.1 release.
 
 At the moment there are only two critical issues targeted for 1.6.1:
  - SPARK-12624 - When schema is specified, we should treat undeclared
 fields as null (in Python)
  - SPARK-12783 - Dataset map serialization error
 
 When these are resolved I'll likely begin the release process.  If there
 are any other issues that we should wait for please contact me.
 
 Michael
>>> 
> 







Re: Spark 1.6.1

2016-02-02 Thread Mingyu Kim
Cool, thanks!

Mingyu

From:  Michael Armbrust 
Date:  Tuesday, February 2, 2016 at 10:48 AM
To:  Mingyu Kim 
Cc:  Romi Kuntsman , Hamel Kothari
, Ted Yu ,
"dev@spark.apache.org" , Punya Biswal
, Robert Kruszewski 
Subject:  Re: Spark 1.6.1

I'm waiting for a few last fixes to be merged.  Hoping to cut an RC in the
next few days.

On Tue, Feb 2, 2016 at 10:43 AM, Mingyu Kim  wrote:
> Hi all,
> 
> Is there an estimated timeline for 1.6.1 release? Just wanted to check how the
> release is coming along. Thanks!
> 
> Mingyu
> 
> From: Romi Kuntsman 
> Date: Tuesday, February 2, 2016 at 3:16 AM
> To: Michael Armbrust 
> Cc: Hamel Kothari , Ted Yu ,
> "dev@spark.apache.org" 
> Subject: Re: Spark 1.6.1
> 
> Hi Michael,
> What about the memory leak bug?
> https://issues.apache.org/jira/browse/SPARK-11293
> Even after the memory rewrite in 1.6.0, it still happens in some cases.
> Will it be fixed for 1.6.1?
> Thanks,
> 
> Romi Kuntsman, Big Data Engineer
> http://www.totango.com
> 
> On Mon, Feb 1, 2016 at 9:59 PM, Michael Armbrust 
> wrote:
>> We typically do not allow changes to the classpath in maintenance releases.
>> 
>> On Mon, Feb 1, 2016 at 8:16 AM, Hamel Kothari  wrote:
>>> I noticed that the Jackson dependency was bumped to 2.5 in master for
>>> something spark-streaming related. Is there any reason that this upgrade
>>> can't be included with 1.6.1?
>>> 
>>> According to later comments on this thread:
>>> https://issues.apache.org/jira/browse/SPARK-8332
>>> and my
>>> personal experience using Spark with Jackson 2.5 hasn't caused any
>>> issues but it does have some useful new features. It should be fully
>>> backwards compatible according to the Jackson folks.
>>> 
>>> On Mon, Feb 1, 2016 at 10:29 AM Ted Yu  wrote:
>>>> SPARK-12624 has been resolved.
>>>> According to Wenchen, SPARK-12783 is fixed in 1.6.0 release.
>>>> 
>>>> Are there other blockers for Spark 1.6.1 ?
>>>> 
>>>> Thanks
>>>> 
>>>> On Wed, Jan 13, 2016 at 5:39 PM, Michael Armbrust 
>>>> wrote:
>>>>> Hey All, 
>>>>> 
>>>>> While I'm not aware of any critical issues with 1.6.0, there are several
>>>>> corner cases that users are hitting with the Dataset API that are fixed in
>>>>> branch-1.6.  As such I'm considering a 1.6.1 release.
>>>>> 
>>>>> At the moment there are only two critical issues targeted for 1.6.1:
>>>>>  - SPARK-12624 - When schema is specified, we should treat undeclared
>>>>> fields as null (in Python)
>>>>>  - SPARK-12783 - Dataset map serialization error
>>>>> 
>>>>> When these are resolved I'll likely begin the release process.  If there
>>>>> are any other issues that we should wait for please contact me.
>>>>> 
>>>>> Michael
>>>> 
>> 
> 







Utilizing YARN AM RPC port field

2016-06-13 Thread Mingyu Kim
Hi all,

 

YARN provides a way for an ApplicationMaster to register an RPC port so that a 
client outside the YARN cluster can reach the application for any RPCs, but 
Spark's YARN AMs simply register a dummy port number of 0. (See 
https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala#L74)
 This is useful for long-running Spark application use cases where jobs are 
submitted via a form of RPC to an already started Spark context running in YARN 
cluster mode. Spark Job Server 
(https://github.com/spark-jobserver/spark-jobserver) and Livy 
(https://github.com/cloudera/hue/tree/master/apps/spark/java) are good 
open-source examples of these use cases. The current work-around is to have the 
Spark AM make a call back to a configured URL with the port number of the RPC 
server, which the client then uses to communicate with the AM.

 

Utilizing the YARN AM RPC port allows the port number to be reported in a 
secure way (i.e. with the AM RPC port field and a Kerberized YARN cluster, you 
don't need to re-invent a way to verify the authenticity of the reported port 
number), and it removes the callback from the YARN cluster back to a client, which 
means you can operate YARN in a low-trust environment and run other client 
applications behind a firewall.

 

A couple of proposals I have for utilizing the YARN AM RPC port are below. (Note 
that you cannot simply pre-configure the port number and pass it to the Spark AM 
via configuration, because of potential port conflicts on the YARN node.)

 

- Start up an empty Jetty server during Spark AM initialization, set 
the port number when registering the AM with the RM, and pass a reference to the 
Jetty server into the Spark application (e.g. through SparkContext) so the 
application can dynamically add servlets/resources to the Jetty server.

- Have an optional static method in the main class (e.g. 
initializeRpcPort()) which optionally sets up an RPC server and returns the RPC 
port. The Spark AM can call this method, register the port number with the RM, and 
continue on with invoking the main method. I don't see this making a good API, 
though.

 

I’m curious to hear what other people think. Would this be useful for anyone? 
What do you think about the proposals? Please feel free to suggest other ideas. 
Thanks!

 

Mingyu





Re: Utilizing YARN AM RPC port field

2016-06-14 Thread Mingyu Kim
Thanks for the pointers, Steve!

 

The first option sounds like the most lightweight and non-disruptive option 
among them. So, we can add a configuration that enables socket initialization; 
if it is enabled, the Spark AM will create a ServerSocket and set it on the 
SparkContext.
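
A minimal sketch of that socket initialization (an assumption of how it could 
look, not existing Spark code):

import java.net.ServerSocket

// Bind an ephemeral port early in AM initialization so the port number is known
// at registration time; the bound socket (or just its port) is later handed to
// whatever RPC/servlet server the application starts.
val probe = new ServerSocket(0)          // 0 lets the OS pick a free port
val amRpcPort = probe.getLocalPort       // reported when registering the AM with the RM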

 

If there are no objections, I can file a bug and find time to tackle it myself. 

 

Mingyu

 

From: Steve Loughran 
Date: Tuesday, June 14, 2016 at 4:55 AM
To: Mingyu Kim 
Cc: "dev@spark.apache.org" , Matt Cheah 

Subject: Re: Utilizing YARN AM RPC port field

 

 

On 14 Jun 2016, at 01:30, Mingyu Kim  wrote:

 

Hi all,

 

YARN provides a way for AppilcationMaster to register a RPC port so that a 
client outside the YARN cluster can reach the application for any RPCs, but 
Spark’s YARN AMs simply register a dummy port number of 0. (See 
https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala#L74)
 This is useful for the long-running Spark application usecases where jobs are 
submitted via a form of RPC to an already started Spark context running in YARN 
cluster mode. Spark job server 
(https://github.com/spark-jobserver/spark-jobserver) and Livy 
(https://github.com/cloudera/hue/tree/master/apps/spark/java) are good 
open-source examples of these usecases. The current work-around is to have the 
Spark AM make a call back to a configured URL with the port number of the RPC 
server for the client to communicate with the AM.

 

Utilizing YARN AM RPC port allows the port number reporting to be done in a 
secure way (i.e. With AM RPC port field and Kerberized YARN cluster, you don’t 
need to re-invent a way to verify the authenticity of the port number 
reporting.) and removes the callback from YARN cluster back to a client, which 
means you can operate YARN in a low-trust environment and run other client 
applications behind a firewall.

 

A couple of proposals for utilizing YARN AM RPC port I have are, (Note that you 
cannot simply pre-configure the port number and pass it to Spark AM via 
configuration because of potential port conflicts on the YARN node)

 

· Start-up an empty Jetty server during Spark AM initialization, set 
the port number when registering AM with RM, and pass a reference to the Jetty 
server into the Spark application (e.g. through SparkContext) for the 
application to dynamically add servlet/resources to the Jetty server.

· Have an optional static method in the main class (e.g. 
initializeRpcPort()) which optionally sets up a RPC server and returns the RPC 
port. Spark AM can call this method, register the port number to RM and 
continue on with invoking the main method. I don’t see this making a good API, 
though.

 

I’m curious to hear what other people think. Would this be useful for anyone? 
What do you think about the proposals? Please feel free to suggest other ideas. 
Thanks!

 

 

It's a recurrent irritation of mine that you can't ever change the HTTP/RPC 
ports of a YARN AM after launch; it creates a complex startup state where you 
can't register until your IPC endpoints are up.

 

Tactics

 

-Create a socket on an empty port, register it, hand off the port to the RPC 
setup code as the chosen port. Ideally, support a range to scan, so that 
systems which only open a specific range of ports, e.g. 6500-6800 can have 
those ports only scanned. We've done this in other projects.

 

-serve up the port binding info via a REST API off the AM web; clients hit the 
(HEAD/GET only RM Proxy), ask for the port, work on it. Nonstandard; could be 
extensible with other binding information. (TTL of port caching, )

 

-Use the YARN-913 ZK based registry to register/lookup bindings. This is used 
in various YARN apps to register service endpoints (RPC, Rest); there's work 
ongoing for DNS support. this would allow you to use DNS against a specific DNS 
server to get the endpoints. Works really well with containerized deployments 
where the apps come up with per-container IPaddresses and fixed ports.

Although you couldn't get the latter into the spark-yarn code itself (needs 
Hadoop 2.6+), you can plug in support via the extension point implemented in 
SPARK-11314. I've actually thought of doing that for a while... just been too 
busy.

 

-Just fix the bit of the YARN api that forces you to know your endpoints in 
advance. People will appreciate it, though it will take a while to trickle 
downstream.

 

 

 

 





Re: Utilizing YARN AM RPC port field

2016-06-15 Thread Mingyu Kim
FYI, I just filed https://issues.apache.org/jira/browse/SPARK-15974.

 

Mingyu

 

From: Mingyu Kim 
Date: Tuesday, June 14, 2016 at 2:13 PM
To: Steve Loughran 
Cc: "dev@spark.apache.org" , Matt Cheah 

Subject: Re: Utilizing YARN AM RPC port field

 

Thanks for the pointers, Steve!

 

The first option sounds like a the most light-weight and non-disruptive option 
among them. So, we can add a configuration that enables socket initialization, 
Spark AM will create a ServerSocket if the socket init is enabled and set it on 
SparkContext

 

If there are no objections, I can file a bug and find time to tackle it myself. 

 

Mingyu

 

From: Steve Loughran 
Date: Tuesday, June 14, 2016 at 4:55 AM
To: Mingyu Kim 
Cc: "dev@spark.apache.org" , Matt Cheah 

Subject: Re: Utilizing YARN AM RPC port field

 

 

On 14 Jun 2016, at 01:30, Mingyu Kim  wrote:

 

Hi all,

 

YARN provides a way for AppilcationMaster to register a RPC port so that a 
client outside the YARN cluster can reach the application for any RPCs, but 
Spark’s YARN AMs simply register a dummy port number of 0. (See 
https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala#L74)
 This is useful for the long-running Spark application usecases where jobs are 
submitted via a form of RPC to an already started Spark context running in YARN 
cluster mode. Spark job server 
(https://github.com/spark-jobserver/spark-jobserver) and Livy 
(https://github.com/cloudera/hue/tree/master/apps/spark/java) are good 
open-source examples of these usecases. The current work-around is to have the 
Spark AM make a call back to a configured URL with the port number of the RPC 
server for the client to communicate with the AM.

 

Utilizing YARN AM RPC port allows the port number reporting to be done in a 
secure way (i.e. With AM RPC port field and Kerberized YARN cluster, you don’t 
need to re-invent a way to verify the authenticity of the port number 
reporting.) and removes the callback from YARN cluster back to a client, which 
means you can operate YARN in a low-trust environment and run other client 
applications behind a firewall.

 

A couple of proposals for utilizing YARN AM RPC port I have are, (Note that you 
cannot simply pre-configure the port number and pass it to Spark AM via 
configuration because of potential port conflicts on the YARN node)

 

· Start-up an empty Jetty server during Spark AM initialization, set 
the port number when registering AM with RM, and pass a reference to the Jetty 
server into the Spark application (e.g. through SparkContext) for the 
application to dynamically add servlet/resources to the Jetty server.

· Have an optional static method in the main class (e.g. 
initializeRpcPort()) which optionally sets up a RPC server and returns the RPC 
port. Spark AM can call this method, register the port number to RM and 
continue on with invoking the main method. I don’t see this making a good API, 
though.

 

I’m curious to hear what other people think. Would this be useful for anyone? 
What do you think about the proposals? Please feel free to suggest other ideas. 
Thanks!

 

 

It's a recurrent irritation of mine that you can't ever change the HTTP/RPC 
ports of a YARN AM after launch; it creates a complex startup state where you 
can't register until your IPC endpoints are up.

 

Tactics

 

-Create a socket on an empty port, register it, hand off the port to the RPC 
setup code as the chosen port. Ideally, support a range to scan, so that 
systems which only open a specific range of ports, e.g. 6500-6800 can have 
those ports only scanned. We've done this in other projects.

 

-serve up the port binding info via a REST API off the AM web; clients hit the 
(HEAD/GET only RM Proxy), ask for the port, work on it. Nonstandard; could be 
extensible with other binding information. (TTL of port caching, )

 

-Use the YARN-913 ZK based registry to register/lookup bindings. This is used 
in various YARN apps to register service endpoints (RPC, Rest); there's work 
ongoing for DNS support. this would allow you to use DNS against a specific DNS 
server to get the endpoints. Works really well with containerized deployments 
where the apps come up with per-container IPaddresses and fixed ports.

Although you couldn't get the latter into the spark-yarn codeitself (needs 
Hadoop 2.6+), you can plug in support via the extension point implemented in 
SPARK-11314., I've actually thought of doing that for a while...just been too 
busy.

 

-Just fix the bit of the YARN api that forces you to know your endpoints in 
advance. People will appreciate it, though it will take a while to trickle 
downstream.

 

 

 

 





[SPARK-3050] Spark program running with 1.0.2 jar cannot run against a 1.0.1 cluster

2014-08-14 Thread Mingyu Kim
I ran a really simple program that runs with the Spark 1.0.2 jar and connects to a
Spark 1.0.1 cluster, but it fails with java.io.InvalidClassException. I
filed the bug at https://issues.apache.org/jira/browse/SPARK-3050.

I assumed that minor and patch releases shouldn't break compatibility. Is
that correct?

Thanks,
Mingyu






Re: [SPARK-3050] Spark program running with 1.0.2 jar cannot run against a 1.0.1 cluster

2014-08-15 Thread Mingyu Kim
Thanks for your response. I think I misinterpreted the
stability/compatibility guarantee of the 1.0 release. It seems like the
compatibility is only at the API level.

This is interesting because it means any system/product that is built on top
of Spark and uses Spark with a long-running SparkContext connecting to the
cluster over the network will need to make sure it has the exact same version
of the Spark jar as the cluster, even down to the patch version. This would be
analogous to having to compile Spark against a very specific version of
Hadoop, as opposed to currently being able to use the Spark package with
CDH4 against most CDH4 Hadoop clusters.

Is it correct that Spark is focusing and prioritizing the spark-submit
use cases over the aforementioned use cases? I just wanted to
better understand the future direction/prioritization of Spark.

Thanks,
Mingyu

From:  Patrick Wendell 
Date:  Thursday, August 14, 2014 at 6:32 PM
To:  Gary Malouf 
Cc:  Mingyu Kim , "dev@spark.apache.org"

Subject:  Re: [SPARK-3050] Spark program running with 1.0.2 jar cannot run
against a 1.0.1 cluster

I commented on the bug. For driver mode, you'll need to get the
corresponding version of spark-submit for Spark 1.0.2.


On Thu, Aug 14, 2014 at 3:43 PM, Gary Malouf  wrote:
> To be clear, is it 'compiled' against 1.0.2 or it packaged with it?
> 
> 
> On Thu, Aug 14, 2014 at 6:39 PM, Mingyu Kim  wrote:
> 
>> > I ran a really simple code that runs with Spark 1.0.2 jar and connects to
>> > a Spark 1.0.1 cluster, but it fails with java.io.InvalidClassException. I
>> > filed the bug at https://issues.apache.org/jira/browse/SPARK-3050.
>> >
>> > I assumed the minor and patch releases shouldn't break compatibility. Is
>> > that correct?
>> >
>> > Thanks,
>> > Mingyu
>> >







Re: [SPARK-3050] Spark program running with 1.0.2 jar cannot run against a 1.0.1 cluster

2014-08-17 Thread Mingyu Kim
Thanks for the clarification. I don't have a deep knowledge of Scala, but I
thought this was going to be reasonable to support, since the Java serialization
framework provides relatively easy ways to support these kinds of backwards
compatibility. I can see how this could be harder with closures.

Supporting at least stability between different patch versions would
help a lot.

Mingyu

From:  Patrick Wendell 
Date:  Friday, August 15, 2014 at 12:28 PM
To:  Mingyu Kim 
Cc:  Gary Malouf , "dev@spark.apache.org"

Subject:  Re: [SPARK-3050] Spark program running with 1.0.2 jar cannot run
against a 1.0.1 cluster

Hey Mingyu, 

For this reason we are encouraging all users to run spark-submit. In Spark
we capture closures and send them over the network from the driver to the
executors. These are then deserialized on the executor. So if your driver
program has different versions of certain classes than exist on the
executor, it doesn't work well. We've even run into stranger issues, where
the exact same version of Spark was used at the driver and the executor, but
they were compiled at different times. Since Scala doesn't guarantee stable
naming for certain types of anonymous classes, the class names didn't match
up and it caused errors at runtime.

The most straightforward way to deal with this is to inject, at run-time,
the exact version of Spark that the cluster expects if you are running the
standalone mode.

I think we'd be totally open to improving this to provide "API stability"
for the case you are working with, i.e. the case where you have spark 1.0.X
at the driver and 1.0.Y on the executors. But it will require looking at
what exactly causes incompatibility and seeing if there is a solution. In
this case I think we changed a publicly exposed class (the RDD class) in
some way that caused compatibility issues... even though we didn't change
any binary signatures.

BTW - this is not relevant to YARN mode where you ship Spark with your job
so there is no "cluster version of Spark".

- Patrick


On Fri, Aug 15, 2014 at 11:13 AM, Mingyu Kim  wrote:
> Thanks for your response. I think I misinterpreted the stability/compatibility
> guarantee with 1.0 release. It seems like the compatibility is only at the API
> level.
> 
> This is interesting because it means any system/product that is built on top
> of Spark and uses Spark with a long-running SparkContext connecting to the
> cluster over network, will need to make sure it has the exact same version of
> Spark jar as the cluster, even to the patch version. This would be analogous
> to having to compile Spark against a very specific version of Hadoop, as
> opposed to currently being able to use the Spark package with CDH4 against
> most of the CDH4 Hadoop clusters.
> 
> Is it correct that Spark is focusing and prioritizing around the spark-submit
> use cases than the aforementioned use cases? I just wanted to better
> understand the future direction/prioritization of spark.
> 
> Thanks,
> Mingyu
> 
> From: Patrick Wendell 
> Date: Thursday, August 14, 2014 at 6:32 PM
> To: Gary Malouf 
> Cc: Mingyu Kim , "dev@spark.apache.org"
> 
> Subject: Re: [SPARK-3050] Spark program running with 1.0.2 jar cannot run
> against a 1.0.1 cluster
> 
> I commented on the bug. For driver mode, you'll need to get the corresponding
> version of spark-submit for Spark 1.0.2.
> 
> 
> On Thu, Aug 14, 2014 at 3:43 PM, Gary Malouf  wrote:
>> To be clear, is it 'compiled' against 1.0.2 or it packaged with it?
>> 
>> 
>> On Thu, Aug 14, 2014 at 6:39 PM, Mingyu Kim  wrote:
>> 
>>> > I ran a really simple code that runs with Spark 1.0.2 jar and connects to
>>> > a Spark 1.0.1 cluster, but it fails with java.io.InvalidClassException. I
>>> > filed the bug at https://issues.apache.org/jira/browse/SPARK-3050.
>>> >
>>> > I assumed the minor and patch releases shouldn't break compatibility. Is
>>> > that correct?
>>> >
>>> > Thanks,
>>> > Mingyu
>>> >
> 




