[GitHub] spark pull request: [SPARK-8389][Streaming][PySpark] Expose KafkaR...

2015-07-03 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/7185#issuecomment-118369856
  
There is still a serialization bug in the code; I will fix it soon.





[GitHub] spark pull request: [SPARK-7050][build] Keep kafka-assembly maven ...

2015-07-05 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/5632#issuecomment-118720543
  
Hi @srowen, if the assembly jar is not found using the pattern 
`target/scala-*/spark-streaming-kafka-assembly-*.jar`, the Python Kafka tests 
will not be run at all.
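
For illustration, a minimal sketch (not the actual run-tests script; the `external/kafka-assembly` search root is an assumption) of the kind of check that decides whether the Python Kafka tests run:

```python
import glob

# Pattern quoted above; if nothing matches, the Kafka tests are skipped.
jars = glob.glob("external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar")
if not jars:
    print("Skipping Python Kafka tests: streaming-kafka assembly jar not found")
else:
    print("Found streaming-kafka assembly jar: %s" % jars[0])
```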





[GitHub] spark pull request: [SPARK-7050][build] Keep kafka-assembly maven ...

2015-07-06 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/5632#issuecomment-118769448
  
Yes, the Python streaming Kafka-related tests will not be run if this jar is missing.





[GitHub] spark pull request: [SPARK-8389][Streaming][PySpark] Expose KafkaR...

2015-07-06 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7185#discussion_r33999257
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -670,4 +670,17 @@ private class KafkaUtilsPythonHelper {
 TopicAndPartition(topic, partition)
 
   def createBroker(host: String, port: JInt): Broker = Broker(host, port)
+
+  def offsetRangesOfKafkaRDD(rdd: RDD[_]): JList[OffsetRange] = {
--- End diff --

Hi @tdas, I think I already put this method into `KafkaUtilsPythonHelper`; 
I am not sure what you were referring to.





[GitHub] spark pull request: [SPARK-8389][Streaming][PySpark] Expose KafkaR...

2015-07-06 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7185#discussion_r33999784
  
--- Diff: python/pyspark/streaming/dstream.py ---
@@ -602,13 +602,17 @@ class TransformedDStream(DStream):
 Multiple continuous transformations of DStream can be combined into
 one transformation.
 """
-def __init__(self, prev, func):
+def __init__(self, prev, func, transformFunc=None):
--- End diff --

Previously I used subclassing (method overriding) to do this, but ran into 
deserialization problems with `TransformFunction`/`TransformedDStream`. I suspect 
it is the derived class that causes the deserialization error, so instead of adding 
`KafkaTransformFunction`/`KafkaTransformedDStream` subclasses, I added an extra field to 
work around the problem. I admit this implementation is awkward; do you have 
any better suggestion?





[GitHub] spark pull request: [SPARK-8389][Streaming][PySpark] Expose KafkaR...

2015-07-06 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7185#discussion_r33999855
  
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -200,14 +203,30 @@ def __init__(self, topic, partition, fromOffset, untilOffset):
 :param fromOffset: Inclusive starting offset.
 :param untilOffset: Exclusive ending offset.
 """
-self._topic = topic
-self._partition = partition
-self._fromOffset = fromOffset
-self._untilOffset = untilOffset
+self.topic = topic
+self.partition = partition
+self.fromOffset = fromOffset
+self.untilOffset = untilOffset
+
+def __eq__(self, other):
--- End diff --

I think it makes it easy to compare two objects for equality, especially in 
unit test code; otherwise we would have to compare two `OffsetRange` objects 
field by field.
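
For illustration, a minimal sketch of what such an equality check could look like (an assumption about the eventual implementation, not the merged code; the fields follow the diff above):

```python
class OffsetRange(object):
    def __init__(self, topic, partition, fromOffset, untilOffset):
        self.topic = topic
        self.partition = partition
        self.fromOffset = fromOffset
        self.untilOffset = untilOffset

    def __eq__(self, other):
        # Field-by-field comparison, so unit tests can simply assert equality.
        return (isinstance(other, OffsetRange)
                and self.topic == other.topic
                and self.partition == other.partition
                and self.fromOffset == other.fromOffset
                and self.untilOffset == other.untilOffset)

    def __ne__(self, other):
        return not self.__eq__(other)
```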





[GitHub] spark pull request: [SPARK-8389][Streaming][PySpark] Expose KafkaR...

2015-07-06 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7185#discussion_r34000251
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -670,4 +670,17 @@ private class KafkaUtilsPythonHelper {
 TopicAndPartition(topic, partition)
 
   def createBroker(host: String, port: JInt): Broker = Broker(host, port)
+
+  def offsetRangesOfKafkaRDD(rdd: RDD[_]): JList[OffsetRange] = {
+val parentRDDs = rdd.getNarrowAncestors
+val kafkaRDDs = parentRDDs.filter(rdd => rdd.isInstanceOf[KafkaRDD[_, _, _, _, _]])
--- End diff --

Personally I'd be inclined to use `filter` with a `require(...)`, as TD mentioned 
below, to guard against unexpected situations (more than one KafkaRDD in 
the lineage chain). Since the lineage should not be long (2 or 3 dependencies), 
this is not a big problem.





[GitHub] spark pull request: [SPARK-8389][Streaming][PySpark] Expose KafkaR...

2015-07-06 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7185#discussion_r34001375
  
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -200,14 +203,30 @@ def __init__(self, topic, partition, fromOffset, untilOffset):
 :param fromOffset: Inclusive starting offset.
 :param untilOffset: Exclusive ending offset.
 """
-self._topic = topic
-self._partition = partition
-self._fromOffset = fromOffset
-self._untilOffset = untilOffset
+self.topic = topic
+self.partition = partition
+self.fromOffset = fromOffset
+self.untilOffset = untilOffset
+
+def __eq__(self, other):
--- End diff --

Also the behavior is the same as Java counterpart.





[GitHub] spark pull request: [SPARK-8389][Streaming][PySpark] Expose KafkaR...

2015-07-06 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7185#discussion_r34003101
  
--- Diff: python/pyspark/streaming/dstream.py ---
@@ -602,13 +602,17 @@ class TransformedDStream(DStream):
 Multiple continuous transformations of DStream can be combined into
 one transformation.
 """
-def __init__(self, prev, func):
+def __init__(self, prev, func, transformFunc=None):
--- End diff --

Hi @tdas, my previous approach was like this (the subclass way):

```python
class TransformedDStream(DStream):

    ...

    def transformFunc(self, ctx, func, *deserializers):
        return TransformFunction(ctx, func, deserializers)
```

```python
class KafkaTransformedDStream(TransformedDStream):

    def __init__(self, prev, func):
        TransformedDStream.__init__(self, prev, func)

    def transformFunc(self, ctx, func, *deserializers):
        return TransformFunction(ctx, func, deserializers) \
            .rdd_wrapper(lambda jrdd, ctx, ser: KafkaRDD(jrdd, ctx, ser))
```

this will lead to such exception:

```
Traceback (most recent call last):
  File "/mnt/data/project/apache-spark/python/pyspark/streaming/util.py", line 62, in call
    r = self.func(t, *rdds)
  File "/mnt/data/project/apache-spark/python/pyspark/streaming/tests.py", line 99, in get_output
    r = rdd.collect()
  File "/mnt/data/project/apache-spark/python/pyspark/rdd.py", line 758, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/mnt/data/project/apache-spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
    self.target_id, self.name)
  File "/mnt/data/project/apache-spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    format(target_id, '.', name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 44.0 failed 1 times, most recent failure: Lost task 0.0 in stage 44.0 (TID 44, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/data/project/apache-spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/mnt/data/project/apache-spark/python/lib/pyspark.zip/pyspark/worker.py", line 105, in process
    iterator = deserializer.load_stream(infile)
AttributeError: 'tuple' object has no attribute 'load_stream'

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```

I guess this is due to a serialization issue, so I worked around it in the way 
the PR currently does.

I will do some other tests to see whether there is any other way.







[GitHub] spark pull request: [SPARK-8389][Streaming][PySpark] Expose KafkaR...

2015-07-06 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7185#discussion_r34004968
  
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -244,3 +263,87 @@ def __init__(self, host, port):
 
 def _jBroker(self, helper):
 return helper.createBroker(self._host, self._port)
+
+
+class KafkaRDD(RDD):
+"""
+A Python wrapper of KafkaRDD, to provide additional information on normal RDD.
+"""
+
+def __init__(self, jrdd, ctx, jrdd_deserializer):
+RDD.__init__(self, jrdd, ctx, jrdd_deserializer)
+
+def offsetRanges(self):
+"""
+Get the OffsetRange of specific KafkaRDD.
+:return: A list of OffsetRange
+"""
+try:
+helperClass = self.ctx._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+.loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+helper = helperClass.newInstance()
+joffsetRanges = helper.offsetRangesOfKafkaRDD(self._jrdd.rdd())
+except Py4JJavaError as e:
+if 'ClassNotFoundException' in str(e.java_exception):
+KafkaUtils._printErrorMsg(self.ctx)
+raise e
+
+ranges = [OffsetRange(o.topic(), o.partition(), o.fromOffset(), o.untilOffset())
+  for o in joffsetRanges]
+return ranges
+
+
+class KafkaDStream(DStream):
+"""
+A Python wrapper of KafkaDStream
+"""
+
+def __init__(self, jdstream, ssc, jrdd_deserializer):
+DStream.__init__(self, jdstream, ssc, jrdd_deserializer)
+
+def foreachRDD(self, func):
+"""
+Apply a function to each RDD in this DStream.
+"""
+if func.__code__.co_argcount == 1:
+old_func = func
+func = lambda r, rdd: old_func(rdd)
+jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer) \
+.rdd_wrapper(lambda jrdd, ctx, ser: KafkaRDD(jrdd, ctx, ser))
+api = self._ssc._jvm.PythonDStream
+api.callForeachRDD(self._jdstream, jfunc)
+
+def transform(self, func):
+"""
+Return a new DStream in which each RDD is generated by applying a function
+on each RDD of this DStream.
+
+`func` can have one argument of `rdd`, or have two arguments of
+(`time`, `rdd`)
+"""
+if func.__code__.co_argcount == 1:
+oldfunc = func
+func = lambda t, rdd: oldfunc(rdd)
+assert func.__code__.co_argcount == 2, "func should take one or two arguments"
+
+return KafkaTransformedDStream(self, func)
+
+
+class KafkaTransformedDStream(TransformedDStream):
--- End diff --

Hi @tdas, here is another way, using subclassing, to get a Kafka-specific 
`TransformedDStream` and overcome the deserialization problem. Please help 
review.
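
For illustration, a hedged usage sketch of what the wrapper above enables on the Python side (the stream construction parameters are placeholders; `offsetRanges()` follows the diff above):

```python
def print_offsets(rdd):
    # rdd is re-wrapped as a KafkaRDD by the rdd_wrapper above, so the driver
    # can read the Kafka offset ranges of this batch.
    for o in rdd.offsetRanges():
        print(o.topic, o.partition, o.fromOffset, o.untilOffset)

stream = KafkaUtils.createDirectStream(
    ssc, ["topic1"], {"metadata.broker.list": "localhost:9092"})
stream.foreachRDD(print_offsets)
```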





[GitHub] spark pull request: [SPARK-8743] [Streaming]: De-registering Codah...

2015-07-06 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7250#discussion_r34009444
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
@@ -577,6 +575,12 @@ class StreamingContext private[streaming] (
* @throws IllegalStateException if the StreamingContext is already 
stopped.
*/
   def start(): Unit = synchronized {
+  /**
+   * Registering Streaming Metrics at the start of the StreamingContext
--- End diff --

I think it is better to use `//` rather than `/** */` for internal code 
commenting.





[GitHub] spark pull request: [SPARK-8743] [Streaming]: De-registering Codah...

2015-07-06 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/7250#issuecomment-119081600
  
This change LGTM; it will actually get rid of some annoying logs :).





[GitHub] spark pull request: [SPARK-8389][Streaming][PySpark] Expose KafkaR...

2015-07-07 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/7185#issuecomment-119385419
  
@amit-ramesh, the problem you mentioned is different from what we address 
here. Basically, I think you want functionality like `messageHandler` in the 
Scala/Java API, to get the offset on the executor side. Currently we don't 
have a clear solution to support this in the Python API; the Spark Streaming 
Kafka API for Python is implemented slightly differently from Scala/Java, so 
it is not easy to handle the problem you mentioned unless we change the 
current semantics.






[GitHub] spark pull request: [SPARK-8389][Streaming][PySpark] Expose KafkaR...

2015-07-07 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/7185#issuecomment-119386124
  
Even using `DStream.transform()` you can only get the offsetRanges on the 
driver side; please check the implementation in this PR.





[GitHub] spark pull request: [SPARK-8389][Streaming][PySpark] Expose KafkaR...

2015-07-07 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/7185#issuecomment-119447855
  
Hi @amit-ramesh, what I mentioned about getting offsetRanges in the 
`transform` function is something like this:

```python
dstream.transform(lambda r: r.offsetRanges())
```

Here `r.offsetRanges()` is executed on the driver side; if you have follow-up 
transformations inside the `transform` function that need to use these 
offsetRanges, they will implicitly be sent to the executor side. That is what 
I meant.

Also:
>1. Events in an RDD partition are ordered by Kafka offset
>2. The index of an OffsetRanges object in the getOffsets() list corresponds to the partition index in the RDD.

These two assumptions are true as far as I know, so you can rely on them.
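
For illustration, a minimal sketch of that pattern (assuming a direct stream whose RDDs expose `offsetRanges()`, as in this PR; `topics`, `kafkaParams`, and the `process` helper are placeholders):

```python
def process(rdd):
    # rdd.offsetRanges() runs on the driver; the returned list is captured in
    # the closure below and shipped to the executors with the task.
    offsets = rdd.offsetRanges()

    def attach(index, records):
        o = offsets[index]  # OffsetRange index matches the partition index
        # Records within a partition are ordered by Kafka offset.
        return ((o.topic, o.partition, o.fromOffset + i, r)
                for i, r in enumerate(records))

    return rdd.mapPartitionsWithIndex(attach)

stream = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)
stream.transform(process).pprint()
```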





[GitHub] spark pull request: [SPARK-5523][Core][Streaming] Add a cache for ...

2015-07-08 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/5064#issuecomment-119508679
  
Hi @tdas and @andrewor14, I did a lot of testing on the memory consumption of 
`TaskMetrics` and the related `_hostname` strings.

Here I used `DirectKafkaWordCount` as the test workload with the task number set to 1 as 
a minimal setting, and 1 master + 1 slave in standalone mode.

According to my profiling of the driver process with JProfiler, the 
number of `TaskMetrics` instances is at least around 2000 (with full GC 
triggered); you can refer to this chart:

![image](https://cloud.githubusercontent.com/assets/850797/8566588/bc80d1bc-2591-11e5-9f0d-f50c5ce5405a.png)

So if we linearly increase the task number, say to 1000 (a mid-scale 
cluster), we get at least 1000 * 2000 (2M) `TaskMetrics` objects, and therefore 2M 
`_hostname` objects with the previous code. If each `_hostname` occupies 64 
bytes, roughly 128 MB of memory is taken up by `_hostname` objects alone, and this is 
proportional to the number of tasks and `TaskMetrics`.

With my implementation, the memory occupied by `_hostname` is 
proportional to the cluster size (independent of the task number and the number of 
`TaskMetrics`): if we have 1000 nodes in the cluster, the total memory 
occupied by `_hostname` will be 1000 * 64 bytes, plus one additional hashmap.

So this change does reduce memory consumption (though not by a huge amount), 
and the effect is more evident in long-running and large-scale cases.
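
As a back-of-envelope check of the numbers above (a sketch; the 64 bytes per hostname string and the 2000 instances per task slot are the figures quoted in this comment):

```python
task_metrics_per_task = 2000   # instances observed per task slot in the JProfiler chart
num_tasks = 1000               # hypothetical mid-scale job
bytes_per_hostname = 64        # assumed size of one _hostname string

# Previous code: one _hostname string per TaskMetrics instance.
without_cache = task_metrics_per_task * num_tasks * bytes_per_hostname
print("%d MB" % (without_cache // 10**6))   # 128 MB

# With the cache: one _hostname string per distinct cluster node.
cluster_nodes = 1000
with_cache = cluster_nodes * bytes_per_hostname
print("%d KB" % (with_cache // 10**3))      # 64 KB, plus one hashmap
```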






[GitHub] spark pull request: [SPARK-5523][Core][Streaming] Add a cache for ...

2015-07-08 Thread jerryshao
GitHub user jerryshao reopened a pull request:

https://github.com/apache/spark/pull/5064

[SPARK-5523][Core][Streaming] Add a cache for hostname in TaskMetrics to 
decrease the memory usage and GC overhead

The hostname in TaskMetrics is created through deserialization, and the number 
of distinct hostnames is only on the order of the number of cluster nodes, so adding a 
cache layer to dedupe these objects can reduce memory usage and alleviate GC 
overhead, especially for long-running applications with fast job generation, like 
Spark Streaming.
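
Conceptually the cache works like string interning; a minimal sketch of the idea (illustration only — the actual change lives in the Scala `TaskMetrics` deserialization path, not in this code):

```python
# Keep one canonical copy per distinct hostname; deserialization reuses it
# instead of allocating a fresh string for every TaskMetrics instance.
_hostname_cache = {}

def canonical_hostname(name):
    return _hostname_cache.setdefault(name, name)
```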

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jerryshao/apache-spark SPARK-5523

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/5064.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5064


commit 7bc3834970a0edafa87813aa83af954118c19f4e
Author: Saisai Shao 
Date:   2015-03-17T06:41:26Z

Add a pool to cache the hostname

commit e4de2b4c31ad37ab7bb88fd3df1ea1bab53b02f5
Author: jerryshao 
Date:   2015-03-18T05:59:40Z

Address the comments







[GitHub] spark pull request: [SPARK-8743] [Streaming]: De-registering Codah...

2015-07-08 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7250#discussion_r34214800
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala 
---
@@ -73,7 +73,7 @@ private[spark] class MetricsSystem private (
   private[this] val metricsConfig = new MetricsConfig(conf)
 
   private val sinks = new mutable.ArrayBuffer[Sink]
-  private val sources = new mutable.ArrayBuffer[Source]
+  val sources = new mutable.ArrayBuffer[Source]
--- End diff --

I think it would be better not to loosen the access restriction and expose this to 
public users just for test convenience. You can refer 
[here](https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala#L782)
 to see how to test against a private field.





[GitHub] spark pull request: [SPARK-8743] [Streaming]: De-registering Codah...

2015-07-08 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7250#discussion_r34215156
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
@@ -688,6 +690,8 @@ class StreamingContext private[streaming] (
 } finally {
   // The state should always be Stopped after calling `stop()`, even 
if we haven't started yet
   state = STOPPED
+  // De-registering Streaming Metrics of the StreamingContext
+  env.metricsSystem.removeSource(streamingSource)
--- End diff --

I think it would be better to add a try, as TD said; otherwise you may hit an 
exception when the SparkContext is already stopped.





[GitHub] spark pull request: [SPARK-4122][STREAMING] Add a library that can...

2015-07-08 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/2994#discussion_r34216141
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala
 ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.util.Properties
+
+import scala.reflect.ClassTag
+
+import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.DStream
+
+/**
+ * Import this object in this form:
+ * {{{
+ *   import org.apache.spark.streaming.kafka.KafkaWriter._
+ * }}}
+ *
+ * Once imported, the `writeToKafka` can be called on any [[DStream]] 
object in this form:
+ * {{{
+ *   dstream.writeToKafka(producerConfig, f)
+ * }}}
+ */
+object KafkaWriter {
+  import scala.language.implicitConversions
+  /**
+   * This implicit method allows the user to call dstream.writeToKafka(..)
+   * @param dstream - DStream to write to Kafka
+   * @tparam T - The type of the DStream
+   * @tparam K - The type of the key to serialize to
+   * @tparam V - The type of the value to serialize to
+   * @return
+   */
+  implicit def createKafkaOutputWriter[T: ClassTag, K, V](dstream: DStream[T]): KafkaWriter[T] = {
+new KafkaWriter[T](dstream)
+  }
+}
+
+/**
+ *
+ * This class can be used to write data to Kafka from Spark Streaming. To 
write data to Kafka
+ * simply `import org.apache.spark.streaming.kafka.KafkaWriter._` in your 
application and call
+ * `dstream.writeToKafka(producerConf, func)`
+ *
+ * Here is an example:
+ * {{{
+ * // Adding this line allows the user to call 
dstream.writeDStreamToKafka(..)
+ * import org.apache.spark.streaming.kafka.KafkaWriter._
+ *
+ * class ExampleWriter {
+ *   val instream = ssc.queueStream(toBe)
+ *   val producerConf = new Properties()
+ *   producerConf.put("serializer.class", 
"kafka.serializer.DefaultEncoder")
+ *   producerConf.put("key.serializer.class", 
"kafka.serializer.StringEncoder")
+ *   producerConf.put("metadata.broker.list", "kafka.example.com:5545")
+ *   producerConf.put("request.required.acks", "1")
+ *   instream.writeToKafka(producerConf,
+ *(x: String) => new KeyedMessage[String, String]("default", null, x))
+ *   ssc.start()
+ * }
+ *
+ * }}}
+ * @param dstream - The [[DStream]] to be written to Kafka
+ *
+ */
+class KafkaWriter[T: ClassTag](@transient dstream: DStream[T]) extends Serializable with Logging {
+
+  /**
+   * To write data from a DStream to Kafka, call this function after 
creating the DStream. Once
+   * the DStream is passed into this function, all data coming from the 
DStream is written out to
+   * Kafka. The properties instance takes the configuration required to 
connect to the Kafka
+   * brokers in the standard Kafka format. The serializerFunc is a 
function that converts each
+   * element of the RDD to a Kafka [[KeyedMessage]]. This closure should 
be serializable - so it
+   * should use only instances of Serializables.
+   * @param producerConfig The configuration that can be used to connect 
to Kafka
+   * @param serializerFunc The function to convert the data from the 
stream into Kafka
+   *   [[KeyedMessage]]s.
+   * @tparam K The type of the key
+   * @tparam V The type of the value
+   *
+   */
+  def writeToKafka[K, V](producerConfig: Properties,
+serializerFunc: T => KeyedMessage[K, V]): Unit = {
+
+// Broadcast the producer to avoid sending it every time.
+val broadcastedConfig = dstream.ssc.sc.broadcast(producerConfig)
+
+def func = (rdd: RDD[T]) => {

[GitHub] spark pull request: [SPARK-5523][Core][Streaming] Add a cache for ...

2015-07-08 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/5064#issuecomment-119777643
  
Jenkins, retest this please.





[GitHub] spark pull request: [SPARK-8743] [Streaming]: De-registering Codah...

2015-07-08 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7250#discussion_r34217232
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
@@ -674,6 +676,8 @@ class StreamingContext private[streaming] (
   logWarning("StreamingContext has already been stopped")
 case ACTIVE =>
   scheduler.stop(stopGracefully)
+  // De-registering Streaming Metrics of the StreamingContext
+  env.metricsSystem.removeSource(streamingSource)
--- End diff --

I think removing the source only in the `ACTIVE` state may introduce a corner-case 
problem. For example, since the metrics source is added while the 
StreamingContext's state is `INITIALIZED`, if we hit an exception at that point 
we never transition the state to `ACTIVE`, so the metrics source can never be 
removed, because the source is only removed when the state is `ACTIVE`. What 
do you think?





[GitHub] spark pull request: [SPARK-8743] [Streaming]: De-registering Codah...

2015-07-08 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7250#discussion_r34218984
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
@@ -674,6 +676,8 @@ class StreamingContext private[streaming] (
   logWarning("StreamingContext has already been stopped")
 case ACTIVE =>
   scheduler.stop(stopGracefully)
+  // De-registering Streaming Metrics of the StreamingContext
+  env.metricsSystem.removeSource(streamingSource)
--- End diff --

What I mean is to move this line out of the `ACTIVE` branch; we have to 
de-register the metrics source in every case, not only in the `ACTIVE` state.





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-08 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34219133
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -627,6 +641,29 @@ private[spark] class ExecutorAllocationManager(
 def isExecutorIdle(executorId: String): Boolean = {
   !executorIdToTaskIds.contains(executorId)
 }
+
+/**
+ * Get the number of locality aware pending tasks and related locality 
preferences as the
+ * hints used for executor allocation.
+ */
+def executorPlacementHints(): (Int, Map[String, Int]) =
+  allocationManager.synchronized {
+  var localityAwarePendingTasks: Int = 0
+  val localityToCount = new mutable.HashMap[String, Int]()
+  stageIdToPreferredLocations.values.foreach { localities =>
--- End diff --

Hi @sryza, that's a good question. The reason I didn't differentiate 
pending, running, and completed tasks is that what we get here is a list of 
locality preferences, `Seq[Seq[String]]`, with no task-id mapping 
information (we cannot get the exact task id at that point in the DAGScheduler), so 
it is hard to differentiate task status in such a fine-grained way.





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-08 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34219475
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -872,6 +872,25 @@ class DAGScheduler(
 // will be posted, which should always come after a corresponding 
SparkListenerStageSubmitted
 // event.
 stage.latestInfo = StageInfo.fromStage(stage, 
Some(partitionsToCompute.size))
+val taskIdToLocations = try {
+  stage match {
+case s: ShuffleMapStage =>
+  partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, 
id))}.toMap
+case s: ResultStage =>
+  val job = s.resultOfJob.get
+  partitionsToCompute.map { id =>
+val p: Int = job.partitions(id)
+(id, getPreferredLocs(stage.rdd, p))
+  }.toMap
+  }
+} catch {
+  case NonFatal(e) =>
--- End diff --

Because `getPreferredLocs()` may throw an exception; please refer to 
(https://github.com/apache/spark/commit/0b5abbf5f96a5f6bfd15a65e8788cf3fa96fe54c).





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-08 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34220281
  
--- Diff: 
yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+
+import org.apache.spark.SparkFunSuite
+
+class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers 
with BeforeAndAfterEach {
+
+  private val yarnAllocatorSuite = new YarnAllocatorSuite
--- End diff --

Some code, such as `createAllocator()`, actually uses methods of 
`YarnAllocatorSuite`.





[GitHub] spark pull request: [SPARK-8743] [Streaming]: De-registering Codah...

2015-07-09 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7250#discussion_r34237335
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
@@ -674,6 +676,8 @@ class StreamingContext private[streaming] (
   logWarning("StreamingContext has already been stopped")
 case ACTIVE =>
   scheduler.stop(stopGracefully)
+  // De-registering Streaming Metrics of the StreamingContext
+  env.metricsSystem.removeSource(streamingSource)
--- End diff --

I think it is OK for the normal case, but what if an exception occurs after the 
metrics source is successfully registered but before the state changes to 
`ACTIVE`? According to the code, it will catch the exception and change the state 
into `STOPPED`, so in that situation, if we call stop(), we will never 
de-register the metrics source with the current implementation.





[GitHub] spark pull request: [SPARK-8389][Streaming][PySpark] Expose KafkaR...

2015-07-09 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7185#discussion_r34319973
  
--- Diff: python/pyspark/streaming/util.py ---
@@ -37,6 +37,11 @@ def __init__(self, ctx, func, *deserializers):
 self.ctx = ctx
 self.func = func
 self.deserializers = deserializers
+self._rdd_wrapper = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser)
+
+def rdd_wrapper(self, func):
--- End diff --

Hi @tdas, I haven't fixed this yet; I'll change it later on.





[GitHub] spark pull request: [SPARK-6051][Streaming] Add ZooKeeper offest p...

2015-07-09 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/4805#issuecomment-120210548
  
OK, got it, that makes sense. I'm closing it now.





[GitHub] spark pull request: [SPARK-6051][Streaming] Add ZooKeeper offest p...

2015-07-09 Thread jerryshao
Github user jerryshao closed the pull request at:

https://github.com/apache/spark/pull/4805





[GitHub] spark pull request: [SPARK-8337][Streaming][Pyspark] Add MessageHa...

2015-07-14 Thread jerryshao
GitHub user jerryshao opened a pull request:

https://github.com/apache/spark/pull/7410

[SPARK-8337][Streaming][Pyspark] Add MessageHandler for Kafka Python API

Propose a way to add a messageHandler for the Kafka Python API, offering users a 
pattern for consuming messages similar to Scala/Java:

```python
def getOffset(m):
    return m and m.offset

KafkaUtils.createDirectStream(ssc, topic, kafkaParams, messageHandler=getOffset)
```

Internally, the Kafka `MessageAndMetadata` is serialized to Python using a Pickler.

Please help review, thanks a lot.
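
For illustration, a hedged sketch of how a richer handler might be used under this proposal; the attribute names (`topic`, `partition`, `offset`, `key`, `message`) mirror Kafka's `MessageAndMetadata` and are assumptions about the pickled Python object, not confirmed API:

```python
def handler(m):
    # Keep whatever metadata the downstream processing needs (assumed fields).
    return (m.topic, m.partition, m.offset, m.key, m.message)

stream = KafkaUtils.createDirectStream(
    ssc, topics, kafkaParams, messageHandler=handler)
stream.pprint()
```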


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jerryshao/apache-spark SPARK-8337

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7410.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7410


commit 798b047c9fd7c5fe474aa9d21745a43dbd3316e0
Author: jerryshao 
Date:   2015-07-15T02:48:06Z

Add MessageHandler for Kafka Python API







[GitHub] spark pull request: [SPARK-6304][Streaming] Fix checkpointing does...

2015-07-14 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/5060#issuecomment-121510441
  
Yeah, that sounds good :), let me update the code.





[GitHub] spark pull request: [SPARK-6304][Streaming] Fix checkpointing does...

2015-07-15 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/5060#discussion_r34661816
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -191,6 +191,34 @@ class CheckpointSuite extends TestSuiteBase {
 }
   }
 
+  // This tests if "spark.driver.host" and "spark.driver.port" is set by 
user, can be recovered
+  // with correct value.
+  test("correctly recover spark.driver.[host|port] from checkpoint") {
+val conf = Map("spark.driver.host" -> "localhost", "spark.driver.port" -> "")
+conf.foreach(kv => System.setProperty(kv._1, kv._2))
+ssc = new StreamingContext(master, framework, batchDuration)
+val originalConf = ssc.conf
+assert(originalConf.get("spark.driver.host") === "localhost")
+assert(originalConf.get("spark.driver.port") === "")
+
+val cp = new Checkpoint(ssc, Time(1000))
+ssc.stop()
+
+// Serialize/deserialize to simulate write to storage and reading it 
back
+val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
+
+val newCpConf = newCp.createSparkConf()
+assert(newCpConf.contains("spark.driver.host"))
+assert(newCpConf.contains("spark.driver.port"))
+assert(newCpConf.get("spark.driver.host") === "localhost")
--- End diff --

OK, I will update the test





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-15 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34756031
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -872,6 +872,25 @@ class DAGScheduler(
 // will be posted, which should always come after a corresponding 
SparkListenerStageSubmitted
 // event.
 stage.latestInfo = StageInfo.fromStage(stage, 
Some(partitionsToCompute.size))
+val taskIdToLocations = try {
+  stage match {
+case s: ShuffleMapStage =>
+  partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, 
id))}.toMap
+case s: ResultStage =>
+  val job = s.resultOfJob.get
+  partitionsToCompute.map { id =>
+val p = job.partitions(id)
+(id, getPreferredLocs(stage.rdd, p))
+  }.toMap
+  }
+} catch {
+  case NonFatal(e) =>
+abortStage(stage, s"Task creation failed: 
$e\n${e.getStackTraceString}")
+runningStages -= stage
+return
+}
+stage.latestInfo.taskLocalityPreferences = 
Some(taskIdToLocations.values.toSeq)
--- End diff --

Thanks @kayousterhout , that's a good idea, I will change the code 
accordingly.





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-15 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34757199
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -872,6 +872,25 @@ class DAGScheduler(
 // will be posted, which should always come after a corresponding 
SparkListenerStageSubmitted
 // event.
 stage.latestInfo = StageInfo.fromStage(stage, 
Some(partitionsToCompute.size))
+val taskIdToLocations = try {
+  stage match {
+case s: ShuffleMapStage =>
+  partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, 
id))}.toMap
+case s: ResultStage =>
+  val job = s.resultOfJob.get
+  partitionsToCompute.map { id =>
+val p = job.partitions(id)
+(id, getPreferredLocs(stage.rdd, p))
+  }.toMap
+  }
+} catch {
+  case NonFatal(e) =>
+abortStage(stage, s"Task creation failed: 
$e\n${e.getStackTraceString}")
+runningStages -= stage
+return
+}
+stage.latestInfo.taskLocalityPreferences = 
Some(taskIdToLocations.values.toSeq)
--- End diff --

I think that if we move the field `taskLocalityPreferences` into `StageInfo`, 
we would not need the `Option` and it would be more consistent with other code, but the 
big concern is that we would need to change many places in the unit tests that 
use `StageInfo`. I'm not sure whether that is worthwhile.






[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-15 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34757572
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
@@ -24,11 +24,15 @@ package org.apache.spark
 private[spark] trait ExecutorAllocationClient {
 
   /**
-   * Express a preference to the cluster manager for a given total number 
of executors.
+   * Express a preference to the cluster manager for a given total number 
of executors,
+   * number of locality aware pending tasks and related locality 
preferences.
* This can result in canceling pending requests or filing additional 
requests.
* @return whether the request is acknowledged by the cluster manager.
*/
-  private[spark] def requestTotalExecutors(numExecutors: Int): Boolean
+  private[spark] def requestTotalExecutors(
+  numExecutors: Int,
+  localityAwarePendingTasks: Int,
+  preferredLocalityToCount: Map[String, Int]): Boolean
--- End diff --

Actually I think the key string is hostname, not executor :).





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-15 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34758587
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -872,6 +872,25 @@ class DAGScheduler(
 // will be posted, which should always come after a corresponding 
SparkListenerStageSubmitted
 // event.
 stage.latestInfo = StageInfo.fromStage(stage, 
Some(partitionsToCompute.size))
+val taskIdToLocations = try {
+  stage match {
+case s: ShuffleMapStage =>
+  partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, 
id))}.toMap
+case s: ResultStage =>
+  val job = s.resultOfJob.get
+  partitionsToCompute.map { id =>
+val p = job.partitions(id)
+(id, getPreferredLocs(stage.rdd, p))
+  }.toMap
+  }
+} catch {
+  case NonFatal(e) =>
+abortStage(stage, s"Task creation failed: 
$e\n${e.getStackTraceString}")
+runningStages -= stage
+return
+}
+stage.latestInfo.taskLocalityPreferences = 
Some(taskIdToLocations.values.toSeq)
--- End diff --

Yeah, this is one way; I will change it. Another concern is the MiMa test, 
since this is a public class, so I will test it locally. Thanks for your advice 
:).





[GitHub] spark pull request: [SPARK-9091][STREAMING]Add the CompressionCode...

2015-07-16 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7442#discussion_r34855902
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala ---
@@ -20,6 +20,8 @@ package org.apache.spark.streaming.dstream
 
 import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 
+import org.apache.hadoop.io.compress.CompressionCodec
--- End diff --

This third-party import should come after the Scala imports.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-16 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/6394#issuecomment-122175891
  
Jenkins, retest this please.





[GitHub] spark pull request: SPARK-9059 Update Direct Kafka Word count exam...

2015-07-19 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7467#discussion_r34964892
  
--- Diff: 
examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
 ---
@@ -106,6 +111,32 @@ public Integer call(Integer i1, Integer i2) {
   });
 wordCounts.print();
 
+// Access the offset ranges using HasOffsetRanges
+// Hold a reference to the current offset ranges, so it can be used downstream
+final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
+messages.transformToPair(
+  new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
+    @Override
+    public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
+      OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+      offsetRanges.set(offsets);
+      return rdd;
+    }
+  }
+).foreachRDD(
--- End diff --

I think we could get and print the offsetRanges directly in `foreachRDD`; I am not sure why we need another `transformToPair` just to capture the offsetRanges.
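
For illustration, a minimal Scala sketch of that idea (names are illustrative; the same approach applies to the Java example):

```
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Print each batch's offset ranges directly in foreachRDD, with no extra
// transform step in between.
def printOffsets(directStream: DStream[(String, String)]): Unit = {
  directStream.foreachRDD { rdd =>
    val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetRanges.foreach { o =>
      println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    }
  }
}
```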





[GitHub] spark pull request: [SPARK-9065][Streaming][PySpark] Add MessageHa...

2015-07-22 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/7410#issuecomment-123960642
  
Yeah, will do.





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-22 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35290784
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -526,6 +537,19 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumTasks(stageId) = numTasks
 allocationManager.onSchedulerBacklogged()
+
+var numTasksPending = 0
--- End diff --

There is a chance that the locality preferences are empty, so using `taskLocalityPreferences.size()` to calculate the number of locality-preferred pending tasks is not accurate.
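
A rough sketch of what I mean (illustrative names, using plain strings in place of the real locality type): only count the tasks that actually carry a locality preference.

```
// One entry per pending task; an empty entry means the task has no locality preference.
def numLocalityAwarePendingTasks(taskLocalityPreferences: Seq[Seq[String]]): Int =
  taskLocalityPreferences.count(_.nonEmpty)
```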





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-22 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35292409
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -526,6 +537,19 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumTasks(stageId) = numTasks
 allocationManager.onSchedulerBacklogged()
+
+var numTasksPending = 0
--- End diff --

What if a `hadoopRDD` is unioned with a `parallelizedRDD` and the result is then shuffled? I think in that situation we get a union RDD where only some partitions have locality preferences.




[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35295955
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -225,8 +245,11 @@ private[yarn] class YarnAllocator(
   logInfo(s"Will request $missing executor containers, each with 
${resource.getVirtualCores} " +
 s"cores and ${resource.getMemory} MB memory including 
$memoryOverhead MB overhead")
 
-  for (i <- 0 until missing) {
-val request = createContainerRequest(resource)
+  val containerLocalityPreferences = 
containerPlacementStrategy.localityOfRequestedContainers(
--- End diff --

Yes, I will add a TODO to explain the problem here and fix it later.





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35296348
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param hostToLocalTaskCount a map to store the preferred hostname and 
possible task
+   * numbers running on it, used as hints for 
container allocation
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  hostToLocalTaskCount: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy calculates the optimal locality preferences of YARN containers by considering
+ * the node ratio of pending tasks, the number of required cores/containers and the locality of
+ * currently existing containers. The target of this algorithm is to maximize the number of
+ * tasks that would run locally.
+ *
+ * The details of this algorithm is described as below, if we have 20 
tasks which
+ * require (host1, host2, host3) and 10 tasks which require (host1, host2, 
host4),
+ * besides each container has 2 cores and cpus per task is 1, so the 
required container number is
+ * 15, and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required 
container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers 
with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the 
required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 
: 2 : 1)
+ *
+ * 3. If containers are existed but no matching localities, follow the 
method of 1 and 2.
--- End diff --

I think "matching localities" means the existed container's localities that 
matches to the expected localities. Here means if all the existed container's 
localities cannot match the expected localities.





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35296419
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param hostToLocalTaskCount a map to store the preferred hostname and 
possible task
+   * numbers running on it, used as hints for 
container allocation
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  hostToLocalTaskCount: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy calculates the optimal locality preferences of YARN containers by considering
+ * the node ratio of pending tasks, the number of required cores/containers and the locality of
+ * currently existing containers. The target of this algorithm is to maximize the number of
+ * tasks that would run locally.
+ *
+ * The details of this algorithm is described as below, if we have 20 
tasks which
+ * require (host1, host2, host3) and 10 tasks which require (host1, host2, 
host4),
+ * besides each container has 2 cores and cpus per task is 1, so the 
required container number is
+ * 15, and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required 
container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers 
with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the 
required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 
: 2 : 1)
+ *
+ * 3. If containers are existed but no matching localities, follow the 
method of 1 and 2.
--- End diff --

I will choose a better wording.





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-23 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/6394#issuecomment-124291143
  
Hi @sryza and @kayousterhout , thanks a lot for your review, I greatly appreciate it. I've addressed most of the comments you raised; please help to review.





[GitHub] spark pull request: [SPARK-9065][Streaming][PySpark] Add MessageHa...

2015-07-23 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/7410#issuecomment-124328012
  
Jenkins, retest this please.





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-24 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35401677
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
@@ -28,7 +28,10 @@ private[spark] trait ExecutorAllocationClient {
* This can result in canceling pending requests or filing additional 
requests.
* @return whether the request is acknowledged by the cluster manager.
*/
-  private[spark] def requestTotalExecutors(numExecutors: Int): Boolean
+  private[spark] def requestTotalExecutors(
+  numExecutors: Int,
--- End diff --

Seems I missed this comment; will update the code.





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-27 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/6394#issuecomment-125396961
  
@sryza and @kayousterhout thanks a lot for your review, greatly appreciate 
it!





[GitHub] spark pull request: [SPARK-9065][Streaming][PySpark] Add MessageHa...

2015-07-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7410#discussion_r35609468
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -684,3 +679,53 @@ private class KafkaUtilsPythonHelper {
 kafkaRDD.offsetRanges.toSeq
   }
 }
+
+private object KafkaUtilsPythonHelper {
+  private var initialized = false
+
+  def initialize(): Unit = {
--- End diff --

Would you please share more details? I cannot quite get what you mean. What I did here is follow the pattern used in MLlib (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala#L1370).
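
For context, a stripped-down sketch of the pattern being referred to (the pickler registration is abstracted behind a function parameter here; the real code registers its concrete picklers):

```
object PythonHelperInit {
  private var initialized = false

  // Run the registration exactly once, guarded by the object's lock,
  // mirroring the initialize() pattern in PythonMLLibAPI's SerDe.
  def initialize(registerPicklers: () => Unit): Unit = synchronized {
    if (!initialized) {
      registerPicklers()
      initialized = true
    }
  }
}
```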





[GitHub] spark pull request: [SPARK-9104][CORE][WIP] expose Netty network l...

2015-07-29 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7753#discussion_r35831987
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked during the execution of an executor.
+ *
+ * So, when adding new fields, take into consideration that the whole 
object can be serialized for
+ * shipping off at any time to consumers of the SparkListener interface.
+ */
+@DeveloperApi
+class ExecutorMetrics extends Serializable {
+
+  type SysTime = Long
+
+  /**
+   * Host's name the executor runs on
+   */
+  private var _hostname: String = _
+  def hostname: String = _hostname
+  private[spark] def setHostname(value: String) = _hostname = value
+
+  /**
+   * Host's port the executor runs on
+   */
+  private var _port: Int = _
+  def port: Int = _port
+  private[spark] def setPort(value: Int) = _port = value
+
+  def hostPort: String = hostname + ":" + port
+
+  /**
+   * maximum on-heap memory that the executor used for shuffle read on 
Netty network layer
+   */
+  @volatile private var _maxNettyReadOnheapSizeTime: (Long, SysTime) = 
(0L, 0L)
+  def maxNettyReadOnheapSizeTime: (Long, SysTime) = 
_maxNettyReadOnheapSizeTime
--- End diff --

not `maxNettyOnHeapSizeTime`?





[GitHub] spark pull request: [SPARK-9104][CORE][WIP] expose Netty network l...

2015-07-29 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7753#discussion_r35832065
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked during the execution of an executor.
+ *
+ * So, when adding new fields, take into consideration that the whole 
object can be serialized for
+ * shipping off at any time to consumers of the SparkListener interface.
+ */
+@DeveloperApi
+class ExecutorMetrics extends Serializable {
+
+  type SysTime = Long
+
+  /**
+   * Host's name the executor runs on
+   */
+  private var _hostname: String = _
+  def hostname: String = _hostname
+  private[spark] def setHostname(value: String) = _hostname = value
+
+  /**
+   * Host's port the executor runs on
+   */
+  private var _port: Int = _
+  def port: Int = _port
+  private[spark] def setPort(value: Int) = _port = value
+
+  def hostPort: String = hostname + ":" + port
+
+  /**
+   * maximum on-heap memory that the executor used for shuffle read on 
Netty network layer
+   */
+  @volatile private var _maxNettyReadOnheapSizeTime: (Long, SysTime) = 
(0L, 0L)
+  def maxNettyReadOnheapSizeTime: (Long, SysTime) = 
_maxNettyReadOnheapSizeTime
--- End diff --

I think we'd better create a named case class like:

```
case class NettyMemoryMetric(value: Long, currentTime: Long)
```

to replace the returned tuple. I think it will be clearer to understand.
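
For example, a sketch of what the field could look like with such a case class (illustrative, not the PR's code):

```
case class NettyMemoryMetric(value: Long, sysTime: Long)

class NettyReadMetrics {
  @volatile private var _maxNettyReadOnheap = NettyMemoryMetric(0L, 0L)

  def maxNettyReadOnheap: NettyMemoryMetric = _maxNettyReadOnheap

  // Keep the largest observed value together with the time it was observed.
  def update(value: Long, time: Long): Unit = {
    if (value > _maxNettyReadOnheap.value) {
      _maxNettyReadOnheap = NettyMemoryMetric(value, time)
    }
  }
}
```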





[GitHub] spark pull request: [SPARK-9104][CORE][WIP] expose Netty network l...

2015-07-29 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7753#discussion_r35832213
  
--- Diff: 
core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
 ---
@@ -47,6 +53,27 @@ class NettyBlockTransferService(conf: SparkConf, 
securityManager: SecurityManage
   private[this] var server: TransportServer = _
   private[this] var clientFactory: TransportClientFactory = _
   private[this] var appId: String = _
+  
+  private[this] var clock: Clock = new SystemClock()
--- End diff --

I think you could add a method here like the one in [ExecutorAllocationManager](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L209) to let the user set a ManualClock; that will make testing easier.
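
Something along these lines (a self-contained sketch; `Clock`, `SystemClock` and `ManualClock` stand in for the utilities in `org.apache.spark.util`):

```
trait Clock { def getTimeMillis(): Long }
class SystemClock extends Clock { def getTimeMillis(): Long = System.currentTimeMillis() }
class ManualClock(var now: Long = 0L) extends Clock { def getTimeMillis(): Long = now }

class TransferServiceLike {
  private var clock: Clock = new SystemClock

  // Test-only hook, mirroring ExecutorAllocationManager.setClock: lets a test
  // inject a ManualClock instead of the system clock.
  def setClock(newClock: Clock): Unit = { clock = newClock }

  def nowMillis(): Long = clock.getTimeMillis()
}
```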





[GitHub] spark pull request: [SPARK-9104][CORE][WIP] expose Netty network l...

2015-07-29 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/7753#discussion_r35832542
  
--- Diff: 
core/src/main/scala/org/apache/spark/network/BlockTransferService.scala ---
@@ -38,6 +39,11 @@ abstract class BlockTransferService extends 
ShuffleClient with Closeable with Lo
   def init(blockDataManager: BlockDataManager)
 
   /**
+   * Collect current executor memory metrics of transferService.
+   */
+  def getMemMetrics(executorMetrics: ExecutorMetrics): Unit = {}
--- End diff --

Would it be better to make this an abstract method rather than a no-op default?
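
That is, something like the following comparison (a sketch with a stand-in metrics type):

```
class ExecutorMetricsSketch  // stand-in for the PR's ExecutorMetrics

// Abstract hook: every concrete transfer service must decide what to report.
abstract class TransferServiceAbstract {
  def getMemMetrics(metrics: ExecutorMetricsSketch): Unit
}

// Current form in the diff: a default no-op that subclasses may silently inherit.
abstract class TransferServiceDefault {
  def getMemMetrics(metrics: ExecutorMetricsSketch): Unit = {}
}
```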





[GitHub] spark pull request: [SPARK-9104][CORE][WIP] expose Netty network l...

2015-07-29 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/7753#issuecomment-126160280
  
A simple question: is it enough to only expose the maximum memory usage of the Netty layer?

Besides, IMHO it would be better to separate getting the Netty memory usage and displaying it in the web portal into two different PRs.





[GitHub] spark pull request: [SPARK-9104][CORE][WIP] expose Netty network l...

2015-07-29 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/7753#issuecomment-126163708
  
@squito mind helping to review this?





[GitHub] spark pull request: [SPARK-9468][Yarn][Core] Avoid scheduling task...

2015-07-30 Thread jerryshao
GitHub user jerryshao opened a pull request:

https://github.com/apache/spark/pull/7786

[SPARK-9468][Yarn][Core] Avoid scheduling tasks on preemption executors

For Spark running on Yarn, if preemption is enabled by Yarn, containers 
will be preempted by Yarn, this will be notified to AM ahead of time. Spark 
itself should be aware of this preemption information and avoid scheduling 
tasks on these executors.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jerryshao/apache-spark SPARK-9468

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7786.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7786


commit 6d030ec8a9b1e616c2c7c17697de7c5f703bd2af
Author: jerryshao 
Date:   2015-07-30T05:56:17Z

Add preemption mechanism for Spark on Yarn







[GitHub] spark pull request: [SPARK-9104][CORE][WIP] expose Netty network l...

2015-07-30 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/7753#issuecomment-126534206
  
Hi @squito , IIUC the code here always records only the max memory usage:

```
 @volatile private var _nettyReadOnheapSize: Long = _
  def nettyReadOnheapSize: Long = _nettyReadOnheapSize
  private[spark] def setNettyReadOnheapSize(value: Long, time: SysTime): 
Unit = {
_nettyReadOnheapSize = value
if (value > _maxNettyReadOnheapSizeTime._1) {
  _maxNettyReadOnheapSizeTime = (value, time)
}
  }
```

If the current memory usage is smaller than the previous one, it is simply discarded.

From my understanding it would be better to transfer the raw metrics to the driver and let each listener process them as it wants; that would be more flexible. If people want the minimum memory usage, for example, they could compute it with their own logic in a listener. Keeping only the max memory usage restricts the flexibility for different users.
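
To illustrate the point, a self-contained sketch (made-up names, not the PR's API) of shipping raw samples and letting each consumer derive its own statistic:

```
// A raw sample of Netty on-heap usage at a point in time.
case class NettySample(onheapBytes: Long, sysTimeMs: Long)

// One possible consumer: keeps both max and min, which a "max only" field cannot provide.
class NettyMemoryAggregator {
  private var maxSample: Option[NettySample] = None
  private var minSample: Option[NettySample] = None

  def onSample(sample: NettySample): Unit = synchronized {
    if (maxSample.forall(_.onheapBytes < sample.onheapBytes)) maxSample = Some(sample)
    if (minSample.forall(_.onheapBytes > sample.onheapBytes)) minSample = Some(sample)
  }

  def max: Option[NettySample] = maxSample
  def min: Option[NettySample] = minSample
}
```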





[GitHub] spark pull request: [SPARK-11270] [STREAMING] Add improved equalit...

2015-10-22 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/9236#issuecomment-150396719
  
LGTM, the test is in `pyspark/streaming/tests.py`, you can refer to it.





[GitHub] spark pull request: [SPARK-10582] If a new AM restarts, the total ...

2015-10-22 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/8737#issuecomment-150399528
  
@andrewor14 , are we still planning to address this issue? It seems to be a real problem when dynamic allocation is enabled.





[GitHub] spark pull request: [SPARK-11272][Core] Support importing and expo...

2015-10-22 Thread jerryshao
GitHub user jerryshao opened a pull request:

https://github.com/apache/spark/pull/9238

[SPARK-11272][Core] Support importing and exporting event logs from 
HistoryServer web portal

This patch helps users easily download event logs from the HistoryServer, or upload someone else's log to debug and replay. Quite useful for seeking help or identifying problems for others. Here is the screenshot:

![screen shot 2015-10-23 at 11 47 40 
am](https://cloud.githubusercontent.com/assets/850797/1068/bdb9fd64-797f-11e5-9e16-315d10164af1.png)


Please review. Thanks a lot.
  

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jerryshao/apache-spark SPARK-11272

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/9238.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #9238


commit f92393fbf1c031499b09903fb86a184f196d78b6
Author: jerryshao 
Date:   2015-10-23T03:59:10Z

Support importing and exporting event logs from HistoryServer web portal







[GitHub] spark pull request: [SPARK-11272][Core][UI] Support importing and ...

2015-10-25 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/9238#issuecomment-150995617
  
Updated UI:


![image](https://cloud.githubusercontent.com/assets/850797/10719277/d31423c2-7bc0-11e5-9e07-f82e5e8bd73d.png)






[GitHub] spark pull request: [SPARK-9817][YARN] Improve the locality calcul...

2015-10-25 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/8100#issuecomment-150996047
  
Thanks @vanzin , will do.





[GitHub] spark pull request: [SPARK-9817][YARN] Improve the locality calcul...

2015-10-25 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/8100#issuecomment-151013429
  
Hi @vanzin , this patch improves the container locality calculation algorithm by taking the locality of pending container requests into account.

1. If the locality of the current pending requests cannot match the locality needs of the pending tasks, we cancel these pending requests.
2. If the locality of a current pending request can match the locality needs, we treat it as a valid request in the locality calculation, to avoid sending unnecessary requests again.
3. Locality-unmatched requests are canceled and recalculated with the locality preferences of the pending tasks, to get the optimal locality distribution.

Note this is a best-effort algorithm based on the state of YARN and Spark at that moment; since that state keeps changing, it cannot guarantee a globally optimal result.
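
A rough, self-contained sketch of the reconciliation described above (illustrative names; the real patch works against the YARN AMRMClient's outstanding request list):

```
case class PendingRequest(nodes: Set[String])

// Split outstanding container requests into those whose locality still matches the
// hosts preferred by pending tasks (kept as-is, including requests with no locality
// preference) and those to cancel and recompute with fresh locality preferences.
def reconcile(
    pending: Seq[PendingRequest],
    hostToLocalTaskCount: Map[String, Int]): (Seq[PendingRequest], Seq[PendingRequest]) = {
  pending.partition { req =>
    req.nodes.isEmpty || req.nodes.exists(hostToLocalTaskCount.contains)
  }
}
```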

Please help to review, thanks a lot.





[GitHub] spark pull request: [SPARK-11272][Core][UI] Support importing and ...

2015-10-25 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/9238#issuecomment-151014208
  
Jenkins, retest this please.





[GitHub] spark pull request: [SPARK-10582] If a new AM restarts, the total ...

2015-10-26 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/8737#issuecomment-151046589
  
Yes, the problem still exists, @KaiXinXiaoLei are you still working on this 
issue to address the comments mentioned above?





[GitHub] spark pull request: [SPARK-2960][Deploy] Support executing Spark f...

2015-10-26 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/8669#issuecomment-151363719
  
Hi @srowen @patrungel @JoshRosen and @vanzin, I changed this patch to honor `SPARK_HOME` to solve the symlink issue; if users want to run the executables from a symlink, they should set `SPARK_HOME` manually. This removes a lot of duplication while keeping the ability to run from symlinks. Please help to review; any comment is greatly appreciated.





[GitHub] spark pull request: [SPARK-2960][Deploy] Support executing Spark f...

2015-10-26 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/8669#issuecomment-151377956
  
Jenkins, retest this please.





[GitHub] spark pull request: [SPARK-11334] numRunningTasks can't be less th...

2015-10-26 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/9288#issuecomment-151385860
  
IMHO, would it be better to fix this unexpected ordering of events? From my understanding, `SparkListenerTaskEnd` should be triggered before `SparkListenerStageCompleted`, right? I think the root cause should be fixed, rather than adding guard code in dynamic allocation.





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-10-26 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r43088193
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/State.scala 
---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+/**
+ * Abstract class for getting and updating the tracked state in the 
`trackStateByKey` operation of
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] and
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream]].
+ * {{{
+ *
+ * }}}
+ */
+sealed abstract class State[S] {
+  
+  /** Whether the state already exists */
+  def exists(): Boolean
+  
+  /**
+   * Get the state if it exists, otherwise it will throw an exception.
+   * Check with `exists()` whether the state exists or not before calling 
`get()`.
+   */
+  def get(): S
+
+  /**
+   * Update the state with a new value. Note that you cannot update the 
state if the state is
+   * timing out (that is, `isTimingOut() return true`, or if the state has 
already been removed by
+   * `remove()`.
+   */
+  def update(newState: S): Unit
+
+  /** Remove the state if it exists. */
+  def remove(): Unit
+
+  /** Is the state going to be timed out by the system after this batch 
interval */
+  def isTimingOut(): Boolean
+
+  @inline final def getOption(): Option[S] = Option(get())
+
+  /** Get the state if it exists, otherwise return the default value */
+  @inline final def getOrElse[S1 >: S](default: => S1): S1 = {
--- End diff --

Not sure whether this by-name ("call-by-name") parameter is Java-friendly? Assuming this `State` should also be usable from Java code :).





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-10-26 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r43088697
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/State.scala 
---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+/**
+ * Abstract class for getting and updating the tracked state in the 
`trackStateByKey` operation of
+ * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair 
DStream]] and
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream]].
+ * {{{
+ *
+ * }}}
+ */
+sealed abstract class State[S] {
+  
+  /** Whether the state already exists */
+  def exists(): Boolean
+  
+  /**
+   * Get the state if it exists, otherwise it will throw an exception.
+   * Check with `exists()` whether the state exists or not before calling 
`get()`.
+   */
+  def get(): S
+
+  /**
+   * Update the state with a new value. Note that you cannot update the 
state if the state is
+   * timing out (that is, `isTimingOut() return true`, or if the state has 
already been removed by
+   * `remove()`.
+   */
+  def update(newState: S): Unit
+
+  /** Remove the state if it exists. */
+  def remove(): Unit
+
+  /** Is the state going to be timed out by the system after this batch 
interval */
+  def isTimingOut(): Boolean
+
+  @inline final def getOption(): Option[S] = Option(get())
+
+  /** Get the state if it exists, otherwise return the default value */
+  @inline final def getOrElse[S1 >: S](default: => S1): S1 = {
+if (exists) this.get else default
+  }
+
+  @inline final override def toString() = getOption.map { _.toString 
}.getOrElse("")
+}
+
+/** Internal implementation of the [[State]] interface */
+private[streaming] class StateImpl[S] extends State[S] {
+
+  private var state: S = null.asInstanceOf[S]
+  private var defined: Boolean = true
+  private var timingOut: Boolean = false
+  private var updated: Boolean = false
+  private var removed: Boolean = false
+
+  // = Public API =
+  def exists(): Boolean = {
+defined
+  }
+
+  def get(): S = {
+state
+  }
+
+  def update(newState: S): Unit = {
+require(!removed, "Cannot update the state after it has been removed")
+require(!timingOut, "Cannot update the state that is timing out")
+state = newState
--- End diff --

Is a defensive guard like `require(!updated, "Cannot update the state that has already been updated")` required here?







[GitHub] spark pull request: [SPARK-2960][Deploy] Support executing Spark f...

2015-10-26 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/8669#discussion_r43088776
  
--- Diff: bin/beeline ---
@@ -23,8 +23,10 @@
 # Enter posix mode for bash
 set -o posix
 
-# Figure out where Spark is installed
-FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
+# Figure out if SPARK_HOME is set
+if [ -z "${SPARK_HOME}" ]; then
+export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
--- End diff --

Thanks @srowen for your comments, I will change to the 2-space indent.





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-10-26 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r43089007
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/TrackStateSpec.scala ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{HashPartitioner, Partitioner}
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Abstract class having all the specifications of 
DStream.trackStateByKey().
+ * Use the `TrackStateSpec.create()` or `TrackStateSpec.create()` to 
create instances of this class.
+ *
+ * {{{
+ *TrackStateSpec(trackingFunction)// in Scala
+ *TrackStateSpec.create(trackingFunction) // in Java
+ * }}}
+ */
+sealed abstract class TrackStateSpec[K: ClassTag, V: ClassTag, S: 
ClassTag, T: ClassTag]
+  extends Serializable {
+
+  def initialState(rdd: RDD[(K, S)]): this.type
+  def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type
+
+  def numPartitions(numPartitions: Int): this.type
+  def partitioner(partitioner: Partitioner): this.type
+
+  def timeout(interval: Duration): this.type
+}
+
+
+/** Builder object for creating instances of TrackStateSpec */
+object TrackStateSpec {
+
+  def apply[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](
+  trackingFunction: (K, Option[V], State[S]) => Option[T]): 
TrackStateSpec[K, V, S, T] = {
+new TrackStateSpecImpl[K, V, S, T](trackingFunction)
+  }
+
+  def create[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](
+  trackingFunction: (K, Option[V], State[S]) => Option[T]): 
TrackStateSpec[K, V, S, T] = {
+apply(trackingFunction)
+  }
--- End diff --

I think a Java-friendly constructor is necessary here; `create` as written might not be directly usable from Java code.





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-10-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r43089727
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackeStateDStream.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import java.io.{IOException, ObjectOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.util.StateMap
+import org.apache.spark.util.Utils
+
+private[streaming] case class TrackStateRDDRecord[K: ClassTag, S: 
ClassTag, T: ClassTag](
+stateMap: StateMap[K, S], emittedRecords: Seq[T])
+
+
+private[streaming] class TrackStateRDDPartition(
+idx: Int,
+@transient private var prevStateRDD: RDD[_],
+@transient private var partitionedDataRDD: RDD[_]) extends Partition {
+
+  private[dstream] var previousSessionRDDPartition: Partition = null
+  private[dstream] var partitionedDataRDDPartition: Partition = null
+
+  override def index: Int = idx
+  override def hashCode(): Int = idx
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream): Unit = 
Utils.tryOrIOException {
+// Update the reference to parent split at the time of task 
serialization
+previousSessionRDDPartition = prevStateRDD.partitions(index)
+partitionedDataRDDPartition = partitionedDataRDD.partitions(index)
+oos.defaultWriteObject()
+  }
+}
+
+private[streaming] class TrackStateRDD[K: ClassTag, V: ClassTag, S: 
ClassTag, T: ClassTag](
+_sc: SparkContext,
+private var prevStateRDD: RDD[TrackStateRDDRecord[K, S, T]],
+private var partitionedDataRDD: RDD[(K, V)],
+trackingFunction: (K, Option[V], State[S]) => Option[T],
+currentTime: Long, timeoutThresholdTime: Option[Long]
+  ) extends RDD[TrackStateRDDRecord[K, S, T]](
+_sc,
+List(
+  new OneToOneDependency[TrackStateRDDRecord[K, S, T]](prevStateRDD),
+  new OneToOneDependency(partitionedDataRDD))
+  ) {
+
+  @volatile private var doFullScan = false
+
+  require(partitionedDataRDD.partitioner.nonEmpty)
+  require(partitionedDataRDD.partitioner == prevStateRDD.partitioner)
+
+  override val partitioner = prevStateRDD.partitioner
+
+  override def checkpoint(): Unit = {
+super.checkpoint()
+doFullScan = true
+  }
+
+  override def compute(
+  partition: Partition, context: TaskContext): 
Iterator[TrackStateRDDRecord[K, S, T]] = {
+
+val stateRDDPartition = partition.asInstanceOf[TrackStateRDDPartition]
+val prevStateRDDIterator = prevStateRDD.iterator(
+  stateRDDPartition.previousSessionRDDPartition, context)
+val dataIterator = partitionedDataRDD.iterator(
+  stateRDDPartition.partitionedDataRDDPartition, context)
+if (!prevStateRDDIterator.hasNext) {
+  throw new SparkException(s"Could not find state map in previous 
state RDD")
+}
+
+val newStateMap = prevStateRDDIterator.next().stateMap.copy()
+val emittedRecords = new ArrayBuffer[T]
+
+val wrappedState = new StateImpl[S]()
+
+dataIterator.foreach { case (key, value) =>
+  wrappedState.wrap(newStateMap.get(key))
+  val emittedRecord = trackingFunction(key, Some(value), wrappedState)
--- End diff --

Is it possible that `value` will be `null` here? If so, would it be better to use 
`Option(value)` instead of `Some(value)`?
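
For reference, a tiny illustration of the difference (plain Scala, independent of this patch):

    val v: String = null
    Some(v)      // Some(null) -- the tracking function would receive Some(null)
    Option(v)    // None       -- null is normalized to None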



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-10-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r43090719
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackeStateDStream.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import java.io.{IOException, ObjectOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.util.StateMap
+import org.apache.spark.util.Utils
+
+private[streaming] case class TrackStateRDDRecord[K: ClassTag, S: 
ClassTag, T: ClassTag](
+stateMap: StateMap[K, S], emittedRecords: Seq[T])
+
+
+private[streaming] class TrackStateRDDPartition(
+idx: Int,
+@transient private var prevStateRDD: RDD[_],
+@transient private var partitionedDataRDD: RDD[_]) extends Partition {
+
+  private[dstream] var previousSessionRDDPartition: Partition = null
+  private[dstream] var partitionedDataRDDPartition: Partition = null
+
+  override def index: Int = idx
+  override def hashCode(): Int = idx
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream): Unit = 
Utils.tryOrIOException {
+// Update the reference to parent split at the time of task 
serialization
+previousSessionRDDPartition = prevStateRDD.partitions(index)
+partitionedDataRDDPartition = partitionedDataRDD.partitions(index)
+oos.defaultWriteObject()
+  }
+}
+
+private[streaming] class TrackStateRDD[K: ClassTag, V: ClassTag, S: 
ClassTag, T: ClassTag](
+_sc: SparkContext,
+private var prevStateRDD: RDD[TrackStateRDDRecord[K, S, T]],
+private var partitionedDataRDD: RDD[(K, V)],
+trackingFunction: (K, Option[V], State[S]) => Option[T],
+currentTime: Long, timeoutThresholdTime: Option[Long]
+  ) extends RDD[TrackStateRDDRecord[K, S, T]](
+_sc,
+List(
+  new OneToOneDependency[TrackStateRDDRecord[K, S, T]](prevStateRDD),
+  new OneToOneDependency(partitionedDataRDD))
+  ) {
+
+  @volatile private var doFullScan = false
+
+  require(partitionedDataRDD.partitioner.nonEmpty)
+  require(partitionedDataRDD.partitioner == prevStateRDD.partitioner)
+
+  override val partitioner = prevStateRDD.partitioner
+
+  override def checkpoint(): Unit = {
+super.checkpoint()
+doFullScan = true
+  }
+
+  override def compute(
+  partition: Partition, context: TaskContext): 
Iterator[TrackStateRDDRecord[K, S, T]] = {
+
+val stateRDDPartition = partition.asInstanceOf[TrackStateRDDPartition]
+val prevStateRDDIterator = prevStateRDD.iterator(
+  stateRDDPartition.previousSessionRDDPartition, context)
+val dataIterator = partitionedDataRDD.iterator(
+  stateRDDPartition.partitionedDataRDDPartition, context)
+if (!prevStateRDDIterator.hasNext) {
+  throw new SparkException(s"Could not find state map in previous 
state RDD")
+}
+
+val newStateMap = prevStateRDDIterator.next().stateMap.copy()
+val emittedRecords = new ArrayBuffer[T]
+
+val wrappedState = new StateImpl[S]()
+
+dataIterator.foreach { case (key, value) =>
+  wrappedState.wrap(newStateMap.get(key))
+  val emittedRecord = trackingFunction(key, Some(value), wrappedState)
+  if (wrappedState.isRemoved) {
+newStateMap.remove(key)
+  } else if (wrappedState.isUpdated) {
+newStateMap.put(key, wrappedState.get(), currentTime)
+  }
+  emittedRecords ++= emittedRecord
--- End diff --

It is better not to materialize all the `emittedRecords` in `compute()`.
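
A rough sketch of what a lazier version could look like, reusing the names from the 
diff above (`dataIterator`, `newStateMap`, `wrappedState`, `trackingFunction`, 
`currentTime`); note that with an iterator the state updates only happen as the 
records are consumed downstream, which is exactly the trade-off being discussed:

    // Produce emitted records lazily instead of buffering them in an ArrayBuffer.
    // Caveat: the state-map updates now run only when the iterator is consumed.
    val emittedRecords: Iterator[T] = dataIterator.flatMap { case (key, value) =>
      wrappedState.wrap(newStateMap.get(key))
      val emitted = trackingFunction(key, Some(value), wrappedState)
      if (wrappedState.isRemoved) {
        newStateMap.remove(key)
      } else if (wrappedState.isUpdated) {
        newStateMap.put(key, wrappedState.get(), currentTime)
      }
      emitted
    }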

[GitHub] spark pull request: [SPARK-11334] numRunningTasks can't be less th...

2015-10-27 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/9288#issuecomment-151404693
  
Can we do this by adding the pending-to-kill tasks into a list, and only when all of 
those tasks are marked as finished, call `markStageAsFinished` and post 
`SparkListenerJobEnd`? AFAIK, this is how executor killing is managed in 
`CoarseGrainedSchedulerBackend`; at the code level (not sure about the complexity) 
the same approach could also be applied here in `DAGScheduler`.
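
A toy, self-contained sketch of the bookkeeping pattern I mean (all names here are 
illustrative, not actual `DAGScheduler` members):

    import scala.collection.mutable

    // Track tasks we asked to kill; fire the completion callback only once
    // every such task has actually reported back as finished.
    class PendingKillTracker(onAllFinished: () => Unit) {
      private val pending = mutable.HashSet.empty[Long]

      def taskMarkedForKill(taskId: Long): Unit = synchronized { pending += taskId }

      def taskFinished(taskId: Long): Unit = synchronized {
        // Only react if this was one of the tasks we were waiting on.
        if (pending.remove(taskId) && pending.isEmpty) {
          // Only now would it be safe to mark the stage finished and post SparkListenerJobEnd.
          onAllFinished()
        }
      }
    }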


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2960][Deploy] Support executing Spark f...

2015-10-27 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/8669#issuecomment-151490838
  
Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2960][Deploy] Support executing Spark f...

2015-10-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/8669#discussion_r43208324
  
--- Diff: bin/run-example ---
@@ -44,7 +46,7 @@ JAR_COUNT=0
 
 for f in "${JAR_PATH}"/spark-examples-*hadoop*.jar; do
   if [[ ! -e "$f" ]]; then
-echo "Failed to find Spark examples assembly in $FWDIR/lib or 
$FWDIR/examples/target" 1>&2
+echo "Failed to find Spark examples assembly in ${SPARK_HOME}/lib or 
$SPARK_HOME/examples/target" 1>&2
--- End diff --

Thanks @patrungel , I will change it. BTW, what do you think of this 
solution, compared to the previous code using `readlink`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

2015-10-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/9182#discussion_r43219135
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, 
ApplicationId}
+
+import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * An extension service that can be loaded into a Spark YARN scheduler.
+ * A Service that can be started and stopped
+ *
+ * The `stop()` operation MUST be idempotent, and succeed even if 
`start()` was
+ * never invoked.
+ */
+trait SchedulerExtensionService {
+
+  /**
+   * Start the extension service. This should be a no-op if
+   * called more than once.
+   * @param binding binding to the spark application and YARN
+   */
+  def start(binding: SchedulerExtensionServiceBinding): Unit
+
+  /**
+   * Stop the service
+   * The `stop()` operation MUST be idempotent, and succeed even if 
`start()` was
+   * never invoked.
+   */
+  def stop(): Unit
+}
+
+/**
+ * Binding information for a [[SchedulerExtensionService]]
+ * @param sparkContext current spark context
+ * @param applicationId YARN application ID
+ * @param attemptId optional AttemptID.
+ */
+case class SchedulerExtensionServiceBinding(
+sparkContext: SparkContext,
+applicationId: ApplicationId,
+attemptId: Option[ApplicationAttemptId] = None)
+
+/**
+ * Container for [[SchedulerExtensionService]] instances.
+ *
+ * Loads Extension Services from the configuration property
+ * `"spark.yarn.services"`, instantiates and starts them.
+ * When stopped, it stops all child entries.
+ *
+ * The order in which child extension services are started and stopped
+ * is undefined.
+ *
+ */
+private[spark] class SchedulerExtensionServices extends 
SchedulerExtensionService
+with Logging {
+  private var services: List[SchedulerExtensionService] = Nil
+  private var sparkContext: SparkContext = _
+  private var appId: ApplicationId = _
+  private var attemptId: Option[ApplicationAttemptId] = _
+  private val started = new AtomicBoolean(false)
+  private var binding: SchedulerExtensionServiceBinding = _
+
+  /**
+   * Binding operation will load the named services and call bind on them 
too; the
+   * entire set of services are then ready for `init()` and `start()` calls
+
+   * @param binding binding to the spark application and YARN
+   */
+  def start(binding: SchedulerExtensionServiceBinding): Unit = {
+if (started.getAndSet(true)) {
+  logWarning("Ignoring re-entrant start operation")
+  return
+}
+require(binding.sparkContext != null, "Null context parameter")
+require(binding.applicationId != null, "Null appId parameter")
+this.binding = binding
+sparkContext = binding.sparkContext
+appId = binding.applicationId
+attemptId = binding.attemptId
+logInfo(s"Starting Yarn extension services with app 
${binding.applicationId}" +
+s" and attemptId $attemptId")
--- End diff --

nit: 2 spaces indent.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

2015-10-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/9182#discussion_r43219282
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, 
ApplicationId}
+
+import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * An extension service that can be loaded into a Spark YARN scheduler.
+ * A Service that can be started and stopped
+ *
+ * The `stop()` operation MUST be idempotent, and succeed even if 
`start()` was
+ * never invoked.
+ */
+trait SchedulerExtensionService {
+
+  /**
+   * Start the extension service. This should be a no-op if
+   * called more than once.
+   * @param binding binding to the spark application and YARN
+   */
+  def start(binding: SchedulerExtensionServiceBinding): Unit
+
+  /**
+   * Stop the service
+   * The `stop()` operation MUST be idempotent, and succeed even if 
`start()` was
+   * never invoked.
+   */
+  def stop(): Unit
+}
+
+/**
+ * Binding information for a [[SchedulerExtensionService]]
+ * @param sparkContext current spark context
+ * @param applicationId YARN application ID
+ * @param attemptId optional AttemptID.
+ */
+case class SchedulerExtensionServiceBinding(
+sparkContext: SparkContext,
+applicationId: ApplicationId,
+attemptId: Option[ApplicationAttemptId] = None)
+
+/**
+ * Container for [[SchedulerExtensionService]] instances.
+ *
+ * Loads Extension Services from the configuration property
+ * `"spark.yarn.services"`, instantiates and starts them.
+ * When stopped, it stops all child entries.
+ *
+ * The order in which child extension services are started and stopped
+ * is undefined.
+ *
+ */
+private[spark] class SchedulerExtensionServices extends 
SchedulerExtensionService
+with Logging {
+  private var services: List[SchedulerExtensionService] = Nil
+  private var sparkContext: SparkContext = _
+  private var appId: ApplicationId = _
+  private var attemptId: Option[ApplicationAttemptId] = _
+  private val started = new AtomicBoolean(false)
+  private var binding: SchedulerExtensionServiceBinding = _
+
+  /**
+   * Binding operation will load the named services and call bind on them 
too; the
+   * entire set of services are then ready for `init()` and `start()` calls
+
+   * @param binding binding to the spark application and YARN
+   */
+  def start(binding: SchedulerExtensionServiceBinding): Unit = {
+if (started.getAndSet(true)) {
+  logWarning("Ignoring re-entrant start operation")
+  return
+}
+require(binding.sparkContext != null, "Null context parameter")
+require(binding.applicationId != null, "Null appId parameter")
+this.binding = binding
+sparkContext = binding.sparkContext
+appId = binding.applicationId
+attemptId = binding.attemptId
+logInfo(s"Starting Yarn extension services with app 
${binding.applicationId}" +
+s" and attemptId $attemptId")
+
+services = 
sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
+.map { s =>
+  s.split(",").map(_.trim()).filter(!_.isEmpty)
+.map { sClass =>
+val instance = Utils.classForName(sClass)
+.newInstance()
+.asInstanceOf[SchedulerExtensionService]
+// bind this service
+instance.start(binding)
+logInfo(s"Servic

[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

2015-10-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/9182#discussion_r43219322
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, 
ApplicationId}
+
+import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * An extension service that can be loaded into a Spark YARN scheduler.
+ * A Service that can be started and stopped
+ *
+ * The `stop()` operation MUST be idempotent, and succeed even if 
`start()` was
+ * never invoked.
+ */
+trait SchedulerExtensionService {
+
+  /**
+   * Start the extension service. This should be a no-op if
+   * called more than once.
+   * @param binding binding to the spark application and YARN
+   */
+  def start(binding: SchedulerExtensionServiceBinding): Unit
+
+  /**
+   * Stop the service
+   * The `stop()` operation MUST be idempotent, and succeed even if 
`start()` was
+   * never invoked.
+   */
+  def stop(): Unit
+}
+
+/**
+ * Binding information for a [[SchedulerExtensionService]]
+ * @param sparkContext current spark context
+ * @param applicationId YARN application ID
+ * @param attemptId optional AttemptID.
+ */
+case class SchedulerExtensionServiceBinding(
+sparkContext: SparkContext,
+applicationId: ApplicationId,
+attemptId: Option[ApplicationAttemptId] = None)
+
+/**
+ * Container for [[SchedulerExtensionService]] instances.
+ *
+ * Loads Extension Services from the configuration property
+ * `"spark.yarn.services"`, instantiates and starts them.
+ * When stopped, it stops all child entries.
+ *
+ * The order in which child extension services are started and stopped
+ * is undefined.
+ *
+ */
+private[spark] class SchedulerExtensionServices extends 
SchedulerExtensionService
+with Logging {
+  private var services: List[SchedulerExtensionService] = Nil
+  private var sparkContext: SparkContext = _
+  private var appId: ApplicationId = _
+  private var attemptId: Option[ApplicationAttemptId] = _
+  private val started = new AtomicBoolean(false)
+  private var binding: SchedulerExtensionServiceBinding = _
+
+  /**
+   * Binding operation will load the named services and call bind on them 
too; the
+   * entire set of services are then ready for `init()` and `start()` calls
+
+   * @param binding binding to the spark application and YARN
+   */
+  def start(binding: SchedulerExtensionServiceBinding): Unit = {
+if (started.getAndSet(true)) {
+  logWarning("Ignoring re-entrant start operation")
+  return
+}
+require(binding.sparkContext != null, "Null context parameter")
+require(binding.applicationId != null, "Null appId parameter")
+this.binding = binding
+sparkContext = binding.sparkContext
+appId = binding.applicationId
+attemptId = binding.attemptId
+logInfo(s"Starting Yarn extension services with app 
${binding.applicationId}" +
+s" and attemptId $attemptId")
+
+services = 
sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
+.map { s =>
+  s.split(",").map(_.trim()).filter(!_.isEmpty)
+.map { sClass =>
+val instance = Utils.classForName(sClass)
+.newInstance()
--- End diff --

Do we need to try/catch some exceptions like `ClassNotFoundException` here?
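
For illustration, a hedged sketch of the kind of defensive loading I have in mind 
(the helper name and the choice of exceptions are my assumptions):

    import org.apache.spark.SparkException
    import org.apache.spark.util.Utils

    // Load one configured service class, turning reflection failures into a
    // clearer error that names the offending spark.yarn.services entry.
    def loadService(sClass: String): SchedulerExtensionService = {
      try {
        Utils.classForName(sClass).newInstance().asInstanceOf[SchedulerExtensionService]
      } catch {
        case e: ClassNotFoundException =>
          throw new SparkException(s"Extension service class not found: $sClass", e)
        case e: ClassCastException =>
          throw new SparkException(s"$sClass does not implement SchedulerExtensionService", e)
      }
    }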



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

2015-10-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/9182#discussion_r43219421
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
+import org.apache.spark.deploy.yarn.ApplicationMaster
--- End diff --

nit: this import can be merged with the one above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

2015-10-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/9182#discussion_r43219575
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
 ---
@@ -51,6 +51,38 @@ private[spark] abstract class YarnSchedulerBackend(
 
   private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
 
+  /** Application ID. Must be set by a subclass before starting the 
service */
+  private var appId: ApplicationId = null
+
+  /** Attempt ID. This is unset for client-side schedulers */
+  private var attemptId: Option[ApplicationAttemptId] = None
+
+  /** Scheduler extension services */
+  private val services: SchedulerExtensionServices = new 
SchedulerExtensionServices()
+
+  /**
+* Bind to YARN. This *must* be done before calling [[start()]].
+*
+* @param appId YARN application ID
+* @param attemptId Optional YARN attempt ID
+*/
+  protected def bindToYARN(appId: ApplicationId, attemptId: 
Option[ApplicationAttemptId]): Unit = {
--- End diff --

`bindToYarn`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

2015-10-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/9182#discussion_r43219638
  
--- Diff: 
yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+
--- End diff --

nit: there is one extra empty line here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

2015-10-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/9182#discussion_r43219717
  
--- Diff: 
yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala
 ---
@@ -0,0 +1,50 @@
+/*
--- End diff --

Can we put these fake stub classes into one file, like `SparkYarnTestHelper` or 
something similar? That would reduce the number of files.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

2015-10-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/9182#discussion_r43220093
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, 
ApplicationId}
+
+import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * An extension service that can be loaded into a Spark YARN scheduler.
+ * A Service that can be started and stopped
+ *
+ * The `stop()` operation MUST be idempotent, and succeed even if 
`start()` was
+ * never invoked.
+ */
+trait SchedulerExtensionService {
+
+  /**
+   * Start the extension service. This should be a no-op if
+   * called more than once.
+   * @param binding binding to the spark application and YARN
+   */
+  def start(binding: SchedulerExtensionServiceBinding): Unit
+
+  /**
+   * Stop the service
+   * The `stop()` operation MUST be idempotent, and succeed even if 
`start()` was
+   * never invoked.
+   */
+  def stop(): Unit
+}
+
+/**
+ * Binding information for a [[SchedulerExtensionService]]
+ * @param sparkContext current spark context
+ * @param applicationId YARN application ID
+ * @param attemptId optional AttemptID.
+ */
+case class SchedulerExtensionServiceBinding(
+sparkContext: SparkContext,
+applicationId: ApplicationId,
+attemptId: Option[ApplicationAttemptId] = None)
+
+/**
+ * Container for [[SchedulerExtensionService]] instances.
+ *
+ * Loads Extension Services from the configuration property
+ * `"spark.yarn.services"`, instantiates and starts them.
+ * When stopped, it stops all child entries.
+ *
+ * The order in which child extension services are started and stopped
+ * is undefined.
+ *
+ */
+private[spark] class SchedulerExtensionServices extends 
SchedulerExtensionService
+with Logging {
+  private var services: List[SchedulerExtensionService] = Nil
+  private var sparkContext: SparkContext = _
+  private var appId: ApplicationId = _
+  private var attemptId: Option[ApplicationAttemptId] = _
+  private val started = new AtomicBoolean(false)
+  private var binding: SchedulerExtensionServiceBinding = _
+
+  /**
+   * Binding operation will load the named services and call bind on them 
too; the
+   * entire set of services are then ready for `init()` and `start()` calls
+
+   * @param binding binding to the spark application and YARN
+   */
+  def start(binding: SchedulerExtensionServiceBinding): Unit = {
+if (started.getAndSet(true)) {
+  logWarning("Ignoring re-entrant start operation")
+  return
+}
+require(binding.sparkContext != null, "Null context parameter")
+require(binding.applicationId != null, "Null appId parameter")
+this.binding = binding
--- End diff --

Here `binding` actually duplicates the three fields assigned just below it; from my 
understanding of this code, we could keep either one.
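
A minimal sketch of one of the two options, keeping only `binding` and deriving the 
rest from it (purely illustrative, assuming the surrounding 
`SchedulerExtensionServices` class from the diff):

    private var binding: SchedulerExtensionServiceBinding = _

    // Derive the other values on demand instead of storing copies.
    private def sparkContext: SparkContext = binding.sparkContext
    private def appId: ApplicationId = binding.applicationId
    private def attemptId: Option[ApplicationAttemptId] = binding.attemptId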


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11315] [YARN] WiP Add YARN extension se...

2015-10-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r43220805
  
--- Diff: yarn/pom.xml ---
@@ -164,6 +164,113 @@
  
   
 
+  
+
--- End diff --

Does this profile work under SBT?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11315] [YARN] WiP Add YARN extension se...

2015-10-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r43221207
  
--- Diff: 
yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,1328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.io.InterruptedIOException
+import java.net.{ConnectException, URI}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, 
TimelineEntity, TimelineEvent}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, 
ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.{SchedulerExtensionService, 
SchedulerExtensionServiceBinding}
+import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered YARN 
Timeline Server.
+ *
+ * Posting algorithm
+ *
+ * 1. The service subscribes to all events coming from the Spark Context.
+ * 1. These events are serialized into JSON objects for publishing to the 
timeline service through
+ * HTTP(S) posts.
+ * 1. Events are buffered into `pendingEvents` until a batch is aggregated 
into a
+ * [[TimelineEntity]] for posting.
+ * 1. That aggregation happens when a lifecycle event (application 
start/stop) takes place,
+ * or the number of pending events in a running application exceeds the 
limit set in
+ * `spark.hadoop.yarn.timeline.batch.size`.
+ * 1. Posting operations take place in a separate thread from the spark 
event listener.
+ * 1. If an attempt to post to the timeline server fails, the service 
sleeps and then
+ * it is re-attempted after the retry period defined by
+ * `spark.hadoop.yarn.timeline.post.retry.interval`.
+ * 1. If the number of events buffered in the history service exceed the 
limit set in
+ * `spark.hadoop.yarn.timeline.post.limit`, then further events other than 
application start/stop
+ * are dropped.
+ * 1. When the service is stopped, it will make a best-effort attempt to 
post all queued events.
+ * the call of [[stop()]] can block up to the duration of
+ * `spark.hadoop.yarn.timeline.shutdown.waittime` for this to take place.
+ * 1. No events are posted until the service receives a 
[[SparkListenerApplicationStart]] event.
+ *
+ * If the spark context has a metrics registry, then the internal counters 
of queued entities,
+ * post failures and successes, and the performance of the posting 
operation are all registered
+ * as metrics.
+ *
+ * The shutdown logic is somewhat convoluted, as the posting thread may be 
blocked on HTTP IO
+ * when the shutdown process begins. In this situation, the thread 
continues to be blocked, and
+ * will be interrupted once the wait time has expired. All time consumed 
during the ongoing
+ * operation will be counted as part of the shutdown time period.
+ */
+private[spark] class YarnHistoryService extends SchedulerExtensionService
+  with Logging with Source {
--- End diff --

Can we move the `Source`-related things into a subclass or a separate class? There 
are so many class members in this class that it is not easy to understand and track 
the state.
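
For illustration, a hedged sketch of what pulling the metrics out might look like 
(the class name and metric names are made up for the example):

    import com.codahale.metrics.{Counter, MetricRegistry, Timer}
    import org.apache.spark.metrics.source.Source

    // Hypothetical holder for the metrics currently mixed into YarnHistoryService.
    private[spark] class YarnHistoryMetrics extends Source {
      override val sourceName: String = "yarn.history"
      override val metricRegistry: MetricRegistry = new MetricRegistry()

      val eventsQueued: Counter = metricRegistry.counter("eventsQueued")
      val postFailures: Counter = metricRegistry.counter("postFailures")
      val postTimer: Timer = metricRegistry.timer("postTimer")
    }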



[GitHub] spark pull request: [SPARK-11315] [YARN] WiP Add YARN extension se...

2015-10-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r43221438
  
--- Diff: 
yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,1328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.io.InterruptedIOException
+import java.net.{ConnectException, URI}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, 
TimelineEntity, TimelineEvent}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, 
ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.{SchedulerExtensionService, 
SchedulerExtensionServiceBinding}
+import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered YARN 
Timeline Server.
+ *
+ * Posting algorithm
+ *
+ * 1. The service subscribes to all events coming from the Spark Context.
+ * 1. These events are serialized into JSON objects for publishing to the 
timeline service through
+ * HTTP(S) posts.
+ * 1. Events are buffered into `pendingEvents` until a batch is aggregated 
into a
+ * [[TimelineEntity]] for posting.
+ * 1. That aggregation happens when a lifecycle event (application 
start/stop) takes place,
+ * or the number of pending events in a running application exceeds the 
limit set in
+ * `spark.hadoop.yarn.timeline.batch.size`.
+ * 1. Posting operations take place in a separate thread from the spark 
event listener.
+ * 1. If an attempt to post to the timeline server fails, the service 
sleeps and then
+ * it is re-attempted after the retry period defined by
+ * `spark.hadoop.yarn.timeline.post.retry.interval`.
+ * 1. If the number of events buffered in the history service exceed the 
limit set in
+ * `spark.hadoop.yarn.timeline.post.limit`, then further events other than 
application start/stop
+ * are dropped.
+ * 1. When the service is stopped, it will make a best-effort attempt to 
post all queued events.
+ * the call of [[stop()]] can block up to the duration of
+ * `spark.hadoop.yarn.timeline.shutdown.waittime` for this to take place.
+ * 1. No events are posted until the service receives a 
[[SparkListenerApplicationStart]] event.
+ *
+ * If the spark context has a metrics registry, then the internal counters 
of queued entities,
+ * post failures and successes, and the performance of the posting 
operation are all registered
+ * as metrics.
+ *
+ * The shutdown logic is somewhat convoluted, as the posting thread may be 
blocked on HTTP IO
+ * when the shutdown process begins. In this situation, the thread 
continues to be blocked, and
+ * will be interrupted once the wait time has expired. All time consumed 
during the ongoing
+ * operation will be counted as part of the shutdown time period.
+ */
+private[spark] class YarnHistoryService extends SchedulerExtensionService
+  with Logging with Source {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+
+  /** Simple state model implemented in an atomic integer */
+  private val _serviceState = new AtomicInteger(CreatedState)
+
+  /** Get the current state */
+  def serviceState: Int = {
+ 

[GitHub] spark pull request: [SPARK-11315] [YARN] WiP Add YARN extension se...

2015-10-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r43221841
  
--- Diff: 
yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/YarnTimelineUtils.scala
 ---
@@ -0,0 +1,757 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.io.IOException
+import java.net.{InetSocketAddress, NoRouteToHostException, URI, URL}
+import java.text.DateFormat
+import java.util.concurrent.atomic.AtomicLong
+import java.util.{ArrayList => JArrayList, Collection => JCollection, 
Date, HashMap => JHashMap, Map => JMap}
+import java.{lang, util}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import 
org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineEntity, 
TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, 
ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.spark
+import org.json4s.JsonAST.JObject
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+import org.apache.spark.scheduler.{SparkListenerApplicationEnd, 
SparkListenerApplicationStart, SparkListenerEvent, SparkListenerExecutorAdded, 
SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart, 
SparkListenerStageCompleted, SparkListenerStageSubmitted}
+import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * Utility methods for timeline classes.
+ */
+private[spark] object YarnTimelineUtils extends Logging {
+
+  /**
+   * What attempt ID to use as the attempt ID field (not the entity ID) 
when
+   * there is no attempt info.
+   */
+  val SINGLE_ATTEMPT = "1"
+
+  /**
+   * Exception text when there is no event info data to unmarshall.
+   */
+  val E_NO_EVENTINFO = "No 'eventinfo' entry"
+
+  /**
+   * Exception text when there is event info entry in the timeline event, 
but it is empty.
+   */
+
+  val E_EMPTY_EVENTINFO = "Empty 'eventinfo' entry"
+
+  /**
+   * counter incremented on every spark event to timeline event creation,
+   * so guaranteeing uniqueness of event IDs across a single application 
attempt
+   * (which is implicitly, one per JVM).
+   */
+  val uid = new AtomicLong(System.currentTimeMillis())
+
+  /**
+   * Converts a Java object to its equivalent json4s representation.
+   */
+  def toJValue(obj: Object): JValue = {
+obj match {
+  case str: String => JString(str)
+  case dbl: java.lang.Double => JDouble(dbl)
+  case dec: java.math.BigDecimal => JDecimal(dec)
+  case int: java.lang.Integer => JInt(BigInt(int))
+  case long: java.lang.Long => JInt(BigInt(long))
+  case bool: java.lang.Boolean => JBool(bool)
+  case map: JMap[_, _] =>
+val jmap = map.asInstanceOf[JMap[String, Object]]
+JObject(jmap.entrySet().asScala.map { e => e.getKey -> 
toJValue(e.getValue) }.toList)
+  case array: JCollection[_] =>
+JArray(array.asInstanceOf[JCollection[Object]].asScala.map(o => 
toJValue(o)).toList)
+  case null => JNothing
+}
+  }
+
+  /**
+   * Converts a JValue into its Java equivalent.
+   */
+  def toJavaObject(v: JValue): Object = {
+v match {
+  case JNothing => null
+  case JNull => null
+  case JString(s) => s
+  case JDouble(num) =>

[GitHub] spark pull request: [SPARK-11315] [YARN] WiP Add YARN extension se...

2015-10-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r43221885
  
--- Diff: 
yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/YarnTimelineUtils.scala
 ---
@@ -0,0 +1,757 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.io.IOException
+import java.net.{InetSocketAddress, NoRouteToHostException, URI, URL}
+import java.text.DateFormat
+import java.util.concurrent.atomic.AtomicLong
+import java.util.{ArrayList => JArrayList, Collection => JCollection, 
Date, HashMap => JHashMap, Map => JMap}
+import java.{lang, util}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import 
org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineEntity, 
TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, 
ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.spark
+import org.json4s.JsonAST.JObject
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+import org.apache.spark.scheduler.{SparkListenerApplicationEnd, 
SparkListenerApplicationStart, SparkListenerEvent, SparkListenerExecutorAdded, 
SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart, 
SparkListenerStageCompleted, SparkListenerStageSubmitted}
+import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * Utility methods for timeline classes.
+ */
+private[spark] object YarnTimelineUtils extends Logging {
+
+  /**
+   * What attempt ID to use as the attempt ID field (not the entity ID) 
when
+   * there is no attempt info.
+   */
+  val SINGLE_ATTEMPT = "1"
+
+  /**
+   * Exception text when there is no event info data to unmarshall.
+   */
+  val E_NO_EVENTINFO = "No 'eventinfo' entry"
+
+  /**
+   * Exception text when there is event info entry in the timeline event, 
but it is empty.
+   */
+
+  val E_EMPTY_EVENTINFO = "Empty 'eventinfo' entry"
+
+  /**
+   * counter incremented on every spark event to timeline event creation,
+   * so guaranteeing uniqueness of event IDs across a single application 
attempt
+   * (which is implicitly, one per JVM).
+   */
+  val uid = new AtomicLong(System.currentTimeMillis())
+
+  /**
+   * Converts a Java object to its equivalent json4s representation.
+   */
+  def toJValue(obj: Object): JValue = {
+obj match {
+  case str: String => JString(str)
+  case dbl: java.lang.Double => JDouble(dbl)
+  case dec: java.math.BigDecimal => JDecimal(dec)
+  case int: java.lang.Integer => JInt(BigInt(int))
+  case long: java.lang.Long => JInt(BigInt(long))
+  case bool: java.lang.Boolean => JBool(bool)
+  case map: JMap[_, _] =>
+val jmap = map.asInstanceOf[JMap[String, Object]]
+JObject(jmap.entrySet().asScala.map { e => e.getKey -> 
toJValue(e.getValue) }.toList)
+  case array: JCollection[_] =>
+JArray(array.asInstanceOf[JCollection[Object]].asScala.map(o => 
toJValue(o)).toList)
+  case null => JNothing
+}
+  }
+
+  /**
+   * Converts a JValue into its Java equivalent.
+   */
+  def toJavaObject(v: JValue): Object = {
+v match {
+  case JNothing => null
+  case JNull => null
+  case JString(s) => s
+  case JDouble(num) =>

[GitHub] spark pull request: [SPARK-11315] [YARN] WiP Add YARN extension se...

2015-10-27 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-151740814
  
Hi @steveloughran, this patch looks quite large; can we:

1. Remove some unnecessary getter/setter methods; they are quite the Java way and 
rather verbose in Scala.
2. Group some of the class parameters into subclasses; it is easy to make mistakes 
when there is a large amount of mutable state to track (a small sketch of this 
follows below).
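
To make point 2 concrete, here is a tiny, purely illustrative sketch (the names are 
made up, and it assumes it lives inside a class such as YarnHistoryService) of 
grouping related mutable counters into one holder, so there is a single reference to 
track instead of many independent vars:

    // Group related mutable values into one immutable holder.
    private case class PostingState(queued: Long, posted: Long, failed: Long)

    // One @volatile reference instead of several independent mutable fields.
    @volatile private var postingState = PostingState(0L, 0L, 0L)

    private def recordPostSuccess(): Unit = {
      postingState = postingState.copy(posted = postingState.posted + 1)
    }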



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-10-28 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r43224039
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackeStateDStream.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import java.io.{IOException, ObjectOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.util.StateMap
+import org.apache.spark.util.Utils
+
+private[streaming] case class TrackStateRDDRecord[K: ClassTag, S: ClassTag, T: ClassTag](
+    stateMap: StateMap[K, S], emittedRecords: Seq[T])
+
+
+private[streaming] class TrackStateRDDPartition(
+    idx: Int,
+    @transient private var prevStateRDD: RDD[_],
+    @transient private var partitionedDataRDD: RDD[_]) extends Partition {
+
+  private[dstream] var previousSessionRDDPartition: Partition = null
+  private[dstream] var partitionedDataRDDPartition: Partition = null
+
+  override def index: Int = idx
+  override def hashCode(): Int = idx
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
+    // Update the reference to parent split at the time of task serialization
+    previousSessionRDDPartition = prevStateRDD.partitions(index)
+    partitionedDataRDDPartition = partitionedDataRDD.partitions(index)
+    oos.defaultWriteObject()
+  }
+}
+
+private[streaming] class TrackStateRDD[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](
+    _sc: SparkContext,
+    private var prevStateRDD: RDD[TrackStateRDDRecord[K, S, T]],
+    private var partitionedDataRDD: RDD[(K, V)],
+    trackingFunction: (K, Option[V], State[S]) => Option[T],
+    currentTime: Long, timeoutThresholdTime: Option[Long]
+  ) extends RDD[TrackStateRDDRecord[K, S, T]](
+    _sc,
+    List(
+      new OneToOneDependency[TrackStateRDDRecord[K, S, T]](prevStateRDD),
+      new OneToOneDependency(partitionedDataRDD))
+  ) {
+
+  @volatile private var doFullScan = false
+
+  require(partitionedDataRDD.partitioner.nonEmpty)
+  require(partitionedDataRDD.partitioner == prevStateRDD.partitioner)
+
+  override val partitioner = prevStateRDD.partitioner
+
+  override def checkpoint(): Unit = {
+    super.checkpoint()
+    doFullScan = true
+  }
+
+  override def compute(
+      partition: Partition, context: TaskContext): Iterator[TrackStateRDDRecord[K, S, T]] = {
+
+    val stateRDDPartition = partition.asInstanceOf[TrackStateRDDPartition]
+    val prevStateRDDIterator = prevStateRDD.iterator(
+      stateRDDPartition.previousSessionRDDPartition, context)
+    val dataIterator = partitionedDataRDD.iterator(
+      stateRDDPartition.partitionedDataRDDPartition, context)
+    if (!prevStateRDDIterator.hasNext) {
+      throw new SparkException(s"Could not find state map in previous state RDD")
+    }
+
+    val newStateMap = prevStateRDDIterator.next().stateMap.copy()
+    val emittedRecords = new ArrayBuffer[T]
+
+    val wrappedState = new StateImpl[S]()
+
+    dataIterator.foreach { case (key, value) =>
+      wrappedState.wrap(newStateMap.get(key))
+      val emittedRecord = trackingFunction(key, Some(value), wrappedState)
+      if (wrappedState.isRemoved) {
+        newStateMap.remove(key)
+      } else if (wrappedState.isUpdated) {
+        newStateMap.put(key, wrappedState.get(), currentTime)
+      }
+      emittedRecords ++= emittedRecord
--- End diff --

I don't think being an iterator cannot be persisted in memory an

[GitHub] spark pull request: [SPARK-11315] [YARN] WiP Add YARN extension se...

2015-10-28 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-151754072
  
BTW, do we need to take special care with #9297 when integrating with ATS?





[GitHub] spark pull request: [SPARK-2629][STREAMING] Basic implementation o...

2015-10-28 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/9256#discussion_r43229671
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackeStateDStream.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import java.io.{IOException, ObjectOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.util.StateMap
+import org.apache.spark.util.Utils
+
+private[streaming] case class TrackStateRDDRecord[K: ClassTag, S: ClassTag, T: ClassTag](
+    stateMap: StateMap[K, S], emittedRecords: Seq[T])
+
+
+private[streaming] class TrackStateRDDPartition(
+    idx: Int,
+    @transient private var prevStateRDD: RDD[_],
+    @transient private var partitionedDataRDD: RDD[_]) extends Partition {
+
+  private[dstream] var previousSessionRDDPartition: Partition = null
+  private[dstream] var partitionedDataRDDPartition: Partition = null
+
+  override def index: Int = idx
+  override def hashCode(): Int = idx
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
+    // Update the reference to parent split at the time of task serialization
+    previousSessionRDDPartition = prevStateRDD.partitions(index)
+    partitionedDataRDDPartition = partitionedDataRDD.partitions(index)
+    oos.defaultWriteObject()
+  }
+}
+
+private[streaming] class TrackStateRDD[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](
+    _sc: SparkContext,
+    private var prevStateRDD: RDD[TrackStateRDDRecord[K, S, T]],
+    private var partitionedDataRDD: RDD[(K, V)],
+    trackingFunction: (K, Option[V], State[S]) => Option[T],
+    currentTime: Long, timeoutThresholdTime: Option[Long]
+  ) extends RDD[TrackStateRDDRecord[K, S, T]](
+    _sc,
+    List(
+      new OneToOneDependency[TrackStateRDDRecord[K, S, T]](prevStateRDD),
+      new OneToOneDependency(partitionedDataRDD))
+  ) {
+
+  @volatile private var doFullScan = false
+
+  require(partitionedDataRDD.partitioner.nonEmpty)
+  require(partitionedDataRDD.partitioner == prevStateRDD.partitioner)
+
+  override val partitioner = prevStateRDD.partitioner
+
+  override def checkpoint(): Unit = {
+    super.checkpoint()
+    doFullScan = true
+  }
+
+  override def compute(
+      partition: Partition, context: TaskContext): Iterator[TrackStateRDDRecord[K, S, T]] = {
+
+    val stateRDDPartition = partition.asInstanceOf[TrackStateRDDPartition]
+    val prevStateRDDIterator = prevStateRDD.iterator(
+      stateRDDPartition.previousSessionRDDPartition, context)
+    val dataIterator = partitionedDataRDD.iterator(
+      stateRDDPartition.partitionedDataRDDPartition, context)
+    if (!prevStateRDDIterator.hasNext) {
+      throw new SparkException(s"Could not find state map in previous state RDD")
+    }
+
+    val newStateMap = prevStateRDDIterator.next().stateMap.copy()
+    val emittedRecords = new ArrayBuffer[T]
+
+    val wrappedState = new StateImpl[S]()
+
+    dataIterator.foreach { case (key, value) =>
+      wrappedState.wrap(newStateMap.get(key))
+      val emittedRecord = trackingFunction(key, Some(value), wrappedState)
--- End diff --

Since we defined the tracking function as `(K, Option[V], State[S]) => Option[T]`, users will expect `None` rather than `Some(null)` when the input value is null. Wrapping a null value in `Some` is a little misleading and can lead to an NPE if the value is used like `v.map(_.toString).getOrElse(...)`.
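
To make the concern concrete, here is a minimal, standalone Scala sketch (not part of this PR; names are illustrative) showing how `Some(null)` slips past a `map`/`getOrElse` chain while `Option(null)` does not:

```scala
object SomeVsOptionDemo {
  def main(args: Array[String]): Unit = {
    val nullValue: String = null

    // Option(null) collapses to None, so getOrElse supplies the fallback.
    println(Option(nullValue).map(_.toString).getOrElse("<none>"))  // prints "<none>"

    // Some(null) is a non-empty Option holding null, so map calls
    // _.toString on null and throws a NullPointerException.
    try {
      Some(nullValue).map(_.toString).getOrElse("<none>")
    } catch {
      case _: NullPointerException => println("Some(null) led to an NPE")
    }
  }
}
```

So wrapping a possibly-null record with `Option(value)` instead of `Some(value)` keeps the `Option[V]` contract that callers of the tracking function would expect.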



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

2015-10-29 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/9182#discussion_r43468563
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
 ---
@@ -17,17 +17,17 @@
 
 package org.apache.spark.scheduler.cluster
 
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{Future, ExecutionContext}
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
 
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.rpc._
--- End diff --

Do we need this import?





[GitHub] spark pull request: [SPARK-9817][YARN] Improve the locality calcul...

2015-11-01 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/8100#discussion_r43591368
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
 ---
@@ -179,4 +196,26 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
       (host, math.max(0, (expectedCount - existedCount).ceil.toInt))
     }
   }
+
+  /**
+   * Calculate the host to possible number of containers for pending allocated containers.
+   * @param localityMatchedPendingAllocations A sequence of pending container request which
+   *                                          matches the localities of current required tasks.
+   * @return a Map with hostname as key and possible number of containers on this host as value
--- End diff --

I think it is the number of containers that will possibly be on the 
specific host, not the ratio. I will add more comments on it.
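
As a side note for readers, a small standalone sketch (illustrative only, not the surrounding Spark code) of what the quoted `math.max(0, (expectedCount - existedCount).ceil.toInt)` expression computes for a host:

```scala
object ExpectedContainersDemo {
  // Round the remaining expected count up to a whole container,
  // and never request a negative number of containers.
  def containersToRequest(expectedCount: Double, existedCount: Double): Int =
    math.max(0, (expectedCount - existedCount).ceil.toInt)

  def main(args: Array[String]): Unit = {
    println(containersToRequest(expectedCount = 2.4, existedCount = 1.0))  // 2
    println(containersToRequest(expectedCount = 0.5, existedCount = 2.0))  // 0 (already over-provisioned)
  }
}
```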





[GitHub] spark pull request: [SPARK-9817][YARN] Improve the locality calcul...

2015-11-01 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/8100#issuecomment-152885722
  
@vanzin, thanks a lot for your review. I just added more comments to clarify the calculations; please take another look.





[GitHub] spark pull request: [SPARK-9817][YARN] Improve the locality calcul...

2015-11-01 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/8100#issuecomment-152886708
  
Yeah, my bad, it is more like a probability, which can be a fraction; otherwise precision would be lost in the later computation.
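
A hedged illustration of the precision point (hypothetical numbers, not the actual placement code): when one locality-matched pending request is spread across several candidate hosts, its per-host share is a fraction, and truncating it to an integer too early drops the contribution entirely:

```scala
object PendingContainerPrecision {
  def main(args: Array[String]): Unit = {
    // One locality-matched pending container request covering three candidate hosts.
    val pendingRequests = 1
    val candidateHosts = Seq("host1", "host2", "host3")

    // Keeping the per-host share as a Double preserves the 1/3 contribution...
    val fractionalShare: Double = pendingRequests.toDouble / candidateHosts.size
    // ...whereas integer division truncates it to 0 and loses it.
    val truncatedShare: Int = pendingRequests / candidateHosts.size

    println(f"fractional share per host: $fractionalShare%.2f") // 0.33
    println(s"truncated share per host: $truncatedShare")       // 0
  }
}
```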




