[GitHub] spark issue #18966: [SPARK-21751][SQL] CodeGeneraor.splitExpressions counts ...

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18966
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82598/
Test PASSed.


---




[GitHub] spark issue #18966: [SPARK-21751][SQL] CodeGeneraor.splitExpressions counts ...

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18966
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #18966: [SPARK-21751][SQL] CodeGeneraor.splitExpressions counts ...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18966
  
**[Test build #82598 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82598/testReport)**
 for PR 18966 at commit 
[`516a72a`](https://github.com/apache/spark/commit/516a72a62cb579f2952c4b776afec0dc1826e590).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-10 Thread MrBago
Github user MrBago commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r143853274
  
--- Diff: python/pyspark/ml/image.py ---
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pyspark
+from pyspark import SparkContext
+from pyspark.sql.types import *
+from pyspark.sql.types import Row, _create_row
+from pyspark.sql import DataFrame
+from pyspark.ml.param.shared import *
+import numpy as np
+
+undefinedImageType = "Undefined"
+
+ImageFields = ["origin", "height", "width", "nChannels", "mode", "data"]
+
+ocvTypes = {
+undefinedImageType: -1,
+"CV_8U": 0, "CV_8UC1": 0, "CV_8UC2": 8, "CV_8UC3": 16, "CV_8UC4": 24,
+"CV_8S": 1, "CV_8SC1": 1, "CV_8SC2": 9, "CV_8SC3": 17, "CV_8SC4": 25,
+"CV_16U": 2, "CV_16UC1": 2, "CV_16UC2": 10, "CV_16UC3": 18, 
"CV_16UC4": 26,
+"CV_16S": 3, "CV_16SC1": 3, "CV_16SC2": 11, "CV_16SC3": 19, 
"CV_16SC4": 27,
+"CV_32S": 4, "CV_32SC1": 4, "CV_32SC2": 12, "CV_32SC3": 20, 
"CV_32SC4": 28,
+"CV_32F": 5, "CV_32FC1": 5, "CV_32FC2": 13, "CV_32FC3": 21, 
"CV_32FC4": 29,
+"CV_64F": 6, "CV_64FC1": 6, "CV_64FC2": 14, "CV_64FC3": 22, 
"CV_64FC4": 30
+}
+
+ImageSchema = StructType([
+StructField(ImageFields[0], StringType(),  True),
+StructField(ImageFields[1], IntegerType(), False),
+StructField(ImageFields[2], IntegerType(), False),
+StructField(ImageFields[3], IntegerType(), False),
+# OpenCV-compatible type: CV_8UC3 in most cases
+StructField(ImageFields[4], StringType(), False),
+# bytes in OpenCV-compatible order: row-wise BGR in most cases
+StructField(ImageFields[5], BinaryType(), False)])
+
+
+# TODO: generalize to other datatypes and number of channels
+def toNDArray(image):
+"""
+Converts an image to a 1-dimensional array
+
+Args:
+image (object): The image to be converted
+
+Returns:
+array: The image as a 1-dimensional array
+
+.. versionadded:: 2.3.0
+"""
+height = image.height
+width = image.width
+return np.asarray(image.data, dtype=np.uint8) \
+ .reshape((height, width, 3))[:, :, (2, 1, 0)]
+
+
+# TODO: generalize to other datatypes and number of channels
+def toImage(array, origin="", mode="CV_8UC3"):
+"""
+
+Converts a one-dimensional array to a 2 dimensional image
+
+Args:
+array (array):
+origin (str):
+mode (int):
+
+Returns:
+object: 2 dimensional image
+
+.. versionadded:: 2.3.0
+"""
+length = np.prod(array.shape)
+
+data = bytearray(array.astype(dtype=np.int8)[:, :, (2, 1, 0)]
+  .reshape(length))
+height = array.shape[0]
+width = array.shape[1]
+nChannels = array.shape[2]
+# Creating new Row with _create_row(), because Row(name = value, ... )
--- End diff --

@holdenk I believe the ordered-by-name schema works in general. There is a 
serialization bug that I'm aware of, which I filed here: 
https://issues.apache.org/jira/browse/SPARK-22232. I hope we can fix that 
for 2.3 (I can help with that).


---




[GitHub] spark issue #6751: [SPARK-8300] DataFrame hint for broadcast join.

2017-10-10 Thread sridharsubramanian62
Github user sridharsubramanian62 commented on the issue:

https://github.com/apache/spark/pull/6751
  
It's available from Spark 2.2.0.
On Tuesday, October 10, 2017, 1:46:27 PM PDT, Reynold Xin 
 wrote:  
 
 
Isn't the hint available in SQL?

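For reference, both forms of the hint work; a minimal PySpark sketch (the table and column names here are made up for illustration):

```python
# DataFrame-API hint and the SQL hint syntax added in Spark 2.2.0.
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()
large = spark.range(1000000).withColumnRenamed("id", "key")
small = spark.range(100).withColumnRenamed("id", "key")

# DataFrame API: force a broadcast join of the small side.
joined_df = large.join(broadcast(small), "key")

# SQL hint syntax (available from Spark 2.2.0).
large.createOrReplaceTempView("large_t")
small.createOrReplaceTempView("small_t")
joined_sql = spark.sql(
    "SELECT /*+ BROADCAST(small_t) */ * "
    "FROM large_t JOIN small_t ON large_t.key = small_t.key")
```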



---




[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143850792
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 ---
@@ -148,18 +149,30 @@ private[kinesis] class KinesisReceiver[T](
 
 kinesisCheckpointer = new KinesisCheckpointer(receiver, 
checkpointInterval, workerId)
 val kinesisProvider = kinesisCreds.provider
-val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
-  checkpointAppName,
-  streamName,
-  kinesisProvider,
-  dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
-  cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
-  workerId)
+
+val kinesisClientLibConfiguration = {
+  val baseClientLibConfiguration = new KinesisClientLibConfiguration(
+checkpointAppName,
+streamName,
+kinesisProvider,
+dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
+cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
+workerId)
 .withKinesisEndpoint(endpointUrl)
-.withInitialPositionInStream(initialPositionInStream)
+.withInitialPositionInStream(initialPosition.initialPositionInStream)
 .withTaskBackoffTimeMillis(500)
 .withRegionName(regionName)
 
+  // Update the Kinesis client lib config with timestamp
+  // if InitialPositionInStream.AT_TIMESTAMP is passed
+  initialPosition match {
+case atTimestamp: AtTimestamp =>
--- End diff --

nit: 
```scala
initialPosition match {
  case AtTimestamp(ts) =>
baseClientLibConfiguration.withTimestampAtInitialPositionInStream(ts)
...
}

```


---




[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143850553
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
 ---
@@ -182,14 +181,14 @@ object KinesisInputDStream {
 
 /**
  * Sets the initial position data is read from in the Kinesis stream. 
Defaults to
- * [[InitialPositionInStream.LATEST]] if no custom value is specified.
+ * [[InitialPosition.latest]] if no custom value is specified.
  *
- * @param initialPosition InitialPositionInStream value specifying 
where Spark Streaming
+ * @param initialPosition [[InitialPosition]] value specifying where 
Spark Streaming
  *will start reading records in the Kinesis 
stream from
  * @return Reference to this [[KinesisInputDStream.Builder]]
  */
-def initialPositionInStream(initialPosition: InitialPositionInStream): Builder = {
--- End diff --

Don't remove this API, since it will break compatibility. Instead, add an 
API to take the `withTimestamp`. In the end, if we see that `withTimestamp` has 
been set but the initial position isn't `AtTimestamp`, then we throw an error. 
Likewise, if `AtTimestamp` is set but no timestamp has been provided, also 
throw an error.


---




[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143849596
  
--- Diff: 
external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPosition.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis;
+
+import java.util.Date;
+
+/**
+ * A java wrapper for org.apache.spark.streaming.kinesis.InitialPosition
+ * to expose the corresponding scala objects for InitialPositionInStream.
+ * The functions are intentionally Upper cased to appear like classes for
+ * usage in Java classes.
+ */
+public class KinesisInitialPosition {
+
+/**
+ * Returns instance of AtTimestamp with InitialPositionInStream.LATEST.
+ * @return
+ */
+public static InitialPosition Latest() {
+return InitialPosition$.MODULE$.latest();
--- End diff --

Can you instead return `Latest$.MODULE$`


---




[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143850857
  
--- Diff: 
external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java
 ---
@@ -45,15 +43,15 @@ public void testJavaKinesisDStreamBuilder() {
   .streamName(streamName)
   .endpointUrl(endpointUrl)
   .regionName(region)
-  .initialPositionInStream(initialPosition)
--- End diff --

ditto on API change


---




[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143849691
  
--- Diff: 
external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPosition.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis;
+
+import java.util.Date;
+
+/**
+ * A java wrapper for org.apache.spark.streaming.kinesis.InitialPosition
+ * to expose the corresponding scala objects for InitialPositionInStream.
+ * The functions are intentionally Upper cased to appear like classes for
+ * usage in Java classes.
+ */
+public class KinesisInitialPosition {
+
+/**
+ * Returns instance of AtTimestamp with InitialPositionInStream.LATEST.
+ * @return
+ */
+public static InitialPosition Latest() {
+return InitialPosition$.MODULE$.latest();
+}
+
+/**
+ * Returns instance of AtTimestamp with 
InitialPositionInStream.TRIM_HORIZON.
+ * @return
+ */
+public static InitialPosition TrimHorizon() {
+return InitialPosition$.MODULE$.trimHorizon();
+}
+
+/**
+ * Returns instance of AtTimestamp with 
InitialPositionInStream.AT_TIMESTAMP.
+ * @param timestamp
+ * @return
+ */
+public static InitialPosition AtTimestamp(Date timestamp) {
+return InitialPosition$.MODULE$.atTimestamp(timestamp);
--- End diff --

`AtTimestamp.apply(timestamp)`


---




[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143849634
  
--- Diff: 
external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPosition.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis;
+
+import java.util.Date;
+
+/**
+ * A java wrapper for org.apache.spark.streaming.kinesis.InitialPosition
+ * to expose the corresponding scala objects for InitialPositionInStream.
+ * The functions are intentionally Upper cased to appear like classes for
+ * usage in Java classes.
+ */
+public class KinesisInitialPosition {
+
+/**
+ * Returns instance of AtTimestamp with InitialPositionInStream.LATEST.
+ * @return
+ */
+public static InitialPosition Latest() {
+return InitialPosition$.MODULE$.latest();
+}
+
+/**
+ * Returns instance of AtTimestamp with 
InitialPositionInStream.TRIM_HORIZON.
+ * @return
+ */
+public static InitialPosition TrimHorizon() {
+return InitialPosition$.MODULE$.trimHorizon();
--- End diff --

Can you instead return `TrimHorizon$.MODULE$`


---




[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143849442
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.LATEST
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON.
+ */
+case object TrimHorizon extends InitialPosition {
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.TRIM_HORIZON
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP.
+ */
+case class AtTimestamp(timestamp: Date) extends InitialPosition {
+  override val initialPositionInStream: InitialPositionInStream
+= InitialPositionInStream.AT_TIMESTAMP
+}
+
+/**
+ * Companion object for InitialPosition that returns
+ * appropriate version of InitialPositionInStream.
+ */
+object InitialPosition {
+
+  /**
+   * An instance of Latest with InitialPositionInStream.LATEST.
+   * @return [[Latest]]
+   */
+  val latest: InitialPosition = Latest
+
+  /**
+   * An instance of Latest with InitialPositionInStream.TRIM_HORIZON.
+   * @return [[TrimHorizon]]
+   */
+  val trimHorizon: InitialPosition = TrimHorizon
--- End diff --

We don't need the Scala API for this anymore; we can just use the Java 
objects directly.


---




[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...

2017-10-10 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/18029#discussion_r143850590
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
 ---
@@ -182,14 +181,14 @@ object KinesisInputDStream {
 
 /**
  * Sets the initial position data is read from in the Kinesis stream. 
Defaults to
- * [[InitialPositionInStream.LATEST]] if no custom value is specified.
+ * [[InitialPosition.latest]] if no custom value is specified.
--- End diff --

don't change docs


---




[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19250
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19250
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82595/
Test PASSed.


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-10-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19181


---




[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19250
  
**[Test build #82595 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82595/testReport)**
 for PR 19250 at commit 
[`5607160`](https://github.com/apache/spark/commit/5607160afaf0f5ecd93fa59b97549bec937991b4).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill

2017-10-10 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/19181
  
Merging to master. Thanks!

Can you create a backport for Spark 2.2?


---




[GitHub] spark issue #18966: [SPARK-21751][SQL] CodeGeneraor.splitExpressions counts ...

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18966
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #18966: [SPARK-21751][SQL] CodeGeneraor.splitExpressions counts ...

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18966
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82596/
Test PASSed.


---




[GitHub] spark issue #18966: [SPARK-21751][SQL] CodeGeneraor.splitExpressions counts ...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18966
  
**[Test build #82596 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82596/testReport)**
 for PR 18966 at commit 
[`4c47802`](https://github.com/apache/spark/commit/4c4780207afcc2d55d19bf8d3e9fc29812f07ae8).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19181
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82594/
Test PASSed.


---




[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19181
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19181
  
**[Test build #82594 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82594/testReport)**
 for PR 19181 at commit 
[`6b901ee`](https://github.com/apache/spark/commit/6b901eea4dd7aace3e6a47b333c0dfa815b031db).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #6751: [SPARK-8300] DataFrame hint for broadcast join.

2017-10-10 Thread rxin
Github user rxin commented on the issue:

https://github.com/apache/spark/pull/6751
  
Isn't the hint available in SQL?



---




[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...

2017-10-10 Thread BryanCutler
Github user BryanCutler commented on the issue:

https://github.com/apache/spark/pull/18664
  
I'll work on doing (1) to have the conversions in Python for Arrow match 
non-Arrow, and we can see how that turns out.


---




[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...

2017-10-10 Thread BryanCutler
Github user BryanCutler commented on the issue:

https://github.com/apache/spark/pull/18664
  
> I'm just wondering what if we use timestamp in nested types. Currently we 
don't support nested types but in the future?

I'll try to take this into account, or at least add a note for timestamps 
in nested types

> BTW, could we just use DateTimeUtils.defaultTimeZone() instead of 
SQLConf.SESSION_LOCAL_TIMEZONE if you really meant it? I think the previous 
behaviour without Arrow does not count SQLConf.SESSION_LOCAL_TIMEZONE?

@ueshin do you agree to just use `DateTimeUtils.defaultTimeZone()`? In most 
places where Arrow is used, this would be the case. The only scenario I can 
think of that would not use it is if the user generated timestamps with 
Spark SQL and then called `toPandas()` or ran a `pandas_udf`.
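For context, a rough pandas sketch (not Spark's implementation; the zone string is an assumed example) of the kind of conversion being discussed, where Arrow hands back timezone-naive values stored as UTC and they get reinterpreted in a local zone:

```python
import pandas as pd

def to_local_naive(series, local_tz="America/Los_Angeles"):
    # Treat the naive values as UTC, shift them into the local zone, then
    # drop the tz info so downstream code sees plain timestamps, matching
    # the non-Arrow path.
    return (series.dt.tz_localize("UTC")
                  .dt.tz_convert(local_tz)
                  .dt.tz_localize(None))

s = pd.Series(pd.to_datetime(["2017-10-10 20:06:37"]))
print(to_local_naive(s))
```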


---




[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...

2017-10-10 Thread BryanCutler
Github user BryanCutler commented on the issue:

https://github.com/apache/spark/pull/18664
  
> BTW, do you think it is possible to easily de-duplicate timezone handling 
for both with-Arrow and without-Arrow within Python side if we go for 1. in the 
separate PR?

@HyukjinKwon, are you referring to how we might handle timestamps with 
timezones in a future PR? I think doing this for non-Arrow would be a bit more 
complicated than just the Arrow cases, and I'm not sure of the best way to handle 
it unless we just make a new TimestampWithTimezone data type.


---




[GitHub] spark issue #17100: [SPARK-13947][SQL] PySpark DataFrames: The error message...

2017-10-10 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/17100
  
Will review it this weekend. Thanks!


---




[GitHub] spark issue #19466: [SPARK-22237] [CORE] Fix spark submit file download for ...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19466
  
**[Test build #82604 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82604/testReport)**
 for PR 19466 at commit 
[`807a767`](https://github.com/apache/spark/commit/807a767f8e1f90789299e83c995d9e6e50015a96).


---




[GitHub] spark issue #19466: [CORE] Fix spark submit file download for standalone cli...

2017-10-10 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19466
  
cc @yhuai @jiangxb1987 


---




[GitHub] spark issue #19466: [CORE] Fix spark submit file download for standalone cli...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19466
  
**[Test build #82603 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82603/testReport)**
 for PR 19466 at commit 
[`86e62f6`](https://github.com/apache/spark/commit/86e62f6906bc19fb4f3b91523ab112d4315f2105).


---




[GitHub] spark issue #15670: [SPARK-18161] [Python] Allow pickle to serialize >4 GB o...

2017-10-10 Thread holdenk
Github user holdenk commented on the issue:

https://github.com/apache/spark/pull/15670
  
Do we plan to move this forward? Do you have the time to update this, 
@singularperturbation?


---




[GitHub] spark pull request #15670: [SPARK-18161] [Python] Allow pickle to serialize ...

2017-10-10 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/15670#discussion_r143838765
  
--- Diff: python/pyspark/serializers.py ---
@@ -64,7 +64,7 @@
 from itertools import izip as zip
 else:
 import pickle
-protocol = 3
+protocol = min(pickle.HIGHEST_PROTOCOL, 4)
--- End diff --

It sounds like this part of the change would be ok then since Pyrolite can 
read v4 just fine.
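A small, Spark-independent illustration of why bumping to protocol 4 matters: protocol 4 is the first pickle protocol that supports very large (over 4 GB) objects, and the `min(...)` guard keeps older interpreters working:

```python
import pickle

# Same guard as in the diff above: use protocol 4 when available, otherwise
# fall back to the highest protocol the running interpreter supports.
protocol = min(pickle.HIGHEST_PROTOCOL, 4)

payload = b"x" * (64 * 1024)  # stand-in for a very large object
blob = pickle.dumps(payload, protocol=protocol)
assert pickle.loads(blob) == payload
```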


---




[GitHub] spark pull request #19466: [CORE] Fix spark submit file download for standal...

2017-10-10 Thread loneknightpy
GitHub user loneknightpy opened a pull request:

https://github.com/apache/spark/pull/19466

[CORE] Fix spark submit file download for standalone client mode

## What changes were proposed in this pull request?

This PR makes the spark-submit script use downloaded files in 
local/standalone client mode.

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/loneknightpy/spark 
fix-spark-submit-for-standalone

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19466.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19466


commit 1bde00c0d5c90cebdd2a8514078ace8cf33c42f8
Author: Yu Peng 
Date:   2017-10-10T20:06:37Z

Fix spark submit file download for standalone client mode




---




[GitHub] spark pull request #15666: [SPARK-11421] [Core][Python][R] Added ability for...

2017-10-10 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/15666#discussion_r143838425
  
--- Diff: python/pyspark/tests.py ---
@@ -435,6 +436,19 @@ def test_add_file_locally(self):
 with open(download_path) as test_file:
 self.assertEqual("Hello World!\n", test_file.readline())
 
+def test_add_jar(self):
+jvm = self.sc._jvm
+# We shouldn't be able to load anything from the package before it is added
+self.assertFalse(isinstance(jvm.pysparktests.DummyClass, JavaClass))
+# Generate and compile the test jar
+destDir = os.path.join(SPARK_HOME, "python/test_support/jar")
--- End diff --

Do you want to use a temp directory instead?
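A sketch of that suggestion, assuming the surrounding test code from the diff (the jar-building step is elided):

```python
import os
import shutil
import tempfile

# Build the test jar under a temporary directory instead of inside SPARK_HOME,
# and clean it up afterwards.
destDir = tempfile.mkdtemp(prefix="pyspark-test-jar-")
try:
    # ... generate and compile the test jar into destDir, then add it, e.g.:
    # self.sc.addJar(os.path.join(destDir, "dummy.jar"))
    pass
finally:
    shutil.rmtree(destDir, ignore_errors=True)
```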


---




[GitHub] spark issue #18457: [SPARK-21241][MLlib]- Add setIntercept to StreamingLinea...

2017-10-10 Thread holdenk
Github user holdenk commented on the issue:

https://github.com/apache/spark/pull/18457
  
In the meantime Jenkins OK to test.


---




[GitHub] spark issue #17100: [SPARK-13947][SQL] PySpark DataFrames: The error message...

2017-10-10 Thread holdenk
Github user holdenk commented on the issue:

https://github.com/apache/spark/pull/17100
  
re-ping @gatorsmile?


---




[GitHub] spark issue #18457: [SPARK-21241][MLlib]- Add setIntercept to StreamingLinea...

2017-10-10 Thread holdenk
Github user holdenk commented on the issue:

https://github.com/apache/spark/pull/18457
  
So we're only really doing bug fixes on the old MLlib stuff, but I guess we 
haven't finalized the new ML streaming stuff so I'm not sure what we want to do 
here. cc @MLnick .


---




[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-10 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r143836094
  
--- Diff: python/pyspark/ml/image.py ---
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pyspark
+from pyspark import SparkContext
+from pyspark.sql.types import *
+from pyspark.sql.types import Row, _create_row
+from pyspark.sql import DataFrame
+from pyspark.ml.param.shared import *
+import numpy as np
+
+undefinedImageType = "Undefined"
+
+ImageFields = ["origin", "height", "width", "nChannels", "mode", "data"]
+
+ocvTypes = {
+undefinedImageType: -1,
+"CV_8U": 0, "CV_8UC1": 0, "CV_8UC2": 8, "CV_8UC3": 16, "CV_8UC4": 24,
+"CV_8S": 1, "CV_8SC1": 1, "CV_8SC2": 9, "CV_8SC3": 17, "CV_8SC4": 25,
+"CV_16U": 2, "CV_16UC1": 2, "CV_16UC2": 10, "CV_16UC3": 18, 
"CV_16UC4": 26,
+"CV_16S": 3, "CV_16SC1": 3, "CV_16SC2": 11, "CV_16SC3": 19, 
"CV_16SC4": 27,
+"CV_32S": 4, "CV_32SC1": 4, "CV_32SC2": 12, "CV_32SC3": 20, 
"CV_32SC4": 28,
+"CV_32F": 5, "CV_32FC1": 5, "CV_32FC2": 13, "CV_32FC3": 21, 
"CV_32FC4": 29,
+"CV_64F": 6, "CV_64FC1": 6, "CV_64FC2": 14, "CV_64FC3": 22, 
"CV_64FC4": 30
+}
+
+ImageSchema = StructType([
+StructField(ImageFields[0], StringType(),  True),
+StructField(ImageFields[1], IntegerType(), False),
+StructField(ImageFields[2], IntegerType(), False),
+StructField(ImageFields[3], IntegerType(), False),
+# OpenCV-compatible type: CV_8UC3 in most cases
+StructField(ImageFields[4], StringType(), False),
+# bytes in OpenCV-compatible order: row-wise BGR in most cases
+StructField(ImageFields[5], BinaryType(), False)])
+
+
+# TODO: generalize to other datatypes and number of channels
+def toNDArray(image):
+"""
+Converts an image to a 1-dimensional array
+
+Args:
+image (object): The image to be converted
+
+Returns:
+array: The image as a 1-dimensional array
+
+.. versionadded:: 2.3.0
+"""
+height = image.height
+width = image.width
+return np.asarray(image.data, dtype=np.uint8) \
+ .reshape((height, width, 3))[:, :, (2, 1, 0)]
+
+
+# TODO: generalize to other datatypes and number of channels
+def toImage(array, origin="", mode="CV_8UC3"):
+"""
+
+Converts a one-dimensional array to a 2 dimensional image
+
+Args:
+array (array):
+origin (str):
+mode (int):
+
+Returns:
+object: 2 dimensional image
+
+.. versionadded:: 2.3.0
+"""
+length = np.prod(array.shape)
+
+data = bytearray(array.astype(dtype=np.int8)[:, :, (2, 1, 0)]
+  .reshape(length))
+height = array.shape[0]
+width = array.shape[1]
+nChannels = array.shape[2]
+# Creating new Row with _create_row(), because Row(name = value, ... )
+# orders fields by name, which conflicts with expected ImageSchema order
+# when the new DataFrame is created by UDF
+return _create_row(ImageFields,
+   [origin, height, width, nChannels, mode, data])
+
+
+def readImages(path,
+   recursive=False,
+   numPartitions=0,
+   dropImageFailures=False,
+   sampleRatio=1.0):
+"""
+Reads the directory of images from the local or remote (WASB) source.
+Args:
--- End diff --

These arguments don't match the function arguments.
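For comparison, a docstring shape that would line up with the signature in the diff; the parameter descriptions are illustrative, not taken from the PR:

```python
def readImages(path,
               recursive=False,
               numPartitions=0,
               dropImageFailures=False,
               sampleRatio=1.0):
    """
    Reads the directory of images from the local or remote (WASB) source.

    Args:
        path (str): Path to the directory of images.
        recursive (bool): Recursively scan subdirectories when True.
        numPartitions (int): Number of DataFrame partitions (0 = default).
        dropImageFailures (bool): Drop images that fail to decode.
        sampleRatio (float): Fraction of images to sample, in (0, 1].

    Returns:
        DataFrame: A DataFrame with a single column of images.
    """
```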


---




[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-10 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r143836725
  
--- Diff: python/pyspark/ml/image.py ---
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pyspark
+from pyspark import SparkContext
+from pyspark.sql.types import *
+from pyspark.sql.types import Row, _create_row
+from pyspark.sql import DataFrame
+from pyspark.ml.param.shared import *
+import numpy as np
+
+undefinedImageType = "Undefined"
+
+ImageFields = ["origin", "height", "width", "nChannels", "mode", "data"]
+
+ocvTypes = {
+undefinedImageType: -1,
+"CV_8U": 0, "CV_8UC1": 0, "CV_8UC2": 8, "CV_8UC3": 16, "CV_8UC4": 24,
+"CV_8S": 1, "CV_8SC1": 1, "CV_8SC2": 9, "CV_8SC3": 17, "CV_8SC4": 25,
+"CV_16U": 2, "CV_16UC1": 2, "CV_16UC2": 10, "CV_16UC3": 18, 
"CV_16UC4": 26,
+"CV_16S": 3, "CV_16SC1": 3, "CV_16SC2": 11, "CV_16SC3": 19, 
"CV_16SC4": 27,
+"CV_32S": 4, "CV_32SC1": 4, "CV_32SC2": 12, "CV_32SC3": 20, 
"CV_32SC4": 28,
+"CV_32F": 5, "CV_32FC1": 5, "CV_32FC2": 13, "CV_32FC3": 21, 
"CV_32FC4": 29,
+"CV_64F": 6, "CV_64FC1": 6, "CV_64FC2": 14, "CV_64FC3": 22, 
"CV_64FC4": 30
+}
+
+ImageSchema = StructType([
+StructField(ImageFields[0], StringType(),  True),
+StructField(ImageFields[1], IntegerType(), False),
+StructField(ImageFields[2], IntegerType(), False),
+StructField(ImageFields[3], IntegerType(), False),
+# OpenCV-compatible type: CV_8UC3 in most cases
+StructField(ImageFields[4], StringType(), False),
+# bytes in OpenCV-compatible order: row-wise BGR in most cases
+StructField(ImageFields[5], BinaryType(), False)])
+
+
+# TODO: generalize to other datatypes and number of channels
+def toNDArray(image):
+"""
+Converts an image to a 1-dimensional array
+
+Args:
+image (object): The image to be converted
+
+Returns:
+array: The image as a 1-dimensional array
+
+.. versionadded:: 2.3.0
+"""
+height = image.height
+width = image.width
+return np.asarray(image.data, dtype=np.uint8) \
+ .reshape((height, width, 3))[:, :, (2, 1, 0)]
+
+
+# TODO: generalize to other datatypes and number of channels
+def toImage(array, origin="", mode="CV_8UC3"):
+"""
+
+Converts a one-dimensional array to a 2 dimensional image
+
+Args:
+array (array):
+origin (str):
+mode (int):
--- End diff --

The argument description here is incorrect and not very useful (or the 
default parameters for the function are incorrect).
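One possible fix, keeping the defaults from the signature (so `mode` is documented as the OpenCV type name string rather than an int); the wording is illustrative:

```python
def toImage(array, origin="", mode="CV_8UC3"):
    """
    Converts a (height, width, nChannels) uint8 array to an image row.

    Args:
        array (numpy.ndarray): Image data in HxWxC layout.
        origin (str): Optional source URI of the image.
        mode (str): OpenCV type name, e.g. "CV_8UC3" (the default).

    Returns:
        Row: A row following ImageSchema.
    """
```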


---




[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-10 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r143836993
  
--- Diff: python/pyspark/ml/image.py ---
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pyspark
+from pyspark import SparkContext
+from pyspark.sql.types import *
+from pyspark.sql.types import Row, _create_row
+from pyspark.sql import DataFrame
+from pyspark.ml.param.shared import *
+import numpy as np
+
+undefinedImageType = "Undefined"
+
+ImageFields = ["origin", "height", "width", "nChannels", "mode", "data"]
+
+ocvTypes = {
+undefinedImageType: -1,
+"CV_8U": 0, "CV_8UC1": 0, "CV_8UC2": 8, "CV_8UC3": 16, "CV_8UC4": 24,
+"CV_8S": 1, "CV_8SC1": 1, "CV_8SC2": 9, "CV_8SC3": 17, "CV_8SC4": 25,
+"CV_16U": 2, "CV_16UC1": 2, "CV_16UC2": 10, "CV_16UC3": 18, 
"CV_16UC4": 26,
+"CV_16S": 3, "CV_16SC1": 3, "CV_16SC2": 11, "CV_16SC3": 19, 
"CV_16SC4": 27,
+"CV_32S": 4, "CV_32SC1": 4, "CV_32SC2": 12, "CV_32SC3": 20, 
"CV_32SC4": 28,
+"CV_32F": 5, "CV_32FC1": 5, "CV_32FC2": 13, "CV_32FC3": 21, 
"CV_32FC4": 29,
+"CV_64F": 6, "CV_64FC1": 6, "CV_64FC2": 14, "CV_64FC3": 22, 
"CV_64FC4": 30
+}
+
+ImageSchema = StructType([
+StructField(ImageFields[0], StringType(),  True),
+StructField(ImageFields[1], IntegerType(), False),
+StructField(ImageFields[2], IntegerType(), False),
+StructField(ImageFields[3], IntegerType(), False),
+# OpenCV-compatible type: CV_8UC3 in most cases
+StructField(ImageFields[4], StringType(), False),
+# bytes in OpenCV-compatible order: row-wise BGR in most cases
+StructField(ImageFields[5], BinaryType(), False)])
+
+
+# TODO: generalize to other datatypes and number of channels
+def toNDArray(image):
+"""
+Converts an image to a 1-dimensional array
+
+Args:
+image (object): The image to be converted
+
+Returns:
+array: The image as a 1-dimensional array
+
+.. versionadded:: 2.3.0
+"""
+height = image.height
+width = image.width
+return np.asarray(image.data, dtype=np.uint8) \
+ .reshape((height, width, 3))[:, :, (2, 1, 0)]
+
+
+# TODO: generalize to other datatypes and number of channels
+def toImage(array, origin="", mode="CV_8UC3"):
+"""
+
+Converts a one-dimensional array to a 2 dimensional image
+
+Args:
+array (array):
+origin (str):
+mode (int):
+
+Returns:
+object: 2 dimensional image
+
+.. versionadded:: 2.3.0
+"""
+length = np.prod(array.shape)
+
+data = bytearray(array.astype(dtype=np.int8)[:, :, (2, 1, 0)]
+  .reshape(length))
+height = array.shape[0]
+width = array.shape[1]
+nChannels = array.shape[2]
+# Creating new Row with _create_row(), because Row(name = value, ... )
--- End diff --

Would it maybe make sense to support the ordered-by-name schema? Other 
folks might make their own loaders, and this seems like a potential point of 
confusion.
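A quick demonstration of the ordering issue the comment in the diff refers to: `Row(**kwargs)` sorts fields alphabetically, while `_create_row` keeps the ImageSchema field order (`_create_row` is a private helper, so this is for illustration only):

```python
from pyspark.sql.types import Row, _create_row

ImageFields = ["origin", "height", "width", "nChannels", "mode", "data"]

kw_row = Row(origin="f.png", height=2, width=3, nChannels=3,
             mode="CV_8UC3", data=bytearray(b"\x00"))
print(kw_row.__fields__)   # ['data', 'height', 'mode', 'nChannels', 'origin', 'width']

ordered_row = _create_row(ImageFields,
                          ["f.png", 2, 3, 3, "CV_8UC3", bytearray(b"\x00")])
print(ordered_row.__fields__)  # keeps the ImageSchema order
```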


---




[GitHub] spark pull request #19082: [SPARK-21870][SQL] Split aggregation code into sm...

2017-10-10 Thread a10y
Github user a10y commented on a diff in the pull request:

https://github.com/apache/spark/pull/19082#discussion_r143836700
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ---
@@ -244,6 +246,89 @@ case class HashAggregateExec(
 
   protected override val shouldStopRequired = false
 
+  // Extracts all the input variable references for a given `aggExpr`. 
This result will be used
+  // to split aggregation into small functions.
+  private def getInputVariableReferences(
+  ctx: CodegenContext,
+  aggExpr: Expression,
+  subExprs: Map[Expression, SubExprEliminationState]): Set[(String, 
String)] = {
+// `argSet` collects all the pairs of variable names and their types, 
the first in the pair is
+// a type name and the second is a variable name.
+val argSet = mutable.Set[(String, String)]()
+val stack = mutable.Stack[Expression](aggExpr)
+while (stack.nonEmpty) {
+  stack.pop() match {
+case e if subExprs.contains(e) =>
+  val exprCode = subExprs(e)
+  if (CodegenContext.isJavaIdentifier(exprCode.value)) {
+argSet += ((ctx.javaType(e.dataType), exprCode.value))
+  }
+  if (CodegenContext.isJavaIdentifier(exprCode.isNull)) {
+argSet += (("boolean", exprCode.isNull))
+  }
+  // Since the children possibly has common expressions, we push 
them here
+  stack.pushAll(e.children)
+case ref: BoundReference
+if ctx.currentVars != null && ctx.currentVars(ref.ordinal) != 
null =>
+  val value = ctx.currentVars(ref.ordinal).value
+  val isNull = ctx.currentVars(ref.ordinal).isNull
+  if (CodegenContext.isJavaIdentifier(value)) {
+argSet += ((ctx.javaType(ref.dataType), value))
+  }
+  if (CodegenContext.isJavaIdentifier(isNull)) {
+argSet += (("boolean", isNull))
+  }
+case _: BoundReference =>
+  argSet += (("InternalRow", ctx.INPUT_ROW))
+case e =>
+  stack.pushAll(e.children)
+  }
+}
+
+argSet.toSet
+  }
+
+  // Splits the aggregation into small functions because the HotSpot does 
not compile
+  // too long functions.
+  private def splitAggregateExpressions(
+  ctx: CodegenContext,
+  aggExprs: Seq[Expression],
+  evalAndUpdateCodes: Seq[String],
+  subExprs: Map[Expression, SubExprEliminationState],
+  otherArgs: Seq[(String, String)] = Seq.empty): Seq[String] = {
+aggExprs.zipWithIndex.map { case (aggExpr, i) =>
+  // The maximum number of parameters in non-static Java methods is 
254, so this method gives
+  // up splitting the code if the number goes over the limit.
+  // You can find more information about the limit in the JVM 
specification:
+  //   - The number of method parameters is limited to 255 by the 
definition of a method
--- End diff --

Unless you want to check the types of all the parameters, you might be better 
off halving this to 127 parameters for the worst case. Though I'm not sure how 
many codegen cases this affects...


---




[GitHub] spark pull request #19082: [SPARK-21870][SQL] Split aggregation code into sm...

2017-10-10 Thread a10y
Github user a10y commented on a diff in the pull request:

https://github.com/apache/spark/pull/19082#discussion_r143836110
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ---
@@ -244,6 +246,89 @@ case class HashAggregateExec(
 
   protected override val shouldStopRequired = false
 
+  // Extracts all the input variable references for a given `aggExpr`. 
This result will be used
+  // to split aggregation into small functions.
+  private def getInputVariableReferences(
+  ctx: CodegenContext,
+  aggExpr: Expression,
+  subExprs: Map[Expression, SubExprEliminationState]): Set[(String, 
String)] = {
+// `argSet` collects all the pairs of variable names and their types, 
the first in the pair is
+// a type name and the second is a variable name.
+val argSet = mutable.Set[(String, String)]()
+val stack = mutable.Stack[Expression](aggExpr)
+while (stack.nonEmpty) {
+  stack.pop() match {
+case e if subExprs.contains(e) =>
+  val exprCode = subExprs(e)
+  if (CodegenContext.isJavaIdentifier(exprCode.value)) {
+argSet += ((ctx.javaType(e.dataType), exprCode.value))
+  }
+  if (CodegenContext.isJavaIdentifier(exprCode.isNull)) {
+argSet += (("boolean", exprCode.isNull))
+  }
+  // Since the children possibly has common expressions, we push 
them here
+  stack.pushAll(e.children)
+case ref: BoundReference
+if ctx.currentVars != null && ctx.currentVars(ref.ordinal) != 
null =>
+  val value = ctx.currentVars(ref.ordinal).value
+  val isNull = ctx.currentVars(ref.ordinal).isNull
+  if (CodegenContext.isJavaIdentifier(value)) {
+argSet += ((ctx.javaType(ref.dataType), value))
+  }
+  if (CodegenContext.isJavaIdentifier(isNull)) {
+argSet += (("boolean", isNull))
+  }
+case _: BoundReference =>
+  argSet += (("InternalRow", ctx.INPUT_ROW))
+case e =>
+  stack.pushAll(e.children)
+  }
+}
+
+argSet.toSet
+  }
+
+  // Splits the aggregation into small functions because the HotSpot does 
not compile
+  // too long functions.
+  private def splitAggregateExpressions(
+  ctx: CodegenContext,
+  aggExprs: Seq[Expression],
+  evalAndUpdateCodes: Seq[String],
+  subExprs: Map[Expression, SubExprEliminationState],
+  otherArgs: Seq[(String, String)] = Seq.empty): Seq[String] = {
+aggExprs.zipWithIndex.map { case (aggExpr, i) =>
+  // The maximum number of parameters in non-static Java methods is 
254, so this method gives
+  // up splitting the code if the number goes over the limit.
+  // You can find more information about the limit in the JVM 
specification:
+  //   - The number of method parameters is limited to 255 by the 
definition of a method
--- End diff --

If you read the spec closely, it actually says that there are 255 slots, 
where 1 slot is taken by **this** and 2 slots each are taken up by **long** and 
**double** parameters.

https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-4.html#jvms-4.3.3
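A tiny helper illustrating the slot accounting described above (the names are assumed for illustration, not part of Spark): each `long`/`double` parameter costs 2 slots, everything else 1, and an instance method spends one of the 255 slots on `this`:

```python
def fits_in_method_params(java_types, is_static=False):
    # `this` occupies slot 0 for instance (non-static) methods.
    slots = 0 if is_static else 1
    for t in java_types:
        slots += 2 if t in ("long", "double") else 1
    return slots <= 255

print(fits_in_method_params(["long"] * 127))            # True: 1 + 254 = 255 slots
print(fits_in_method_params(["long"] * 127 + ["int"]))  # False: 256 slots
```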


---




[GitHub] spark issue #19082: [SPARK-21870][SQL] Split aggregation code into small fun...

2017-10-10 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19082
  
@kiszk This is one of the solutions if we can remove the limit on the number of 
parameters. However, this does not resolve all the issues. For example, the 
method becomes too big to inline. 


---




[GitHub] spark issue #19438: [SPARK-22208] [SQL] Improve percentile_approx by not rou...

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19438
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19438: [SPARK-22208] [SQL] Improve percentile_approx by not rou...

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19438
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82593/
Test PASSed.


---




[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19272
  
**[Test build #82602 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82602/testReport)**
 for PR 19272 at commit 
[`f5925fd`](https://github.com/apache/spark/commit/f5925fd2ab31c1d6825fa546102172ed8a55ae8b).


---




[GitHub] spark issue #19438: [SPARK-22208] [SQL] Improve percentile_approx by not rou...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19438
  
**[Test build #82593 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82593/testReport)**
 for PR 19438 at commit 
[`1180265`](https://github.com/apache/spark/commit/11802650c938be59163721254193c67bf3949a99).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...

2017-10-10 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/19272
  
add to whitelist


---




[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...

2017-10-10 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/19272
  
ok to test


---




[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19250
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82592/
Test PASSed.


---




[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19250
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19250
  
**[Test build #82592 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82592/testReport)**
 for PR 19250 at commit 
[`5c03e07`](https://github.com/apache/spark/commit/5c03e07ca0c3ec95b580280b7efe7aa7e1b5d734).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case class AdjustTimestamps(conf: SQLConf) extends Rule[LogicalPlan] `


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18460: [SPARK-21247][SQL] Type comparison should respect case-s...

2017-10-10 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/18460
  
Hi, @gatorsmile.
Could you review this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA

2017-10-10 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/19337#discussion_r143818776
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala ---
@@ -224,6 +224,24 @@ private[clustering] trait LDAParams extends Params 
with HasFeaturesCol with HasM
   /**
* For Online optimizer only: [[optimizer]] = "online".
*
+   * A (positive) learning parameter that controls the convergence of 
variational inference.
+   * Smaller value will lead to more accuracy model and longer training 
time.
--- End diff --

Sorry to be troublesome here. I'm not sure a smaller value for the 
convergence tolerance will always lead to a more accurate model. A smaller 
value will make the per-batch statistics fit the training data more closely 
(even overfit it), but I'm not sure the overall model accuracy will be better, 
especially for data not in the training dataset. 

Maybe just "Smaller value will lead to a more converged model and longer 
training time"?

Please also add doc for default value.
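
For what it's worth, a minimal usage sketch of how a caller would trade this 
off once the parameter is exposed (the `setEpsilon` name below is hypothetical; 
the other setters already exist on `org.apache.spark.ml.clustering.LDA`):

```scala
import org.apache.spark.ml.clustering.LDA

// Tighter tolerance: each mini-batch's variational update runs closer to
// convergence, so training takes longer and fits each batch more closely.
val lda = new LDA()
  .setK(10)
  .setMaxIter(20)
  .setOptimizer("online")
// lda.setEpsilon(1e-4)  // hypothetical setter for the new tolerance parameter

// val model = lda.fit(dataset)  // dataset: a DataFrame with a "features" column
```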



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19082: [SPARK-21870][SQL] Split aggregation code into small fun...

2017-10-10 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/19082
  
@gatorsmile let me clarify. In other words, do you want to find how to cut 
the boundaries so that method inlining of the callee methods is still 
possible? That would maximize the advantage of both whole-stage codegen and 
JIT optimization within a compilation unit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19465: [SPARK-21988][SS]Implement StreamingRelation.computeStat...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19465
  
**[Test build #82600 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82600/testReport)**
 for PR 19465 at commit 
[`96f2c30`](https://github.com/apache/spark/commit/96f2c300de3a3e74e7f2c1794817aff8f4a18f58).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19459
  
**[Test build #82601 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82601/testReport)**
 for PR 19459 at commit 
[`c7ddee6`](https://github.com/apache/spark/commit/c7ddee6b7ab91c1651a397a716ed91ed2a8383a3).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19309: [SPARK-19558][sql] Add config key to register QueryExecu...

2017-10-10 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/19309
  
That's basically what the PR summary says.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

2017-10-10 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19459#discussion_r143821657
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -203,4 +205,16 @@ private[sql] object ArrowConverters {
   reader.close()
 }
   }
+
+  def toDataFrame(
--- End diff --

I left the conversion logic in `ArrowConverters` because I think there is a 
good chance it will change, so I just added a wrapper to `PythonSQLUtils`. 
Let me know if that's OK.
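
For reference, the delegation is just a thin pass-through, roughly of this 
shape (a sketch only; the parameter list is an assumption, since the diff 
above is truncated at the signature):

```scala
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.{DataFrame, SQLContext}

// Stand-in for ArrowConverters.toDataFrame, whose conversion logic is
// expected to keep changing.
object ArrowConvertersSketch {
  def toDataFrame(
      arrowBatchRDD: JavaRDD[Array[Byte]],
      schemaString: String,
      sqlContext: SQLContext): DataFrame = {
    // ... real Arrow-to-DataFrame conversion would live here ...
    ???
  }
}

// The wrapper in PythonSQLUtils is only a stable forwarding entry point for
// the Python side.
object PythonSQLUtilsSketch {
  def toDataFrame(
      arrowBatchRDD: JavaRDD[Array[Byte]],
      schemaString: String,
      sqlContext: SQLContext): DataFrame =
    ArrowConvertersSketch.toDataFrame(arrowBatchRDD, schemaString, sqlContext)
}
```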


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19465: [SPARK-21988]Implement StreamingRelation.computeS...

2017-10-10 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/19465

[SPARK-21988]Implement StreamingRelation.computeStats to fix explain

## What changes were proposed in this pull request?

Implement StreamingRelation.computeStats to fix explain

## How was this patch tested?

- unit tests: `StreamingRelation.computeStats` and 
`StreamingExecutionRelation.computeStats`.
- regression tests: `explain join with a normal source` and `explain join 
with MemoryStream`.
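
For context, the fix boils down to giving the streaming leaf nodes an explicit 
size statistic instead of letting stats computation fail during `explain`. A 
minimal sketch of the idea (names and the exact default may differ from the 
actual patch):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}

// Sketch: a streaming relation has no data yet, so report a default size
// estimate rather than throwing when the planner (or explain) asks for stats.
case class StreamingRelationSketch(
    output: Seq[Attribute],
    defaultSizeInBytes: Long) extends LeafNode {

  override def computeStats(): Statistics =
    Statistics(sizeInBytes = BigInt(defaultSizeInBytes))
}
```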


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-21988

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19465.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19465






---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19465: [SPARK-21988]Implement StreamingRelation.computeStats to...

2017-10-10 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19465
  
cc @joseph-torres 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143801968
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
+case (executorId, HasCachedBlocks) => replicateBlocks(executorId)
+case (executorId, NoMoreBlocks | NotEnoughMemory) => 
state.killExecutor(executorId)
+  }
+
+  /**
+   * Replicate one cached block on an executor. If there are more, repeat. 
If there are none, check
+   * with the block manager master again. If there is an error, go ahead 
and kill executor.
+   *
+   * @param execId the executor to save a block one
+   */
+  private def replicateBlocks(execId: String): Unit = {
+import scala.util.Success
+val (response, blockId) = state.replicateFirstBlock(execId)
+response.onComplete {
+  case Success(true) =>
+logTrace(s"Finished replicating block 
${blockId.getOrElse("unknown")} on exec $execId.")
--- End diff --

s/exec/executor. That is, if you even want to keep this log around...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143802667
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
+case (executorId, HasCachedBlocks) => replicateBlocks(executorId)
+case (executorId, NoMoreBlocks | NotEnoughMemory) => 
state.killExecutor(executorId)
+  }
+
+  /**
+   * Replicate one cached block on an executor. If there are more, repeat. 
If there are none, check
+   * with the block manager master again. If there is an error, go ahead 
and kill executor.
+   *
+   * @param execId the executor to save a block one
+   */
+  private def replicateBlocks(execId: String): Unit = {
+import scala.util.Success
+val (response, blockId) = state.replicateFirstBlock(execId)
+response.onComplete {
+  case Success(true) =>
+logTrace(s"Finished replicating block 
${blockId.getOrElse("unknown")} on exec $execId.")
+replicateBlocks(execId)
+  case Success(false) =>
+checkForReplicableBlocks(Seq(execId))
+  case Failure(f) =>
+logWarning(s"Error trying to replicate block 
${blockId.getOrElse("unknown")}.", f)
+state.killExecutor(execId)
+}
+  }
+}
+
+private object CacheRecoveryManager {
+  def apply(eam: ExecutorAllocationManager, conf: SparkConf): 
CacheRecoveryManager = {
+val bmme = SparkEnv.get.blockManager.master.driverEndpoint
+val state = new CacheRecoveryManagerState(bmme, eam, conf)
+new CacheRecoveryManager(state, conf)
+  }
+}
+
+/**
+ * Private class that holds state for all the executors being shutdown.
+ *
+ * @param blockManagerMasterEndpoint blockManagerMasterEndpoint
+ * @param executorAllocationManager ExecutorAllocationManager
+ * @param conf spark conf
+ */
+final private class 

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143817781
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -646,18 +648,14 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   Future.successful(true)
 }
 
-  val killExecutors: Boolean => Future[Boolean] =
-if (!executorsToKill.isEmpty) {
-  _ => doKillExecutors(executorsToKill)
-} else {
-  _ => Future.successful(false)
-}
-
-  val killResponse = 
adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
+  val killResponse = if (executorsToKill.nonEmpty) {
+adjustTotalExecutors.flatMap(_ => 
doKillExecutors(executorsToKill))(ThreadUtils.sameThread)
+  } else {
+Future.successful(false)
+  }
 
-  killResponse.flatMap(killSuccessful =>
-Future.successful (if (killSuccessful) executorsToKill else 
Seq.empty[String])
-  )(ThreadUtils.sameThread)
+  killResponse.map { successful => if (successful) executorsToKill 
else Seq.empty
--- End diff --

```
killResponse.map { successful => 
  if (successful) executorsToKill else Seq.empty
}(ThreadUtils.sameThread)
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143800800
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
--- End diff --

`: Unit =`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143813032
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
+case (executorId, HasCachedBlocks) => replicateBlocks(executorId)
+case (executorId, NoMoreBlocks | NotEnoughMemory) => 
state.killExecutor(executorId)
+  }
+
+  /**
+   * Replicate one cached block on an executor. If there are more, repeat. 
If there are none, check
+   * with the block manager master again. If there is an error, go ahead 
and kill executor.
+   *
+   * @param execId the executor to save a block one
+   */
+  private def replicateBlocks(execId: String): Unit = {
+import scala.util.Success
+val (response, blockId) = state.replicateFirstBlock(execId)
+response.onComplete {
+  case Success(true) =>
+logTrace(s"Finished replicating block 
${blockId.getOrElse("unknown")} on exec $execId.")
+replicateBlocks(execId)
+  case Success(false) =>
+checkForReplicableBlocks(Seq(execId))
+  case Failure(f) =>
+logWarning(s"Error trying to replicate block 
${blockId.getOrElse("unknown")}.", f)
+state.killExecutor(execId)
+}
+  }
+}
+
+private object CacheRecoveryManager {
+  def apply(eam: ExecutorAllocationManager, conf: SparkConf): 
CacheRecoveryManager = {
+val bmme = SparkEnv.get.blockManager.master.driverEndpoint
+val state = new CacheRecoveryManagerState(bmme, eam, conf)
+new CacheRecoveryManager(state, conf)
+  }
+}
+
+/**
+ * Private class that holds state for all the executors being shutdown.
+ *
+ * @param blockManagerMasterEndpoint blockManagerMasterEndpoint
+ * @param executorAllocationManager ExecutorAllocationManager
+ * @param conf spark conf
+ */
+final private class 

[GitHub] spark issue #19082: [SPARK-21870][SQL] Split aggregation code into small fun...

2017-10-10 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19082
  
https://github.com/apache/spark/pull/18931 is not what we want, although it 
can partially resolve some issues. Simply disabling whole-stage codegen might 
trigger regressions like Q66. The general goal is to maximize the advantage 
of both whole-stage codegen and JIT optimization. Now, the point is how to 
cut the boundaries without introducing extra IR.
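
As a plain-Scala illustration of the boundary question (schematic only, not 
the actual generated code): cutting one huge evaluation method into small 
callees keeps each method under the JIT's inlining thresholds, so the callees 
can be inlined back into the caller and the pipeline is still optimized as a 
single compilation unit.

```scala
object SplitCodegenSketch {

  // One monolithic method: if the generated bytecode grows past HotSpot's
  // limits, the JIT may refuse to compile or inline it at all.
  def evalMonolithic(row: Array[Long]): Long = {
    var acc = 0L
    var i = 0
    while (i < row.length) { acc += row(i) * (i + 1); i += 1 }
    acc
  }

  // The same work cut into small callee methods: each stays small enough to
  // be inlined, preserving a single optimized unit after JIT compilation.
  def evalSplit(row: Array[Long]): Long =
    evalPart(row, 0, row.length / 2) + evalPart(row, row.length / 2, row.length)

  private def evalPart(row: Array[Long], from: Int, until: Int): Long = {
    var acc = 0L
    var i = from
    while (i < until) { acc += row(i) * (i + 1); i += 1 }
    acc
  }
}
```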




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143814841
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -88,6 +88,8 @@ private[spark] class ExecutorAllocationManager(
 
   import ExecutorAllocationManager._
 
+  var cacheRecoveryManager: CacheRecoveryManager = _
--- End diff --

private?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143802485
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
+case (executorId, HasCachedBlocks) => replicateBlocks(executorId)
+case (executorId, NoMoreBlocks | NotEnoughMemory) => 
state.killExecutor(executorId)
+  }
+
+  /**
+   * Replicate one cached block on an executor. If there are more, repeat. 
If there are none, check
+   * with the block manager master again. If there is an error, go ahead 
and kill executor.
+   *
+   * @param execId the executor to save a block one
+   */
+  private def replicateBlocks(execId: String): Unit = {
+import scala.util.Success
+val (response, blockId) = state.replicateFirstBlock(execId)
+response.onComplete {
+  case Success(true) =>
+logTrace(s"Finished replicating block 
${blockId.getOrElse("unknown")} on exec $execId.")
+replicateBlocks(execId)
+  case Success(false) =>
+checkForReplicableBlocks(Seq(execId))
+  case Failure(f) =>
+logWarning(s"Error trying to replicate block 
${blockId.getOrElse("unknown")}.", f)
+state.killExecutor(execId)
+}
+  }
+}
+
+private object CacheRecoveryManager {
+  def apply(eam: ExecutorAllocationManager, conf: SparkConf): 
CacheRecoveryManager = {
+val bmme = SparkEnv.get.blockManager.master.driverEndpoint
+val state = new CacheRecoveryManagerState(bmme, eam, conf)
+new CacheRecoveryManager(state, conf)
+  }
+}
+
+/**
+ * Private class that holds state for all the executors being shutdown.
+ *
+ * @param blockManagerMasterEndpoint blockManagerMasterEndpoint
+ * @param executorAllocationManager ExecutorAllocationManager
+ * @param conf spark conf
+ */
+final private class 

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143820628
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
@@ -237,6 +246,43 @@ class BlockManagerMasterEndpoint(
 blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
+  private def replicateOneBlock(
+  execId: String,
+  blockId: BlockId,
+  excludeExecutors: Seq[String],
+  context: RpcCallContext): Unit = {
+logDebug(s"replicating block $blockId")
+val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+val response: Option[Future[Boolean]] = for {
+  blockManagerId <- blockManagerIdByExecutor.get(execId)
+  info <- blockManagerInfo.get(blockManagerId)
+  replicaSet <- blockLocations.asScala.get(blockId)
+  replicas = replicaSet.toSeq
+  maxReps = replicaSet.size + 2
+} yield info.slaveEndpoint.ask[Boolean](ReplicateBlock(blockId, 
replicas, excluded, maxReps))
+
+response.getOrElse(Future.successful(false)).foreach(context.reply)
+  }
+
+  private def getCachedBlocks(executorId: String): collection.Set[BlockId] 
= {
+val cachedBlocks = for {
+  blockManagerId <- blockManagerIdByExecutor.get(executorId)
+  info <- blockManagerInfo.get(blockManagerId)
+} yield info.cachedBlocks
+
+cachedBlocks.getOrElse(Set.empty)
+  }
+
+  private def getSizeOfBlocks(blockMap: Map[String, Set[RDDBlockId]]): 
Map[String, Long] = for {
--- End diff --

Put the body in `{ }` - looks kinda odd without it.
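
e.g. (a formatting sketch only; the body below is a placeholder standing in 
for the original for-comprehension):

```scala
import org.apache.spark.storage.RDDBlockId

object FormattingSketch {
  // Same for-comprehension, just wrapped in braces for readability.
  private def getSizeOfBlocks(
      blockMap: Map[String, Set[RDDBlockId]]): Map[String, Long] = {
    for {
      (executorId, blockIds) <- blockMap
    } yield executorId -> blockIds.size.toLong // placeholder body
  }
}
```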


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143812180
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
+case (executorId, HasCachedBlocks) => replicateBlocks(executorId)
+case (executorId, NoMoreBlocks | NotEnoughMemory) => 
state.killExecutor(executorId)
+  }
+
+  /**
+   * Replicate one cached block on an executor. If there are more, repeat. 
If there are none, check
+   * with the block manager master again. If there is an error, go ahead 
and kill executor.
+   *
+   * @param execId the executor to save a block one
+   */
+  private def replicateBlocks(execId: String): Unit = {
+import scala.util.Success
+val (response, blockId) = state.replicateFirstBlock(execId)
+response.onComplete {
+  case Success(true) =>
+logTrace(s"Finished replicating block 
${blockId.getOrElse("unknown")} on exec $execId.")
+replicateBlocks(execId)
+  case Success(false) =>
+checkForReplicableBlocks(Seq(execId))
+  case Failure(f) =>
+logWarning(s"Error trying to replicate block 
${blockId.getOrElse("unknown")}.", f)
+state.killExecutor(execId)
+}
+  }
+}
+
+private object CacheRecoveryManager {
+  def apply(eam: ExecutorAllocationManager, conf: SparkConf): 
CacheRecoveryManager = {
+val bmme = SparkEnv.get.blockManager.master.driverEndpoint
+val state = new CacheRecoveryManagerState(bmme, eam, conf)
+new CacheRecoveryManager(state, conf)
+  }
+}
+
+/**
+ * Private class that holds state for all the executors being shutdown.
+ *
+ * @param blockManagerMasterEndpoint blockManagerMasterEndpoint
+ * @param executorAllocationManager ExecutorAllocationManager
+ * @param conf spark conf
+ */
+final private class 

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143810812
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
+case (executorId, HasCachedBlocks) => replicateBlocks(executorId)
+case (executorId, NoMoreBlocks | NotEnoughMemory) => 
state.killExecutor(executorId)
+  }
+
+  /**
+   * Replicate one cached block on an executor. If there are more, repeat. 
If there are none, check
+   * with the block manager master again. If there is an error, go ahead 
and kill executor.
+   *
+   * @param execId the executor to save a block one
+   */
+  private def replicateBlocks(execId: String): Unit = {
+import scala.util.Success
+val (response, blockId) = state.replicateFirstBlock(execId)
+response.onComplete {
+  case Success(true) =>
+logTrace(s"Finished replicating block 
${blockId.getOrElse("unknown")} on exec $execId.")
+replicateBlocks(execId)
+  case Success(false) =>
+checkForReplicableBlocks(Seq(execId))
+  case Failure(f) =>
+logWarning(s"Error trying to replicate block 
${blockId.getOrElse("unknown")}.", f)
+state.killExecutor(execId)
+}
+  }
+}
+
+private object CacheRecoveryManager {
+  def apply(eam: ExecutorAllocationManager, conf: SparkConf): 
CacheRecoveryManager = {
+val bmme = SparkEnv.get.blockManager.master.driverEndpoint
+val state = new CacheRecoveryManagerState(bmme, eam, conf)
+new CacheRecoveryManager(state, conf)
+  }
+}
+
+/**
+ * Private class that holds state for all the executors being shutdown.
+ *
+ * @param blockManagerMasterEndpoint blockManagerMasterEndpoint
+ * @param executorAllocationManager ExecutorAllocationManager
+ * @param conf spark conf
+ */
+final private class 

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143810600
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
+case (executorId, HasCachedBlocks) => replicateBlocks(executorId)
+case (executorId, NoMoreBlocks | NotEnoughMemory) => 
state.killExecutor(executorId)
+  }
+
+  /**
+   * Replicate one cached block on an executor. If there are more, repeat. 
If there are none, check
+   * with the block manager master again. If there is an error, go ahead 
and kill executor.
+   *
+   * @param execId the executor to save a block one
+   */
+  private def replicateBlocks(execId: String): Unit = {
+import scala.util.Success
+val (response, blockId) = state.replicateFirstBlock(execId)
+response.onComplete {
+  case Success(true) =>
+logTrace(s"Finished replicating block 
${blockId.getOrElse("unknown")} on exec $execId.")
+replicateBlocks(execId)
+  case Success(false) =>
+checkForReplicableBlocks(Seq(execId))
+  case Failure(f) =>
+logWarning(s"Error trying to replicate block 
${blockId.getOrElse("unknown")}.", f)
+state.killExecutor(execId)
+}
+  }
+}
+
+private object CacheRecoveryManager {
+  def apply(eam: ExecutorAllocationManager, conf: SparkConf): 
CacheRecoveryManager = {
+val bmme = SparkEnv.get.blockManager.master.driverEndpoint
+val state = new CacheRecoveryManagerState(bmme, eam, conf)
+new CacheRecoveryManager(state, conf)
+  }
+}
+
+/**
+ * Private class that holds state for all the executors being shutdown.
+ *
+ * @param blockManagerMasterEndpoint blockManagerMasterEndpoint
+ * @param executorAllocationManager ExecutorAllocationManager
+ * @param conf spark conf
+ */
+final private class 

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143800993
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
--- End diff --

The method doesn't return the list, so its short description should more 
accurately reflect what it actually does.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143819718
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
@@ -237,6 +246,43 @@ class BlockManagerMasterEndpoint(
 blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
+  private def replicateOneBlock(
+  execId: String,
+  blockId: BlockId,
+  excludeExecutors: Seq[String],
+  context: RpcCallContext): Unit = {
+logDebug(s"replicating block $blockId")
+val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+val response: Option[Future[Boolean]] = for {
+  blockManagerId <- blockManagerIdByExecutor.get(execId)
+  info <- blockManagerInfo.get(blockManagerId)
+  replicaSet <- blockLocations.asScala.get(blockId)
+  replicas = replicaSet.toSeq
+  maxReps = replicaSet.size + 2
--- End diff --

Why `+ 2`? It would be good to add a comment explaining that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143813769
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
+case (executorId, HasCachedBlocks) => replicateBlocks(executorId)
+case (executorId, NoMoreBlocks | NotEnoughMemory) => 
state.killExecutor(executorId)
+  }
+
+  /**
+   * Replicate one cached block on an executor. If there are more, repeat. 
If there are none, check
+   * with the block manager master again. If there is an error, go ahead 
and kill executor.
+   *
+   * @param execId the executor to save a block one
+   */
+  private def replicateBlocks(execId: String): Unit = {
+import scala.util.Success
+val (response, blockId) = state.replicateFirstBlock(execId)
+response.onComplete {
+  case Success(true) =>
+logTrace(s"Finished replicating block 
${blockId.getOrElse("unknown")} on exec $execId.")
+replicateBlocks(execId)
+  case Success(false) =>
+checkForReplicableBlocks(Seq(execId))
+  case Failure(f) =>
+logWarning(s"Error trying to replicate block 
${blockId.getOrElse("unknown")}.", f)
+state.killExecutor(execId)
+}
+  }
+}
+
+private object CacheRecoveryManager {
+  def apply(eam: ExecutorAllocationManager, conf: SparkConf): 
CacheRecoveryManager = {
+val bmme = SparkEnv.get.blockManager.master.driverEndpoint
+val state = new CacheRecoveryManagerState(bmme, eam, conf)
+new CacheRecoveryManager(state, conf)
+  }
+}
+
+/**
+ * Private class that holds state for all the executors being shutdown.
+ *
+ * @param blockManagerMasterEndpoint blockManagerMasterEndpoint
+ * @param executorAllocationManager ExecutorAllocationManager
+ * @param conf spark conf
+ */
+final private class 

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143815041
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -234,12 +239,14 @@ private[spark] class ExecutorAllocationManager(
 executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, 
TimeUnit.MILLISECONDS)
 
 client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, 
hostToLocalTaskCount)
+cacheRecoveryManager = CacheRecoveryManager(this, conf)
--- End diff --

Gate on `recoverCachedData`?
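
i.e. something like this (a sketch of the suggestion; `recoverCachedData` is 
assumed to be the boolean read from the new config flag):

```scala
// Only set up cache recovery when the feature flag is enabled.
if (recoverCachedData) {
  cacheRecoveryManager = CacheRecoveryManager(this, conf)
}
```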


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143816789
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -76,6 +76,14 @@ package object config {
 .timeConf(TimeUnit.MILLISECONDS)
 .createWithDefaultString("3s")
 
+  private[spark] val DYN_ALLOCATION_RECOVER_CACHE =
+
ConfigBuilder("spark.dynamicAllocation.recoverCachedData").booleanConf.createWithDefault(false)
--- End diff --

`.cacheRecovery.enabled` to follow the pattern of other feature flags.

I also kinda prefer multiple lines in these constant declarations, for 
readability.
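
i.e. roughly this (a sketch of the suggested key name and layout, meant to 
replace the one-line declaration quoted above):

```scala
private[spark] val DYN_ALLOCATION_CACHE_RECOVERY =
  ConfigBuilder("spark.dynamicAllocation.cacheRecovery.enabled")
    .booleanConf
    .createWithDefault(false)
```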


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143818189
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
@@ -97,6 +97,10 @@ class BlockManagerMasterEndpoint(
 case GetStorageStatus =>
   context.reply(storageStatus)
 
+case GetCachedBlocks(executorId) => 
context.reply(getCachedBlocks(executorId))
--- End diff --

Code in next line to follow pattern in other cases.
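
That is, move the reply onto its own line so it matches the neighbouring cases in this match:

case GetCachedBlocks(executorId) =>
  context.reply(getCachedBlocks(executorId))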


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143802914
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
--- End diff --

s/RECOVER_CACHE/CACHE_RECOVERY


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143811908
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
+case (executorId, HasCachedBlocks) => replicateBlocks(executorId)
+case (executorId, NoMoreBlocks | NotEnoughMemory) => 
state.killExecutor(executorId)
+  }
+
+  /**
+   * Replicate one cached block on an executor. If there are more, repeat. 
If there are none, check
+   * with the block manager master again. If there is an error, go ahead 
and kill executor.
+   *
+   * @param execId the executor to save a block one
+   */
+  private def replicateBlocks(execId: String): Unit = {
+import scala.util.Success
+val (response, blockId) = state.replicateFirstBlock(execId)
+response.onComplete {
+  case Success(true) =>
+logTrace(s"Finished replicating block 
${blockId.getOrElse("unknown")} on exec $execId.")
+replicateBlocks(execId)
+  case Success(false) =>
+checkForReplicableBlocks(Seq(execId))
+  case Failure(f) =>
+logWarning(s"Error trying to replicate block 
${blockId.getOrElse("unknown")}.", f)
+state.killExecutor(execId)
+}
+  }
+}
+
+private object CacheRecoveryManager {
+  def apply(eam: ExecutorAllocationManager, conf: SparkConf): 
CacheRecoveryManager = {
+val bmme = SparkEnv.get.blockManager.master.driverEndpoint
+val state = new CacheRecoveryManagerState(bmme, eam, conf)
+new CacheRecoveryManager(state, conf)
+  }
+}
+
+/**
+ * Private class that holds state for all the executors being shutdown.
+ *
+ * @param blockManagerMasterEndpoint blockManagerMasterEndpoint
+ * @param executorAllocationManager ExecutorAllocationManager
+ * @param conf spark conf
+ */
+final private class 

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143807978
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
+case (executorId, HasCachedBlocks) => replicateBlocks(executorId)
+case (executorId, NoMoreBlocks | NotEnoughMemory) => 
state.killExecutor(executorId)
+  }
+
+  /**
+   * Replicate one cached block on an executor. If there are more, repeat. 
If there are none, check
+   * with the block manager master again. If there is an error, go ahead 
and kill executor.
+   *
+   * @param execId the executor to save a block one
+   */
+  private def replicateBlocks(execId: String): Unit = {
+import scala.util.Success
+val (response, blockId) = state.replicateFirstBlock(execId)
+response.onComplete {
+  case Success(true) =>
+logTrace(s"Finished replicating block 
${blockId.getOrElse("unknown")} on exec $execId.")
+replicateBlocks(execId)
+  case Success(false) =>
+checkForReplicableBlocks(Seq(execId))
+  case Failure(f) =>
+logWarning(s"Error trying to replicate block 
${blockId.getOrElse("unknown")}.", f)
+state.killExecutor(execId)
+}
+  }
+}
+
+private object CacheRecoveryManager {
+  def apply(eam: ExecutorAllocationManager, conf: SparkConf): 
CacheRecoveryManager = {
+val bmme = SparkEnv.get.blockManager.master.driverEndpoint
+val state = new CacheRecoveryManagerState(bmme, eam, conf)
+new CacheRecoveryManager(state, conf)
+  }
+}
+
+/**
+ * Private class that holds state for all the executors being shutdown.
+ *
+ * @param blockManagerMasterEndpoint blockManagerMasterEndpoint
+ * @param executorAllocationManager ExecutorAllocationManager
+ * @param conf spark conf
+ */
+final private class 

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143811393
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
+case (executorId, HasCachedBlocks) => replicateBlocks(executorId)
+case (executorId, NoMoreBlocks | NotEnoughMemory) => 
state.killExecutor(executorId)
+  }
+
+  /**
+   * Replicate one cached block on an executor. If there are more, repeat. 
If there are none, check
+   * with the block manager master again. If there is an error, go ahead 
and kill executor.
+   *
+   * @param execId the executor to save a block one
+   */
+  private def replicateBlocks(execId: String): Unit = {
+import scala.util.Success
+val (response, blockId) = state.replicateFirstBlock(execId)
+response.onComplete {
+  case Success(true) =>
+logTrace(s"Finished replicating block 
${blockId.getOrElse("unknown")} on exec $execId.")
+replicateBlocks(execId)
+  case Success(false) =>
+checkForReplicableBlocks(Seq(execId))
+  case Failure(f) =>
+logWarning(s"Error trying to replicate block 
${blockId.getOrElse("unknown")}.", f)
+state.killExecutor(execId)
+}
+  }
+}
+
+private object CacheRecoveryManager {
+  def apply(eam: ExecutorAllocationManager, conf: SparkConf): 
CacheRecoveryManager = {
+val bmme = SparkEnv.get.blockManager.master.driverEndpoint
+val state = new CacheRecoveryManagerState(bmme, eam, conf)
+new CacheRecoveryManager(state, conf)
+  }
+}
+
+/**
+ * Private class that holds state for all the executors being shutdown.
+ *
+ * @param blockManagerMasterEndpoint blockManagerMasterEndpoint
+ * @param executorAllocationManager ExecutorAllocationManager
+ * @param conf spark conf
+ */
+final private class 

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143816598
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -416,63 +423,52 @@ private[spark] class ExecutorAllocationManager(
* Request the cluster manager to remove the given executors.
* Returns the list of executors which are removed.
*/
-  private def removeExecutors(executors: Seq[String]): Seq[String] = 
synchronized {
-val executorIdsToBeRemoved = new ArrayBuffer[String]
-
+  private def removeExecutors(executors: Seq[String]): Unit = synchronized 
{
 logInfo("Request to remove executorIds: " + executors.mkString(", "))
-val numExistingExecutors = allocationManager.executorIds.size - 
executorsPendingToRemove.size
-
-var newExecutorTotal = numExistingExecutors
-executors.foreach { executorIdToBeRemoved =>
-  if (newExecutorTotal - 1 < minNumExecutors) {
-logDebug(s"Not removing idle executor $executorIdToBeRemoved 
because there are only " +
-  s"$newExecutorTotal executor(s) left (minimum number of executor 
limit $minNumExecutors)")
-  } else if (newExecutorTotal - 1 < numExecutorsTarget) {
-logDebug(s"Not removing idle executor $executorIdToBeRemoved 
because there are only " +
-  s"$newExecutorTotal executor(s) left (number of executor target 
$numExecutorsTarget)")
-  } else if (canBeKilled(executorIdToBeRemoved)) {
-executorIdsToBeRemoved += executorIdToBeRemoved
-newExecutorTotal -= 1
-  }
-}
+val numExistingExecs = allocationManager.executorIds.size - 
executorsPendingToRemove.size
+val execCountFloor = math.max(minNumExecutors, numExecutorsTarget)
+val (executorIdsToBeRemoved, dontRemove) = executors
+  .filter(canBeKilled)
+  .splitAt(numExistingExecs - execCountFloor)
 
-if (executorIdsToBeRemoved.isEmpty) {
-  return Seq.empty[String]
+dontRemove.foreach { execId =>
+  logDebug(s"Not removing idle executor $execId because it " +
+s"would put us below the minimum limit of $minNumExecutors 
executors" +
+s"or number of target executors $numExecutorsTarget")
 }
 
-// Send a request to the backend to kill this executor(s)
-val executorsRemoved = if (testing) {
-  executorIdsToBeRemoved
+if (executorIdsToBeRemoved.isEmpty) {
+  Seq.empty[String]
+} else if (testing) {
+  recordExecutorKill(executorIdsToBeRemoved)
+} else if (recoverCachedData) {
+  logDebug(s"Starting replicate process for $executorIdsToBeRemoved")
+  client.markPendingToRemove(executorIdsToBeRemoved)
+  recordExecutorKill(executorIdsToBeRemoved)
+  cacheRecoveryManager.startExecutorKill(executorIdsToBeRemoved)
 } else {
-  client.killExecutors(executorIdsToBeRemoved)
+  val killed = killExecutors(executorIdsToBeRemoved)
+  recordExecutorKill(killed)
 }
-// [SPARK-21834] killExecutors api reduces the target number of 
executors.
-// So we need to update the target with desired value.
-client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, 
hostToLocalTaskCount)
-// reset the newExecutorTotal to the existing number of executors
-newExecutorTotal = numExistingExecutors
-if (testing || executorsRemoved.nonEmpty) {
-  executorsRemoved.foreach { removedExecutorId =>
-newExecutorTotal -= 1
-logInfo(s"Removing executor $removedExecutorId because it has been 
idle for " +
-  s"$executorIdleTimeoutS seconds (new desired total will be 
$newExecutorTotal)")
-executorsPendingToRemove.add(removedExecutorId)
-  }
-  executorsRemoved
-} else {
+  }
+
+  def killExecutors(executorIds: Seq[String], forceIfPending: Boolean = 
false): Seq[String] = {
+logDebug(s"Starting kill process for $executorIds")
+val result = client.killExecutors(executorIds, forceIfPending = 
forceIfPending)
+if (result.isEmpty) {
   logWarning(s"Unable to reach the cluster manager to kill executor/s 
" +
-s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible 
to kill!")
-  Seq.empty[String]
+s"${executorIds.mkString(",")} or no executor eligible to kill!")
 }
+result
   }
 
-  /**
-   * Request the cluster manager to remove the given executor.
-   * Return whether the request is acknowledged.
-   */
-  private def removeExecutor(executorId: String): Boolean = synchronized {
-val executorsRemoved = removeExecutors(Seq(executorId))
-executorsRemoved.nonEmpty && executorsRemoved(0) == 

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143810176
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
+case (executorId, HasCachedBlocks) => replicateBlocks(executorId)
+case (executorId, NoMoreBlocks | NotEnoughMemory) => 
state.killExecutor(executorId)
+  }
+
+  /**
+   * Replicate one cached block on an executor. If there are more, repeat. 
If there are none, check
+   * with the block manager master again. If there is an error, go ahead 
and kill executor.
+   *
+   * @param execId the executor to save a block one
+   */
+  private def replicateBlocks(execId: String): Unit = {
+import scala.util.Success
+val (response, blockId) = state.replicateFirstBlock(execId)
+response.onComplete {
+  case Success(true) =>
+logTrace(s"Finished replicating block 
${blockId.getOrElse("unknown")} on exec $execId.")
+replicateBlocks(execId)
+  case Success(false) =>
+checkForReplicableBlocks(Seq(execId))
+  case Failure(f) =>
+logWarning(s"Error trying to replicate block 
${blockId.getOrElse("unknown")}.", f)
+state.killExecutor(execId)
+}
+  }
+}
+
+private object CacheRecoveryManager {
+  def apply(eam: ExecutorAllocationManager, conf: SparkConf): 
CacheRecoveryManager = {
+val bmme = SparkEnv.get.blockManager.master.driverEndpoint
+val state = new CacheRecoveryManagerState(bmme, eam, conf)
+new CacheRecoveryManager(state, conf)
+  }
+}
+
+/**
+ * Private class that holds state for all the executors being shutdown.
+ *
+ * @param blockManagerMasterEndpoint blockManagerMasterEndpoint
+ * @param executorAllocationManager ExecutorAllocationManager
+ * @param conf spark conf
+ */
+final private class 

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143813871
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
+case (executorId, HasCachedBlocks) => replicateBlocks(executorId)
+case (executorId, NoMoreBlocks | NotEnoughMemory) => 
state.killExecutor(executorId)
+  }
+
+  /**
+   * Replicate one cached block on an executor. If there are more, repeat. 
If there are none, check
+   * with the block manager master again. If there is an error, go ahead 
and kill executor.
+   *
+   * @param execId the executor to save a block one
+   */
+  private def replicateBlocks(execId: String): Unit = {
+import scala.util.Success
+val (response, blockId) = state.replicateFirstBlock(execId)
+response.onComplete {
+  case Success(true) =>
+logTrace(s"Finished replicating block 
${blockId.getOrElse("unknown")} on exec $execId.")
+replicateBlocks(execId)
+  case Success(false) =>
+checkForReplicableBlocks(Seq(execId))
+  case Failure(f) =>
+logWarning(s"Error trying to replicate block 
${blockId.getOrElse("unknown")}.", f)
+state.killExecutor(execId)
+}
+  }
+}
+
+private object CacheRecoveryManager {
+  def apply(eam: ExecutorAllocationManager, conf: SparkConf): 
CacheRecoveryManager = {
+val bmme = SparkEnv.get.blockManager.master.driverEndpoint
+val state = new CacheRecoveryManagerState(bmme, eam, conf)
+new CacheRecoveryManager(state, conf)
+  }
+}
+
+/**
+ * Private class that holds state for all the executors being shutdown.
+ *
+ * @param blockManagerMasterEndpoint blockManagerMasterEndpoint
+ * @param executorAllocationManager ExecutorAllocationManager
+ * @param conf spark conf
+ */
+final private class 

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143814023
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
+case (executorId, HasCachedBlocks) => replicateBlocks(executorId)
+case (executorId, NoMoreBlocks | NotEnoughMemory) => 
state.killExecutor(executorId)
+  }
+
+  /**
+   * Replicate one cached block on an executor. If there are more, repeat. 
If there are none, check
+   * with the block manager master again. If there is an error, go ahead 
and kill executor.
+   *
+   * @param execId the executor to save a block one
+   */
+  private def replicateBlocks(execId: String): Unit = {
+import scala.util.Success
+val (response, blockId) = state.replicateFirstBlock(execId)
+response.onComplete {
+  case Success(true) =>
+logTrace(s"Finished replicating block 
${blockId.getOrElse("unknown")} on exec $execId.")
+replicateBlocks(execId)
+  case Success(false) =>
+checkForReplicableBlocks(Seq(execId))
+  case Failure(f) =>
+logWarning(s"Error trying to replicate block 
${blockId.getOrElse("unknown")}.", f)
+state.killExecutor(execId)
+}
+  }
+}
+
+private object CacheRecoveryManager {
+  def apply(eam: ExecutorAllocationManager, conf: SparkConf): 
CacheRecoveryManager = {
+val bmme = SparkEnv.get.blockManager.master.driverEndpoint
+val state = new CacheRecoveryManagerState(bmme, eam, conf)
+new CacheRecoveryManager(state, conf)
+  }
+}
+
+/**
+ * Private class that holds state for all the executors being shutdown.
+ *
+ * @param blockManagerMasterEndpoint blockManagerMasterEndpoint
+ * @param executorAllocationManager ExecutorAllocationManager
+ * @param conf spark conf
+ */
+final private class 

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143818025
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -698,6 +696,11 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   None
 }
   }
+
+  override def markPendingToRemove(executorIds: Seq[String]): Unit = 
synchronized {
+logDebug(s"marking $executorIds pending to remove")
+executorIds.foreach(id => executorsPendingToRemove.put(id, true))
--- End diff --

`.foreach { id => ... }`
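
In other words, the brace form rather than parentheses around the closure:

executorIds.foreach { id =>
  executorsPendingToRemove.put(id, true)
}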


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143817365
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -646,18 +648,14 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   Future.successful(true)
 }
 
-  val killExecutors: Boolean => Future[Boolean] =
-if (!executorsToKill.isEmpty) {
-  _ => doKillExecutors(executorsToKill)
-} else {
-  _ => Future.successful(false)
-}
-
-  val killResponse = 
adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
+  val killResponse = if (executorsToKill.nonEmpty) {
+adjustTotalExecutors.flatMap(_ => 
doKillExecutors(executorsToKill))(ThreadUtils.sameThread)
--- End diff --

`.flatMap { _ => ... }`
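
So the call would read, keeping the explicit executor in the second parameter list:

adjustTotalExecutors.flatMap { _ =>
  doKillExecutors(executorsToKill)
}(ThreadUtils.sameThread)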


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143817162
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -601,40 +602,41 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
*
* @param executorIds identifiers of executors to kill
* @param replace whether to replace the killed executors with new ones, 
default false
-   * @param force whether to force kill busy executors, default false
+   * @param forceIfBusy whether to force kill busy executors, default false
+   * @param forceIfPending whether to force kill executors that are 
pending to die, default false
* @return the ids of the executors acknowledged by the cluster manager 
to be removed.
*/
   final override def killExecutors(
   executorIds: Seq[String],
   replace: Boolean,
-  force: Boolean): Seq[String] = {
+  forceIfBusy: Boolean,
+  forceIfPending: Boolean): Seq[String] = {
 logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", 
")}")
 
-val response = synchronized {
+val response: Future[Seq[String]] = synchronized {
   val (knownExecutors, unknownExecutors) = 
executorIds.partition(executorDataMap.contains)
-  unknownExecutors.foreach { id =>
-logWarning(s"Executor to kill $id does not exist!")
-  }
+  unknownExecutors.foreach(id => logWarning(s"Executor to kill $id 
does not exist!"))
 
   // If an executor is already pending to be removed, do not kill it 
again (SPARK-9795)
   // If this executor is busy, do not kill it unless we are told to 
force kill it (SPARK-9552)
   val executorsToKill = knownExecutors
-.filter { id => !executorsPendingToRemove.contains(id) }
-.filter { id => force || !scheduler.isExecutorBusy(id) }
-  executorsToKill.foreach { id => executorsPendingToRemove(id) = 
!replace }
+.filter { id => forceIfPending || 
!executorsPendingToRemove.contains(id) }
+.filter { id => forceIfBusy || !scheduler.isExecutorBusy(id) }
+  executorsToKill.foreach(id => executorsPendingToRemove(id) = 
!replace)
--- End diff --

Previous style was correct.
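
i.e. keep the brace style from the left-hand side of the diff:

executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }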


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143816092
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
--- End diff --

Probably better called `startCacheRecovery`.
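
The rename would only touch the signature and its call site in ExecutorAllocationManager, with the body unchanged:

def startCacheRecovery(execIds: Seq[String]): Unit = {
  logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.")
  checkForReplicableBlocks(execIds)
}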


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143819863
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
@@ -237,6 +246,43 @@ class BlockManagerMasterEndpoint(
 blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
+  private def replicateOneBlock(
+  execId: String,
+  blockId: BlockId,
+  excludeExecutors: Seq[String],
+  context: RpcCallContext): Unit = {
+logDebug(s"replicating block $blockId")
+val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+val response: Option[Future[Boolean]] = for {
+  blockManagerId <- blockManagerIdByExecutor.get(execId)
+  info <- blockManagerInfo.get(blockManagerId)
+  replicaSet <- blockLocations.asScala.get(blockId)
+  replicas = replicaSet.toSeq
+  maxReps = replicaSet.size + 2
+} yield info.slaveEndpoint.ask[Boolean](ReplicateBlock(blockId, 
replicas, excluded, maxReps))
+
+response.getOrElse(Future.successful(false)).foreach(context.reply)
+  }
+
+  private def getCachedBlocks(executorId: String): collection.Set[BlockId] 
= {
--- End diff --

just `Set[BlockId]`?
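
One minimal way to honour that, assuming the method builds up a mutable set internally (the body below is only a hypothetical shape, the real lookup lives in this endpoint), is to convert before returning so the signature can be the plain immutable `Set`:

private def getCachedBlocks(executorId: String): Set[BlockId] = {
  val blocks = scala.collection.mutable.Set.empty[BlockId]  // populated from blockManagerInfo (elided here)
  blocks.toSet                                              // expose the immutable type to callers
}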


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143802830
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
+case (executorId, HasCachedBlocks) => replicateBlocks(executorId)
+case (executorId, NoMoreBlocks | NotEnoughMemory) => 
state.killExecutor(executorId)
+  }
+
+  /**
+   * Replicate one cached block on an executor. If there are more, repeat. 
If there are none, check
+   * with the block manager master again. If there is an error, go ahead 
and kill executor.
+   *
+   * @param execId the executor to save a block one
+   */
+  private def replicateBlocks(execId: String): Unit = {
+import scala.util.Success
+val (response, blockId) = state.replicateFirstBlock(execId)
+response.onComplete {
+  case Success(true) =>
+logTrace(s"Finished replicating block 
${blockId.getOrElse("unknown")} on exec $execId.")
+replicateBlocks(execId)
+  case Success(false) =>
+checkForReplicableBlocks(Seq(execId))
+  case Failure(f) =>
+logWarning(s"Error trying to replicate block 
${blockId.getOrElse("unknown")}.", f)
+state.killExecutor(execId)
+}
+  }
+}
+
+private object CacheRecoveryManager {
+  def apply(eam: ExecutorAllocationManager, conf: SparkConf): 
CacheRecoveryManager = {
+val bmme = SparkEnv.get.blockManager.master.driverEndpoint
+val state = new CacheRecoveryManagerState(bmme, eam, conf)
+new CacheRecoveryManager(state, conf)
+  }
+}
+
+/**
+ * Private class that holds state for all the executors being shutdown.
+ *
+ * @param blockManagerMasterEndpoint blockManagerMasterEndpoint
+ * @param executorAllocationManager ExecutorAllocationManager
+ * @param conf spark conf
+ */
+final private class 

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143803799
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
+case (executorId, HasCachedBlocks) => replicateBlocks(executorId)
+case (executorId, NoMoreBlocks | NotEnoughMemory) => 
state.killExecutor(executorId)
+  }
+
+  /**
+   * Replicate one cached block on an executor. If there are more, repeat. 
If there are none, check
+   * with the block manager master again. If there is an error, go ahead 
and kill executor.
+   *
+   * @param execId the executor to save a block one
+   */
+  private def replicateBlocks(execId: String): Unit = {
+import scala.util.Success
+val (response, blockId) = state.replicateFirstBlock(execId)
+response.onComplete {
+  case Success(true) =>
+logTrace(s"Finished replicating block 
${blockId.getOrElse("unknown")} on exec $execId.")
+replicateBlocks(execId)
+  case Success(false) =>
+checkForReplicableBlocks(Seq(execId))
+  case Failure(f) =>
+logWarning(s"Error trying to replicate block 
${blockId.getOrElse("unknown")}.", f)
+state.killExecutor(execId)
+}
+  }
+}
+
+private object CacheRecoveryManager {
+  def apply(eam: ExecutorAllocationManager, conf: SparkConf): 
CacheRecoveryManager = {
+val bmme = SparkEnv.get.blockManager.master.driverEndpoint
+val state = new CacheRecoveryManagerState(bmme, eam, conf)
+new CacheRecoveryManager(state, conf)
+  }
+}
+
+/**
+ * Private class that holds state for all the executors being shutdown.
+ *
+ * @param blockManagerMasterEndpoint blockManagerMasterEndpoint
+ * @param executorAllocationManager ExecutorAllocationManager
+ * @param conf spark conf
+ */
+final private class 

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143801381
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+ExecutionContext.fromExecutorService(threadPool)
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startExecutorKill(execIds: Seq[String]): Unit = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+checkForReplicableBlocks(execIds)
+  }
+
+  /**
+   * Stops all thread pools
+   *
+   * @return
+   */
+  def stop(): java.util.List[Runnable] = {
+threadPool.shutdownNow()
+state.stop()
+  }
+
+  /**
+   * Get list of cached blocks from BlockManagerMaster. If there are 
cached blocks, replicate them,
+   * otherwise kill the executors
+   *
+   * @param execIds the executors to check
+   */
+  private def checkForReplicableBlocks(execIds: Seq[String]) = 
state.getBlocks(execIds).foreach {
+case (executorId, HasCachedBlocks) => replicateBlocks(executorId)
+case (executorId, NoMoreBlocks | NotEnoughMemory) => 
state.killExecutor(executorId)
+  }
+
+  /**
+   * Replicate one cached block on an executor. If there are more, repeat. 
If there are none, check
+   * with the block manager master again. If there is an error, go ahead 
and kill executor.
+   *
+   * @param execId the executor to save a block one
+   */
+  private def replicateBlocks(execId: String): Unit = {
+import scala.util.Success
--- End diff --

Why isn't this imported at the top? (And you know scala allows you to 
rename types on import, in case there's a conflict, right?)
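
For reference, both points can be handled at the top of the file; the rename form is only needed if `Success` collided with another name in scope:

import scala.util.{Failure, Success}
// or, if `Success` clashed with another type in scope:
import scala.util.{Failure, Success => TrySuccess}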


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143799387
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, 
GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached 
blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+state: CacheRecoveryManagerState,
+conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
--- End diff --

Same thing as before with the weird name. "cache-recovery-manager-pool" is 
a much better name.
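
i.e.:

private val threadPool =
  ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool")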


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143816714
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -76,6 +76,14 @@ package object config {
 .timeConf(TimeUnit.MILLISECONDS)
 .createWithDefaultString("3s")
 
+  private[spark] val DYN_ALLOCATION_RECOVER_CACHE =
--- End diff --

as with other suggestion, `CACHE_RECOVERY`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143815711
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -416,63 +423,52 @@ private[spark] class ExecutorAllocationManager(
* Request the cluster manager to remove the given executors.
* Returns the list of executors which are removed.
*/
-  private def removeExecutors(executors: Seq[String]): Seq[String] = 
synchronized {
-val executorIdsToBeRemoved = new ArrayBuffer[String]
-
+  private def removeExecutors(executors: Seq[String]): Unit = synchronized 
{
 logInfo("Request to remove executorIds: " + executors.mkString(", "))
-val numExistingExecutors = allocationManager.executorIds.size - 
executorsPendingToRemove.size
-
-var newExecutorTotal = numExistingExecutors
-executors.foreach { executorIdToBeRemoved =>
-  if (newExecutorTotal - 1 < minNumExecutors) {
-logDebug(s"Not removing idle executor $executorIdToBeRemoved 
because there are only " +
-  s"$newExecutorTotal executor(s) left (minimum number of executor 
limit $minNumExecutors)")
-  } else if (newExecutorTotal - 1 < numExecutorsTarget) {
-logDebug(s"Not removing idle executor $executorIdToBeRemoved 
because there are only " +
-  s"$newExecutorTotal executor(s) left (number of executor target 
$numExecutorsTarget)")
-  } else if (canBeKilled(executorIdToBeRemoved)) {
-executorIdsToBeRemoved += executorIdToBeRemoved
-newExecutorTotal -= 1
-  }
-}
+val numExistingExecs = allocationManager.executorIds.size - 
executorsPendingToRemove.size
+val execCountFloor = math.max(minNumExecutors, numExecutorsTarget)
+val (executorIdsToBeRemoved, dontRemove) = executors
+  .filter(canBeKilled)
+  .splitAt(numExistingExecs - execCountFloor)
 
-if (executorIdsToBeRemoved.isEmpty) {
-  return Seq.empty[String]
+dontRemove.foreach { execId =>
--- End diff --

minor: add `log.isDebugEnabled` check.
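
That is, skip the loop and the string interpolation entirely when debug logging is off; a sketch:

if (log.isDebugEnabled) {
  dontRemove.foreach { execId =>
    logDebug(s"Not removing idle executor $execId because it would put us below " +
      s"the minimum number of executors ($minNumExecutors) or the target ($numExecutorsTarget)")
  }
}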


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


