[GitHub] spark issue #18966: [SPARK-21751][SQL] CodeGeneraor.splitExpressions counts ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18966 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82598/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18966: [SPARK-21751][SQL] CodeGeneraor.splitExpressions counts ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18966 Merged build finished. Test PASSed.
[GitHub] spark issue #18966: [SPARK-21751][SQL] CodeGeneraor.splitExpressions counts ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18966 **[Test build #82598 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82598/testReport)** for PR 18966 at commit [`516a72a`](https://github.com/apache/spark/commit/516a72a62cb579f2952c4b776afec0dc1826e590). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user MrBago commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r143853274 --- Diff: python/pyspark/ml/image.py --- @@ -0,0 +1,133 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import pyspark +from pyspark import SparkContext +from pyspark.sql.types import * +from pyspark.sql.types import Row, _create_row +from pyspark.sql import DataFrame +from pyspark.ml.param.shared import * +import numpy as np + +undefinedImageType = "Undefined" + +ImageFields = ["origin", "height", "width", "nChannels", "mode", "data"] + +ocvTypes = { +undefinedImageType: -1, +"CV_8U": 0, "CV_8UC1": 0, "CV_8UC2": 8, "CV_8UC3": 16, "CV_8UC4": 24, +"CV_8S": 1, "CV_8SC1": 1, "CV_8SC2": 9, "CV_8SC3": 17, "CV_8SC4": 25, +"CV_16U": 2, "CV_16UC1": 2, "CV_16UC2": 10, "CV_16UC3": 18, "CV_16UC4": 26, +"CV_16S": 3, "CV_16SC1": 3, "CV_16SC2": 11, "CV_16SC3": 19, "CV_16SC4": 27, +"CV_32S": 4, "CV_32SC1": 4, "CV_32SC2": 12, "CV_32SC3": 20, "CV_32SC4": 28, +"CV_32F": 5, "CV_32FC1": 5, "CV_32FC2": 13, "CV_32FC3": 21, "CV_32FC4": 29, +"CV_64F": 6, "CV_64FC1": 6, "CV_64FC2": 14, "CV_64FC3": 22, "CV_64FC4": 30 +} + +ImageSchema = StructType([ +StructField(ImageFields[0], StringType(), True), +StructField(ImageFields[1], 
IntegerType(), False), +StructField(ImageFields[2], IntegerType(), False), +StructField(ImageFields[3], IntegerType(), False), +# OpenCV-compatible type: CV_8UC3 in most cases +StructField(ImageFields[4], StringType(), False), +# bytes in OpenCV-compatible order: row-wise BGR in most cases +StructField(ImageFields[5], BinaryType(), False)]) + + +# TODO: generalize to other datatypes and number of channels +def toNDArray(image): +""" +Converts an image to a 1-dimensional array + +Args: +image (object): The image to be converted + +Returns: +array: The image as a 1-dimensional array + +.. versionadded:: 2.3.0 +""" +height = image.height +width = image.width +return np.asarray(image.data, dtype=np.uint8) \ + .reshape((height, width, 3))[:, :, (2, 1, 0)] + + +# TODO: generalize to other datatypes and number of channels +def toImage(array, origin="", mode="CV_8UC3"): +""" + +Converts a one-dimensional array to a 2 dimensional image + +Args: +array (array): +origin (str): +mode (int): + +Returns: +object: 2 dimensional image + +.. versionadded:: 2.3.0 +""" +length = np.prod(array.shape) + +data = bytearray(array.astype(dtype=np.int8)[:, :, (2, 1, 0)] + .reshape(length)) +height = array.shape[0] +width = array.shape[1] +nChannels = array.shape[2] +# Creating new Row with _create_row(), because Row(name = value, ... ) --- End diff -- @holdenk I believe the schema ordered by name works in general. There is a serialization bug that I'm aware of, which I filed here: https://issues.apache.org/jira/browse/SPARK-22232, but I hope we can fix that for 2.3 (I can help with that).
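The `toNDArray` helper in the diff reshapes the flat `data` bytes to `(height, width, 3)` and reverses the channel axis with `(2, 1, 0)`, turning OpenCV's row-wise BGR order into RGB. A minimal pure-Python sketch of that byte layout, assuming an 8-bit, 3-channel image; `bgr_bytes_to_rgb_pixels` and `rgb_pixels_to_bgr_bytes` are hypothetical names, not part of `pyspark.ml.image`:

```python
def bgr_bytes_to_rgb_pixels(data, height, width):
    """Convert row-wise BGR bytes (3 channels, 8 bits each) into rows of RGB tuples.

    Hypothetical helper mirroring reshape((height, width, 3))[:, :, (2, 1, 0)]
    without numpy.
    """
    n_channels = 3
    expected = height * width * n_channels
    if len(data) != expected:
        raise ValueError("expected %d bytes, got %d" % (expected, len(data)))
    rows = []
    for r in range(height):
        row = []
        for c in range(width):
            # Pixels are stored row by row, each as consecutive B, G, R bytes.
            offset = (r * width + c) * n_channels
            b, g, red = data[offset], data[offset + 1], data[offset + 2]
            row.append((red, g, b))
        rows.append(row)
    return rows


def rgb_pixels_to_bgr_bytes(rows):
    """Inverse direction: flatten rows of RGB tuples back to row-wise BGR bytes."""
    out = bytearray()
    for row in rows:
        for red, g, b in row:
            out.extend((b, g, red))
    return bytes(out)


# A 1x2 image: a pure-blue pixel followed by a pure-red pixel, in BGR order.
bgr = bytes([255, 0, 0, 0, 0, 255])
print(bgr_bytes_to_rgb_pixels(bgr, 1, 2))
```

Swapping the channels twice is the identity, so the two helpers round-trip; the numpy version in the diff gets the same effect with one fancy-indexing step instead of Python loops.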
[GitHub] spark issue #6751: [SPARK-8300] DataFrame hint for broadcast join.
Github user sridharsubramanian62 commented on the issue: https://github.com/apache/spark/pull/6751 It's available from Spark 2.2.0. On Tuesday, October 10, 2017, 1:46:27 PM PDT, Reynold Xin wrote: Isn't the hint available in SQL?
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r143850792 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -148,18 +149,30 @@ private[kinesis] class KinesisReceiver[T]( kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId) val kinesisProvider = kinesisCreds.provider -val kinesisClientLibConfiguration = new KinesisClientLibConfiguration( - checkpointAppName, - streamName, - kinesisProvider, - dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider), - cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider), - workerId) + +val kinesisClientLibConfiguration = { + val baseClientLibConfiguration = new KinesisClientLibConfiguration( +checkpointAppName, +streamName, +kinesisProvider, +dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider), +cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider), +workerId) .withKinesisEndpoint(endpointUrl) -.withInitialPositionInStream(initialPositionInStream) + .withInitialPositionInStream(initialPosition.initialPositionInStream) .withTaskBackoffTimeMillis(500) .withRegionName(regionName) + // Update the Kinesis client lib config with timestamp + // if InitialPositionInStream.AT_TIMESTAMP is passed + initialPosition match { +case atTimestamp: AtTimestamp => --- End diff -- nit:
```scala
initialPosition match {
  case AtTimestamp(ts) =>
    baseClientLibConfiguration.withTimestampAtInitialPositionInStream(ts)
  ...
}
```
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r143850553 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala --- @@ -182,14 +181,14 @@ object KinesisInputDStream { /** * Sets the initial position data is read from in the Kinesis stream. Defaults to - * [[InitialPositionInStream.LATEST]] if no custom value is specified. + * [[InitialPosition.latest]] if no custom value is specified. * - * @param initialPosition InitialPositionInStream value specifying where Spark Streaming + * @param initialPosition [[InitialPosition]] value specifying where Spark Streaming *will start reading records in the Kinesis stream from * @return Reference to this [[KinesisInputDStream.Builder]] */ -def initialPositionInStream(initialPosition: InitialPositionInStream): Builder = { --- End diff -- Don't remove this API, since that would break compatibility. Instead, add a separate `withTimestamp` API. In the end, if we see that `withTimestamp` has been set but the initial position isn't `AtTimestamp`, we throw an error. Likewise, if `AtTimestamp` is set but no timestamp has been provided, we also throw an error.
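The validation rule proposed above can be sketched as follows. This is a language-neutral illustration written in Python, not the Scala `KinesisInputDStream.Builder`; `BuilderSketch`, its method names, and the string constants standing in for `InitialPositionInStream` values are all hypothetical:

```python
from datetime import datetime

# Hypothetical string stand-ins for the KCL InitialPositionInStream values.
LATEST = "LATEST"
TRIM_HORIZON = "TRIM_HORIZON"
AT_TIMESTAMP = "AT_TIMESTAMP"


class BuilderSketch:
    """Illustrates the review's rule; not the real KinesisInputDStream.Builder."""

    def __init__(self):
        self._initial_position = LATEST  # documented default
        self._timestamp = None

    def initial_position_in_stream(self, position):
        # Existing setter, kept unchanged for compatibility.
        self._initial_position = position
        return self

    def with_timestamp(self, timestamp):
        # New, purely additive setter for the AT_TIMESTAMP case.
        self._timestamp = timestamp
        return self

    def build(self):
        # Validate once at build time, so the setters may be called in any order.
        if self._timestamp is not None and self._initial_position != AT_TIMESTAMP:
            raise ValueError(
                "timestamp was set but initial position is %s" % self._initial_position)
        if self._initial_position == AT_TIMESTAMP and self._timestamp is None:
            raise ValueError("AT_TIMESTAMP requires a timestamp")
        return self._initial_position, self._timestamp


# A consistent configuration builds; an inconsistent one fails fast in build().
ts = datetime(2017, 10, 10)
print(BuilderSketch().initial_position_in_stream(AT_TIMESTAMP).with_timestamp(ts).build())
```

Keeping `initial_position_in_stream` unchanged and adding the timestamp through a separate setter makes the change purely additive, which is the compatibility point raised in the review.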
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r143849596 --- Diff: external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPosition.java --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis; + +import java.util.Date; + +/** + * A java wrapper for org.apache.spark.streaming.kinesis.InitialPosition + * to expose the corresponding scala objects for InitialPositionInStream. + * The functions are intentionally Upper cased to appear like classes for + * usage in Java classes. + */ +public class KinesisInitialPosition { + +/** + * Returns instance of AtTimestamp with InitialPositionInStream.LATEST. + * @return + */ +public static InitialPosition Latest() { +return InitialPosition$.MODULE$.latest(); --- End diff -- Can you instead return `Latest$.MODULE$`?
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r143850857 --- Diff: external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java --- @@ -45,15 +43,15 @@ public void testJavaKinesisDStreamBuilder() { .streamName(streamName) .endpointUrl(endpointUrl) .regionName(region) - .initialPositionInStream(initialPosition) --- End diff -- ditto on API change
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r143849691 --- Diff: external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPosition.java --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis; + +import java.util.Date; + +/** + * A java wrapper for org.apache.spark.streaming.kinesis.InitialPosition + * to expose the corresponding scala objects for InitialPositionInStream. + * The functions are intentionally Upper cased to appear like classes for + * usage in Java classes. + */ +public class KinesisInitialPosition { + +/** + * Returns instance of AtTimestamp with InitialPositionInStream.LATEST. + * @return + */ +public static InitialPosition Latest() { +return InitialPosition$.MODULE$.latest(); +} + +/** + * Returns instance of AtTimestamp with InitialPositionInStream.TRIM_HORIZON. + * @return + */ +public static InitialPosition TrimHorizon() { +return InitialPosition$.MODULE$.trimHorizon(); +} + +/** + * Returns instance of AtTimestamp with InitialPositionInStream.AT_TIMESTAMP. 
+ * @param timestamp + * @return + */ +public static InitialPosition AtTimestamp(Date timestamp) { +return InitialPosition$.MODULE$.atTimestamp(timestamp); --- End diff -- `AtTimestamp.apply(timestamp)`
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r143849634 --- Diff: external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPosition.java --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis; + +import java.util.Date; + +/** + * A java wrapper for org.apache.spark.streaming.kinesis.InitialPosition + * to expose the corresponding scala objects for InitialPositionInStream. + * The functions are intentionally Upper cased to appear like classes for + * usage in Java classes. + */ +public class KinesisInitialPosition { + +/** + * Returns instance of AtTimestamp with InitialPositionInStream.LATEST. + * @return + */ +public static InitialPosition Latest() { +return InitialPosition$.MODULE$.latest(); +} + +/** + * Returns instance of AtTimestamp with InitialPositionInStream.TRIM_HORIZON. 
+ * @return + */ +public static InitialPosition TrimHorizon() { +return InitialPosition$.MODULE$.trimHorizon(); --- End diff -- Can you instead return `TrimHorizon$.MODULE$`?
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r143849442 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. + */ +case object TrimHorizon extends InitialPosition { + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. 
+ */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * An instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + val latest: InitialPosition = Latest + + /** + * An instance of Latest with InitialPositionInStream.TRIM_HORIZON. + * @return [[TrimHorizon]] + */ + val trimHorizon: InitialPosition = TrimHorizon --- End diff -- We don't need both the Scala API for this anymore. We can just use the Java objects directly.
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r143850590 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala --- @@ -182,14 +181,14 @@ object KinesisInputDStream { /** * Sets the initial position data is read from in the Kinesis stream. Defaults to - * [[InitialPositionInStream.LATEST]] if no custom value is specified. + * [[InitialPosition.latest]] if no custom value is specified. --- End diff -- don't change docs
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19250 Merged build finished. Test PASSed.
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19250 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82595/ Test PASSed.
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19181
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19250 **[Test build #82595 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82595/testReport)** for PR 19250 at commit [`5607160`](https://github.com/apache/spark/commit/5607160afaf0f5ecd93fa59b97549bec937991b4). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/19181 Merging to master. Thanks! Can you create a backport for Spark-2.2?
[GitHub] spark issue #18966: [SPARK-21751][SQL] CodeGeneraor.splitExpressions counts ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18966 Merged build finished. Test PASSed.
[GitHub] spark issue #18966: [SPARK-21751][SQL] CodeGeneraor.splitExpressions counts ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18966 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82596/ Test PASSed.
[GitHub] spark issue #18966: [SPARK-21751][SQL] CodeGeneraor.splitExpressions counts ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18966 **[Test build #82596 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82596/testReport)** for PR 18966 at commit [`4c47802`](https://github.com/apache/spark/commit/4c4780207afcc2d55d19bf8d3e9fc29812f07ae8). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19181 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82594/ Test PASSed.
[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19181 Merged build finished. Test PASSed.
[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19181 **[Test build #82594 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82594/testReport)** for PR 19181 at commit [`6b901ee`](https://github.com/apache/spark/commit/6b901eea4dd7aace3e6a47b333c0dfa815b031db). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #6751: [SPARK-8300] DataFrame hint for broadcast join.
Github user rxin commented on the issue: https://github.com/apache/spark/pull/6751 Isn't the hint available in SQL?
[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/18664 I'll work on doing (1) to have conversions in Python for Arrow match non-Arrow, and we can see how that turns out.
[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/18664
> I'm just wondering what if we use timestamp in nested types. Currently we don't support nested types but in the future?
I'll try to take this into account, or at least add a note for timestamps in nested types.
> BTW, could we just use DateTimeUtils.defaultTimeZone() instead of SQLConf.SESSION_LOCAL_TIMEZONE if you really meant it? I think the previous behaviour without Arrow does not count SQLConf.SESSION_LOCAL_TIMEZONE?
@ueshin do you agree with just using `DateTimeUtils.defaultTimeZone()`? In most places where Arrow is used, this would be the case. The only scenario I can think of that would not use it is if the user generated timestamps with SparkSQL and then called `toPandas()` or ran a `pandas_udf`.
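The question above is which timezone to use when Spark's internal timestamps (stored as microseconds since the Unix epoch, in UTC) are turned into wall-clock values on the Python side. A pure-Python sketch, with no Spark or Arrow involved, of why the choice matters: the same stored instant renders differently under two zones. `render_instant` and the two fixed-offset zones are hypothetical stand-ins for `DateTimeUtils.defaultTimeZone()` and `SQLConf.SESSION_LOCAL_TIMEZONE`:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def render_instant(epoch_micros, tz):
    """Turn an instant stored as microseconds since the Unix epoch (UTC) into
    the naive wall-clock datetime a user would see in timezone ``tz``."""
    instant = EPOCH + timedelta(microseconds=epoch_micros)
    return instant.astimezone(tz).replace(tzinfo=None)


# Hypothetical fixed offsets standing in for the process default timezone and
# a differently configured session timezone.
process_default = timezone(timedelta(hours=-7))
session_tz = timezone(timedelta(hours=9))

print(render_instant(0, process_default))  # 1969-12-31 17:00:00
print(render_instant(0, session_tz))       # 1970-01-01 09:00:00
```

If the Arrow and non-Arrow paths picked different zones for this step, `toPandas()` could return different wall-clock values for the same data; using one default for both keeps them consistent.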
[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/18664
> BTW, do you think it is possible to easily de-duplicate timezone handling for both with-Arrow and without-Arrow within Python side if we go for 1. in the separate PR?
@HyukjinKwon, are you referring to how we might handle timestamps with timezones in a future PR? I think doing this for non-Arrow would be a bit more complicated than just the Arrow cases, and I'm not sure of the best way to handle it unless we just make a new TimestampWithTimezone data type.
[GitHub] spark issue #17100: [SPARK-13947][SQL] PySpark DataFrames: The error message...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/17100 Will review it this weekend. Thanks!
[GitHub] spark issue #19466: [SPARK-22237] [CORE] Fix spark submit file download for ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19466 **[Test build #82604 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82604/testReport)** for PR 19466 at commit [`807a767`](https://github.com/apache/spark/commit/807a767f8e1f90789299e83c995d9e6e50015a96).
[GitHub] spark issue #19466: [CORE] Fix spark submit file download for standalone cli...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19466 cc @yhuai @jiangxb1987
[GitHub] spark issue #19466: [CORE] Fix spark submit file download for standalone cli...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19466 **[Test build #82603 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82603/testReport)** for PR 19466 at commit [`86e62f6`](https://github.com/apache/spark/commit/86e62f6906bc19fb4f3b91523ab112d4315f2105).
[GitHub] spark issue #15670: [SPARK-18161] [Python] Allow pickle to serialize >4 GB o...
Github user holdenk commented on the issue: https://github.com/apache/spark/pull/15670 Do we plan to move this forward? Do you have the time to update this, @singularperturbation?
[GitHub] spark pull request #15670: [SPARK-18161] [Python] Allow pickle to serialize ...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/15670#discussion_r143838765 --- Diff: python/pyspark/serializers.py --- @@ -64,7 +64,7 @@ from itertools import izip as zip else: import pickle -protocol = 3 +protocol = min(pickle.HIGHEST_PROTOCOL, 4) --- End diff -- It sounds like this part of the change would be OK then, since Pyrolite can read v4 just fine.
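For context on the line under review: pickle protocol 4, added in Python 3.4, is the first protocol that supports very large objects (over 4 GB). A minimal sketch of the capped choice, assuming Python 3.4 or later; the cap presumably keeps a newer interpreter (where `HIGHEST_PROTOCOL` is 5) from emitting a format the JVM-side reader may not handle. `dumps` is a hypothetical wrapper, not `pyspark.serializers` code:

```python
import pickle

# Same expression as in the diff: use the newest protocol, but never above 4.
protocol = min(pickle.HIGHEST_PROTOCOL, 4)


def dumps(obj):
    """Serialize obj with the capped protocol (hypothetical wrapper)."""
    return pickle.dumps(obj, protocol)


payload = {"rows": [1, 2, 3], "blob": b"\x00" * 16}
data = dumps(payload)
# Pickles of protocol 2 and above start with the PROTO opcode (0x80)
# followed by a single byte holding the protocol version.
print(data[0] == 0x80, data[1] == protocol)
print(pickle.loads(data) == payload)
```

Reading the version byte from the stream header is a cheap way to confirm which protocol a writer actually used.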
[GitHub] spark pull request #19466: [CORE] Fix spark submit file download for standal...
GitHub user loneknightpy opened a pull request: https://github.com/apache/spark/pull/19466 [CORE] Fix spark submit file download for standalone client mode ## What changes were proposed in this pull request? This PR makes the spark-submit script use downloaded files in local/standalone client mode. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/loneknightpy/spark fix-spark-submit-for-standalone Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19466.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19466 commit 1bde00c0d5c90cebdd2a8514078ace8cf33c42f8 Author: Yu Peng Date: 2017-10-10T20:06:37Z Fix spark submit file download for standalone client mode
[GitHub] spark pull request #15666: [SPARK-11421] [Core][Python][R] Added ability for...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/15666#discussion_r143838425 --- Diff: python/pyspark/tests.py --- @@ -435,6 +436,19 @@ def test_add_file_locally(self): with open(download_path) as test_file: self.assertEqual("Hello World!\n", test_file.readline()) +def test_add_jar(self): +jvm = self.sc._jvm +# We shouldn't be able to load anything from the package before it is added +self.assertFalse(isinstance(jvm.pysparktests.DummyClass, JavaClass)) +# Generate and compile the test jar +destDir = os.path.join(SPARK_HOME, "python/test_support/jar") --- End diff -- Do you want to use a temp directory instead?
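A minimal sketch of the temp-directory approach suggested above, using only the standard library (`tempfile.mkdtemp` in `setUp`, `shutil.rmtree` in `tearDown`). The test class and the placeholder file standing in for the compiled test jar are hypothetical; the real `test_add_jar` would compile into the temporary directory instead of `SPARK_HOME/python/test_support/jar`:

```python
import os
import shutil
import tempfile
import unittest


class AddJarTempDirSketch(unittest.TestCase):
    """Hypothetical test showing per-test temp directories with cleanup."""

    def setUp(self):
        # A fresh, private directory per test instead of a fixed path in the source tree.
        self.dest_dir = tempfile.mkdtemp(prefix="pyspark-test-jar-")

    def tearDown(self):
        # Remove generated artifacts even if the test body failed.
        shutil.rmtree(self.dest_dir, ignore_errors=True)

    def test_writes_into_temp_dir(self):
        # Placeholder for "generate and compile the test jar".
        jar_path = os.path.join(self.dest_dir, "dummy.jar")
        with open(jar_path, "wb") as f:
            f.write(b"PK")
        self.assertTrue(os.path.exists(jar_path))


if __name__ == "__main__":
    unittest.main()
```

Because nothing is written under the source checkout, concurrent test runs cannot collide and a failed run leaves no stale jar behind.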
[GitHub] spark issue #18457: [SPARK-21241][MLlib]- Add setIntercept to StreamingLinea...
Github user holdenk commented on the issue: https://github.com/apache/spark/pull/18457 In the meantime Jenkins OK to test.
[GitHub] spark issue #17100: [SPARK-13947][SQL] PySpark DataFrames: The error message...
Github user holdenk commented on the issue: https://github.com/apache/spark/pull/17100 re-ping @gatorsmile?
[GitHub] spark issue #18457: [SPARK-21241][MLlib]- Add setIntercept to StreamingLinea...
Github user holdenk commented on the issue: https://github.com/apache/spark/pull/18457 So we're only really doing bug fixes on the old MLlib stuff, but I guess we haven't finalized the new ML streaming stuff, so I'm not sure what we want to do here. cc @MLnick
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r143836094 --- Diff: python/pyspark/ml/image.py --- @@ -0,0 +1,133 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import pyspark +from pyspark import SparkContext +from pyspark.sql.types import * +from pyspark.sql.types import Row, _create_row +from pyspark.sql import DataFrame +from pyspark.ml.param.shared import * +import numpy as np + +undefinedImageType = "Undefined" + +ImageFields = ["origin", "height", "width", "nChannels", "mode", "data"] + +ocvTypes = { +undefinedImageType: -1, +"CV_8U": 0, "CV_8UC1": 0, "CV_8UC2": 8, "CV_8UC3": 16, "CV_8UC4": 24, +"CV_8S": 1, "CV_8SC1": 1, "CV_8SC2": 9, "CV_8SC3": 17, "CV_8SC4": 25, +"CV_16U": 2, "CV_16UC1": 2, "CV_16UC2": 10, "CV_16UC3": 18, "CV_16UC4": 26, +"CV_16S": 3, "CV_16SC1": 3, "CV_16SC2": 11, "CV_16SC3": 19, "CV_16SC4": 27, +"CV_32S": 4, "CV_32SC1": 4, "CV_32SC2": 12, "CV_32SC3": 20, "CV_32SC4": 28, +"CV_32F": 5, "CV_32FC1": 5, "CV_32FC2": 13, "CV_32FC3": 21, "CV_32FC4": 29, +"CV_64F": 6, "CV_64FC1": 6, "CV_64FC2": 14, "CV_64FC3": 22, "CV_64FC4": 30 +} + +ImageSchema = StructType([ +StructField(ImageFields[0], StringType(), True), +StructField(ImageFields[1], 
IntegerType(), False), +StructField(ImageFields[2], IntegerType(), False), +StructField(ImageFields[3], IntegerType(), False), +# OpenCV-compatible type: CV_8UC3 in most cases +StructField(ImageFields[4], StringType(), False), +# bytes in OpenCV-compatible order: row-wise BGR in most cases +StructField(ImageFields[5], BinaryType(), False)]) + + +# TODO: generalize to other datatypes and number of channels +def toNDArray(image): +""" +Converts an image to a 1-dimensional array + +Args: +image (object): The image to be converted + +Returns: +array: The image as a 1-dimensional array + +.. versionadded:: 2.3.0 +""" +height = image.height +width = image.width +return np.asarray(image.data, dtype=np.uint8) \ + .reshape((height, width, 3))[:, :, (2, 1, 0)] + + +# TODO: generalize to other datatypes and number of channels +def toImage(array, origin="", mode="CV_8UC3"): +""" + +Converts a one-dimensional array to a 2 dimensional image + +Args: +array (array): +origin (str): +mode (int): + +Returns: +object: 2 dimensional image + +.. versionadded:: 2.3.0 +""" +length = np.prod(array.shape) + +data = bytearray(array.astype(dtype=np.int8)[:, :, (2, 1, 0)] + .reshape(length)) +height = array.shape[0] +width = array.shape[1] +nChannels = array.shape[2] +# Creating new Row with _create_row(), because Row(name = value, ... ) +# orders fields by name, which conflicts with expected ImageSchema order +# when the new DataFrame is created by UDF +return _create_row(ImageFields, + [origin, height, width, nChannels, mode, data]) + + +def readImages(path, + recursive=False, + numPartitions=0, + dropImageFailures=False, + sampleRatio=1.0): +""" +Reads the directory of images from the local or remote (WASB) source. +Args: --- End diff -- The arguments documented here don't match the function's actual parameters.
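The kind of mismatch holdenk points out (an `Args:` section that doesn't list the function's real parameters) can be caught mechanically. Below is a minimal sketch using only the standard `inspect` and `re` modules; `documented_args` and `docstring_mismatch` are hypothetical helpers, and they assume Google-style `name (type): description` entries, which is the style this diff uses.

```python
import inspect
import re


def documented_args(func):
    """Names listed in a Google-style 'Args:' section of func's docstring."""
    doc = inspect.getdoc(func) or ""  # getdoc strips the common indentation
    names, in_args, arg_indent = [], False, None
    for line in doc.splitlines():
        if line.strip() == "Args:":
            in_args = True
            continue
        if not in_args or not line.strip():
            continue
        indent = len(line) - len(line.lstrip())
        if indent == 0:
            break  # an unindented line such as "Returns:" ends the Args block
        if arg_indent is None:
            arg_indent = indent  # the first entry fixes the indentation of entries
        if indent == arg_indent:  # deeper lines are wrapped descriptions
            match = re.match(r"(\w+)\s*(?:\([^)]*\))?\s*:", line.strip())
            if match:
                names.append(match.group(1))
    return names


def docstring_mismatch(func):
    """Parameters missing from the docstring, and documented names that are not parameters."""
    actual = list(inspect.signature(func).parameters)
    documented = documented_args(func)
    return {
        "undocumented": [p for p in actual if p not in documented],
        "unknown": [d for d in documented if d not in actual],
    }
```

Running such a check over a module's public functions in a unit test would flag `readImages`-style drift before review.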
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r143836725 --- Diff: python/pyspark/ml/image.py --- @@ -0,0 +1,133 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import pyspark +from pyspark import SparkContext +from pyspark.sql.types import * +from pyspark.sql.types import Row, _create_row +from pyspark.sql import DataFrame +from pyspark.ml.param.shared import * +import numpy as np + +undefinedImageType = "Undefined" + +ImageFields = ["origin", "height", "width", "nChannels", "mode", "data"] + +ocvTypes = { +undefinedImageType: -1, +"CV_8U": 0, "CV_8UC1": 0, "CV_8UC2": 8, "CV_8UC3": 16, "CV_8UC4": 24, +"CV_8S": 1, "CV_8SC1": 1, "CV_8SC2": 9, "CV_8SC3": 17, "CV_8SC4": 25, +"CV_16U": 2, "CV_16UC1": 2, "CV_16UC2": 10, "CV_16UC3": 18, "CV_16UC4": 26, +"CV_16S": 3, "CV_16SC1": 3, "CV_16SC2": 11, "CV_16SC3": 19, "CV_16SC4": 27, +"CV_32S": 4, "CV_32SC1": 4, "CV_32SC2": 12, "CV_32SC3": 20, "CV_32SC4": 28, +"CV_32F": 5, "CV_32FC1": 5, "CV_32FC2": 13, "CV_32FC3": 21, "CV_32FC4": 29, +"CV_64F": 6, "CV_64FC1": 6, "CV_64FC2": 14, "CV_64FC3": 22, "CV_64FC4": 30 +} + +ImageSchema = StructType([ +StructField(ImageFields[0], StringType(), True), +StructField(ImageFields[1], 
IntegerType(), False), +StructField(ImageFields[2], IntegerType(), False), +StructField(ImageFields[3], IntegerType(), False), +# OpenCV-compatible type: CV_8UC3 in most cases +StructField(ImageFields[4], StringType(), False), +# bytes in OpenCV-compatible order: row-wise BGR in most cases +StructField(ImageFields[5], BinaryType(), False)]) + + +# TODO: generalize to other datatypes and number of channels +def toNDArray(image): +""" +Converts an image to a 1-dimensional array + +Args: +image (object): The image to be converted + +Returns: +array: The image as a 1-dimensional array + +.. versionadded:: 2.3.0 +""" +height = image.height +width = image.width +return np.asarray(image.data, dtype=np.uint8) \ + .reshape((height, width, 3))[:, :, (2, 1, 0)] + + +# TODO: generalize to other datatypes and number of channels +def toImage(array, origin="", mode="CV_8UC3"): +""" + +Converts a one-dimensional array to a 2 dimensional image + +Args: +array (array): +origin (str): +mode (int): --- End diff -- The argument descriptions here are incorrect and not very useful (or else the function's default parameter values are incorrect).
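To make the point concrete: the default `mode="CV_8UC3"` is a string, and the schema declares `mode` as a `StringType` holding an OpenCV type name, so `mode (int)` looks wrong. Here is a minimal pure-Python sketch of what complete argument docs could look like, using a hypothetical `to_image_record` that packs a 3-channel RGB image into the `ImageFields` order with row-wise BGR bytes, as the schema comments describe. It deliberately avoids NumPy and `_create_row`, so it is an illustration, not a drop-in replacement for `toImage`; the description of `origin` is an assumption.

```python
IMAGE_FIELDS = ("origin", "height", "width", "nChannels", "mode", "data")


def to_image_record(pixels, origin="", mode="CV_8UC3"):
    """Pack an RGB image into a record laid out in IMAGE_FIELDS order.

    Args:
        pixels (list[list[tuple[int, int, int]]]): height x width grid of
            (R, G, B) values, each an int in [0, 255].
        origin (str): where the image came from (assumed: e.g. a file path);
            may be empty.
        mode (str): OpenCV type name. This sketch only handles "CV_8UC3".

    Returns:
        dict: keys in IMAGE_FIELDS order; "data" holds row-wise BGR bytes.
    """
    if mode != "CV_8UC3":
        raise ValueError("only CV_8UC3 is supported here, got %r" % (mode,))
    height = len(pixels)
    width = len(pixels[0]) if height else 0
    data = bytearray()
    for row in pixels:
        if len(row) != width:
            raise ValueError("all rows must have the same width")
        for r, g, b in row:
            data.extend((b, g, r))  # OpenCV-compatible channel order is BGR
    return dict(zip(IMAGE_FIELDS, (origin, height, width, 3, mode, bytes(data))))
```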
[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/19439#discussion_r143836993 --- Diff: python/pyspark/ml/image.py --- @@ -0,0 +1,133 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import pyspark +from pyspark import SparkContext +from pyspark.sql.types import * +from pyspark.sql.types import Row, _create_row +from pyspark.sql import DataFrame +from pyspark.ml.param.shared import * +import numpy as np + +undefinedImageType = "Undefined" + +ImageFields = ["origin", "height", "width", "nChannels", "mode", "data"] + +ocvTypes = { +undefinedImageType: -1, +"CV_8U": 0, "CV_8UC1": 0, "CV_8UC2": 8, "CV_8UC3": 16, "CV_8UC4": 24, +"CV_8S": 1, "CV_8SC1": 1, "CV_8SC2": 9, "CV_8SC3": 17, "CV_8SC4": 25, +"CV_16U": 2, "CV_16UC1": 2, "CV_16UC2": 10, "CV_16UC3": 18, "CV_16UC4": 26, +"CV_16S": 3, "CV_16SC1": 3, "CV_16SC2": 11, "CV_16SC3": 19, "CV_16SC4": 27, +"CV_32S": 4, "CV_32SC1": 4, "CV_32SC2": 12, "CV_32SC3": 20, "CV_32SC4": 28, +"CV_32F": 5, "CV_32FC1": 5, "CV_32FC2": 13, "CV_32FC3": 21, "CV_32FC4": 29, +"CV_64F": 6, "CV_64FC1": 6, "CV_64FC2": 14, "CV_64FC3": 22, "CV_64FC4": 30 +} + +ImageSchema = StructType([ +StructField(ImageFields[0], StringType(), True), +StructField(ImageFields[1], 
IntegerType(), False), +StructField(ImageFields[2], IntegerType(), False), +StructField(ImageFields[3], IntegerType(), False), +# OpenCV-compatible type: CV_8UC3 in most cases +StructField(ImageFields[4], StringType(), False), +# bytes in OpenCV-compatible order: row-wise BGR in most cases +StructField(ImageFields[5], BinaryType(), False)]) + + +# TODO: generalize to other datatypes and number of channels +def toNDArray(image): +""" +Converts an image to a 1-dimensional array + +Args: +image (object): The image to be converted + +Returns: +array: The image as a 1-dimensional array + +.. versionadded:: 2.3.0 +""" +height = image.height +width = image.width +return np.asarray(image.data, dtype=np.uint8) \ + .reshape((height, width, 3))[:, :, (2, 1, 0)] + + +# TODO: generalize to other datatypes and number of channels +def toImage(array, origin="", mode="CV_8UC3"): +""" + +Converts a one-dimensional array to a 2 dimensional image + +Args: +array (array): +origin (str): +mode (int): + +Returns: +object: 2 dimensional image + +.. versionadded:: 2.3.0 +""" +length = np.prod(array.shape) + +data = bytearray(array.astype(dtype=np.int8)[:, :, (2, 1, 0)] + .reshape(length)) +height = array.shape[0] +width = array.shape[1] +nChannels = array.shape[2] +# Creating new Row with _create_row(), because Row(name = value, ... ) --- End diff -- Would it make sense to also support the name-ordered schema? Other folks might write their own loaders, and this seems like a potential point of confusion.
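One way to accept the name-ordered layout is to reorder by field name rather than by position. As the diff's own comment notes, `Row(name = value, ...)` orders fields by name, which for `ImageFields` is not the schema order. The sketch below is plain Python with a hypothetical `to_schema_order` helper: it takes parallel lists of field names and values and returns the values in schema order, whichever order the names came in.

```python
IMAGE_FIELDS = ["origin", "height", "width", "nChannels", "mode", "data"]


def to_schema_order(names, values, schema_fields=IMAGE_FIELDS):
    """Return `values` rearranged into `schema_fields` order, whatever order `names` uses."""
    if len(names) != len(values):
        raise ValueError("names and values must have the same length")
    by_name = dict(zip(names, values))
    missing = [f for f in schema_fields if f not in by_name]
    extra = [n for n in names if n not in schema_fields]
    if missing or extra:
        raise ValueError("field mismatch: missing=%s extra=%s" % (missing, extra))
    return tuple(by_name[f] for f in schema_fields)


# Alphabetical field order, i.e. what ordering by name produces for these fields.
NAME_SORTED = sorted(IMAGE_FIELDS)
```

Matching by name costs a dictionary lookup per field, but it makes the loader indifferent to which construction path produced the row.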
[GitHub] spark pull request #19082: [SPARK-21870][SQL] Split aggregation code into sm...
Github user a10y commented on a diff in the pull request: https://github.com/apache/spark/pull/19082#discussion_r143836700 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala --- @@ -244,6 +246,89 @@ case class HashAggregateExec( protected override val shouldStopRequired = false + // Extracts all the input variable references for a given `aggExpr`. This result will be used + // to split aggregation into small functions. + private def getInputVariableReferences( + ctx: CodegenContext, + aggExpr: Expression, + subExprs: Map[Expression, SubExprEliminationState]): Set[(String, String)] = { +// `argSet` collects all the pairs of variable names and their types, the first in the pair is +// a type name and the second is a variable name. +val argSet = mutable.Set[(String, String)]() +val stack = mutable.Stack[Expression](aggExpr) +while (stack.nonEmpty) { + stack.pop() match { +case e if subExprs.contains(e) => + val exprCode = subExprs(e) + if (CodegenContext.isJavaIdentifier(exprCode.value)) { +argSet += ((ctx.javaType(e.dataType), exprCode.value)) + } + if (CodegenContext.isJavaIdentifier(exprCode.isNull)) { +argSet += (("boolean", exprCode.isNull)) + } + // Since the children possibly has common expressions, we push them here + stack.pushAll(e.children) +case ref: BoundReference +if ctx.currentVars != null && ctx.currentVars(ref.ordinal) != null => + val value = ctx.currentVars(ref.ordinal).value + val isNull = ctx.currentVars(ref.ordinal).isNull + if (CodegenContext.isJavaIdentifier(value)) { +argSet += ((ctx.javaType(ref.dataType), value)) + } + if (CodegenContext.isJavaIdentifier(isNull)) { +argSet += (("boolean", isNull)) + } +case _: BoundReference => + argSet += (("InternalRow", ctx.INPUT_ROW)) +case e => + stack.pushAll(e.children) + } +} + +argSet.toSet + } + + // Splits the aggregation into small functions because the HotSpot does not compile + // too long functions. 
+ private def splitAggregateExpressions( + ctx: CodegenContext, + aggExprs: Seq[Expression], + evalAndUpdateCodes: Seq[String], + subExprs: Map[Expression, SubExprEliminationState], + otherArgs: Seq[(String, String)] = Seq.empty): Seq[String] = { +aggExprs.zipWithIndex.map { case (aggExpr, i) => + // The maximum number of parameters in non-static Java methods is 254, so this method gives + // up splitting the code if the number goes over the limit. + // You can find more information about the limit in the JVM specification: + // - The number of method parameters is limited to 255 by the definition of a method --- End diff -- Unless you want to check the types of all the parameters, you might be better off halving this to 127 parameters to cover the worst case (every parameter a long or double). I'm not sure how many generated methods this would affect, though.
[GitHub] spark pull request #19082: [SPARK-21870][SQL] Split aggregation code into sm...
Github user a10y commented on a diff in the pull request: https://github.com/apache/spark/pull/19082#discussion_r143836110 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala --- @@ -244,6 +246,89 @@ case class HashAggregateExec( protected override val shouldStopRequired = false + // Extracts all the input variable references for a given `aggExpr`. This result will be used + // to split aggregation into small functions. + private def getInputVariableReferences( + ctx: CodegenContext, + aggExpr: Expression, + subExprs: Map[Expression, SubExprEliminationState]): Set[(String, String)] = { +// `argSet` collects all the pairs of variable names and their types, the first in the pair is +// a type name and the second is a variable name. +val argSet = mutable.Set[(String, String)]() +val stack = mutable.Stack[Expression](aggExpr) +while (stack.nonEmpty) { + stack.pop() match { +case e if subExprs.contains(e) => + val exprCode = subExprs(e) + if (CodegenContext.isJavaIdentifier(exprCode.value)) { +argSet += ((ctx.javaType(e.dataType), exprCode.value)) + } + if (CodegenContext.isJavaIdentifier(exprCode.isNull)) { +argSet += (("boolean", exprCode.isNull)) + } + // Since the children possibly has common expressions, we push them here + stack.pushAll(e.children) +case ref: BoundReference +if ctx.currentVars != null && ctx.currentVars(ref.ordinal) != null => + val value = ctx.currentVars(ref.ordinal).value + val isNull = ctx.currentVars(ref.ordinal).isNull + if (CodegenContext.isJavaIdentifier(value)) { +argSet += ((ctx.javaType(ref.dataType), value)) + } + if (CodegenContext.isJavaIdentifier(isNull)) { +argSet += (("boolean", isNull)) + } +case _: BoundReference => + argSet += (("InternalRow", ctx.INPUT_ROW)) +case e => + stack.pushAll(e.children) + } +} + +argSet.toSet + } + + // Splits the aggregation into small functions because the HotSpot does not compile + // too long functions. 
+ private def splitAggregateExpressions( + ctx: CodegenContext, + aggExprs: Seq[Expression], + evalAndUpdateCodes: Seq[String], + subExprs: Map[Expression, SubExprEliminationState], + otherArgs: Seq[(String, String)] = Seq.empty): Seq[String] = { +aggExprs.zipWithIndex.map { case (aggExpr, i) => + // The maximum number of parameters in non-static Java methods is 254, so this method gives + // up splitting the code if the number goes over the limit. + // You can find more information about the limit in the JVM specification: + // - The number of method parameters is limited to 255 by the definition of a method --- End diff -- If you read the spec closely, it actually says that there are 255 slots, where 1 slot is taken by **this** and 2 slots each are taken up by **long** and **double** parameters. https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-4.html#jvms-4.3.3
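a10y's reading of JVMS §4.3.3 can be turned into an exact check instead of a flat parameter count. The sketch counts descriptor slots for `(javaType, variableName)` pairs shaped like the `argSet` entries in the diff: `long` and `double` take two slots, every other type takes one, and an instance method spends one more on `this`. It is written in Python rather than the PR's Scala, and `param_slots` and `fits_in_method` are hypothetical names.

```python
# JVMS §4.3.3: a method descriptor may use at most 255 parameter slots.
MAX_PARAM_SLOTS = 255
TWO_SLOT_TYPES = {"long", "double"}


def param_slots(params, is_static=False):
    """Count descriptor slots for (java_type, variable_name) pairs."""
    slots = 0 if is_static else 1  # `this` takes one slot in an instance method
    for java_type, _name in params:
        slots += 2 if java_type in TWO_SLOT_TYPES else 1
    return slots


def fits_in_method(params, is_static=False):
    """True if the parameters fit in one method descriptor."""
    return param_slots(params, is_static) <= MAX_PARAM_SLOTS
```

With this, 254 `int` parameters fit in an instance method but 254 `double` parameters do not, while 127 `long` parameters (the worst case from the previous comment) still fit.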
[GitHub] spark issue #19082: [SPARK-21870][SQL] Split aggregation code into small fun...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19082 @kiszk This is one possible solution if we can remove the limit on the number of parameters. However, it does not resolve all the issues. For example, the method can become too big to inline.
[GitHub] spark issue #19438: [SPARK-22208] [SQL] Improve percentile_approx by not rou...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19438 Merged build finished. Test PASSed.
[GitHub] spark issue #19438: [SPARK-22208] [SQL] Improve percentile_approx by not rou...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19438 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82593/ Test PASSed.
[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19272 **[Test build #82602 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82602/testReport)** for PR 19272 at commit [`f5925fd`](https://github.com/apache/spark/commit/f5925fd2ab31c1d6825fa546102172ed8a55ae8b).
[GitHub] spark issue #19438: [SPARK-22208] [SQL] Improve percentile_approx by not rou...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19438 **[Test build #82593 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82593/testReport)** for PR 19438 at commit [`1180265`](https://github.com/apache/spark/commit/11802650c938be59163721254193c67bf3949a99). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19272 add to whitelist
[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19272 ok to test
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19250 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82592/ Test PASSed.
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19250 Merged build finished. Test PASSed.
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19250 **[Test build #82592 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82592/testReport)** for PR 19250 at commit [`5c03e07`](https://github.com/apache/spark/commit/5c03e07ca0c3ec95b580280b7efe7aa7e1b5d734). * This patch passes all tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `case class AdjustTimestamps(conf: SQLConf) extends Rule[LogicalPlan] `
[GitHub] spark issue #18460: [SPARK-21247][SQL] Type comparison should respect case-s...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/18460 Hi @gatorsmile, could you review this?
[GitHub] spark pull request #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/19337#discussion_r143818776 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala --- @@ -224,6 +224,24 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM /** * For Online optimizer only: [[optimizer]] = "online". * + * A (positive) learning parameter that controls the convergence of variational inference. + * Smaller value will lead to more accuracy model and longer training time. --- End diff -- Sorry to be troublesome here. I'm not sure a smaller value for the convergence tolerance will always lead to a more accurate model. A smaller value will make the statistics for each batch fit the training data more closely (possibly overfitting), so I'm not sure the overall model accuracy will be better, especially on data outside the training dataset. Maybe just say "Smaller value will lead to a more converged model and longer training time"? Please also document the default value.
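hhbyyh's distinction (a smaller tolerance buys more iterations and a tighter fit to the data at hand, not necessarily better held-out accuracy) shows up in any iterate-until-converged loop. This is a generic sketch, not Spark's LDA optimizer code: `iterate_until_converged` and the toy update rule are hypothetical, and the stopping rule (mean absolute change below `epsilon`) is an assumption about how such a tolerance is typically applied.

```python
def iterate_until_converged(update, start, epsilon, max_iter=10000):
    """Apply `update` until the mean absolute change per element is below `epsilon`.

    Returns (values, iterations_used); gives up after max_iter iterations.
    """
    if epsilon <= 0:
        raise ValueError("epsilon must be positive")
    current = list(start)
    for iteration in range(1, max_iter + 1):
        proposed = update(current)
        mean_change = sum(abs(a - b) for a, b in zip(proposed, current)) / len(current)
        current = proposed
        if mean_change < epsilon:
            return current, iteration
    return current, max_iter


def halfway_to_one(values):
    """Toy update: move each value halfway toward 1.0."""
    return [(v + 1.0) / 2 for v in values]
```

Starting from `[0.0, 10.0]`, tightening `epsilon` from `1e-3` to `1e-6` takes this loop from 13 to 23 iterations. What that buys is a closer approach to the fixed point of this particular update, which by itself says nothing about accuracy on unseen documents.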
[GitHub] spark issue #19082: [SPARK-21870][SQL] Split aggregation code into small fun...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19082 @gatorsmile, let me clarify. In other words, do you want to find where to cut the method boundaries so that the callee methods can still be inlined? That could maximize the advantages of both whole-stage codegen and JIT optimization within a compilation unit.
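kiszk's question is about where to cut generated code into methods. As a toy illustration only, here is a greedy packer that groups code chunks under a size budget, roughly the kind of size-threshold policy that `splitExpressions` applies. The `pack_chunks` name and the 1024-character budget are assumptions, and a character count only approximates the bytecode size that HotSpot's inlining heuristics actually look at.

```python
def pack_chunks(chunks, max_chars=1024):
    """Greedily group code chunks so each group's total length stays within max_chars.

    A single chunk longer than max_chars is placed in a group of its own
    rather than being split, since a chunk is the smallest unit we can move.
    """
    groups, current, size = [], [], 0
    for chunk in chunks:
        if current and size + len(chunk) > max_chars:
            groups.append(current)  # close the current method body
            current, size = [], 0
        current.append(chunk)
        size += len(chunk)
    if current:
        groups.append(current)
    return groups
```

Choosing the budget is exactly the open question in this thread: smaller groups are more likely to stay small enough to inline, but every extra method adds a call and more parameters to pass.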
[GitHub] spark issue #19465: [SPARK-21988][SS]Implement StreamingRelation.computeStat...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19465 **[Test build #82600 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82600/testReport)** for PR 19465 at commit [`96f2c30`](https://github.com/apache/spark/commit/96f2c300de3a3e74e7f2c1794817aff8f4a18f58).
[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19459 **[Test build #82601 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82601/testReport)** for PR 19459 at commit [`c7ddee6`](https://github.com/apache/spark/commit/c7ddee6b7ab91c1651a397a716ed91ed2a8383a3).
[GitHub] spark issue #19309: [SPARK-19558][sql] Add config key to register QueryExecu...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19309 That's basically what the PR summary says.
[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19459#discussion_r143821657 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala --- @@ -203,4 +205,16 @@ private[sql] object ArrowConverters { reader.close() } } + + def toDataFrame( --- End diff -- I left the conversion logic in `ArrowConverters` because I think there is a good chance it will change, so I just added a wrapper to `PythonSQLUtils`. Let me know if that's OK.
[GitHub] spark pull request #19465: [SPARK-21988]Implement StreamingRelation.computeS...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/19465 [SPARK-21988]Implement StreamingRelation.computeStats to fix explain ## What changes were proposed in this pull request? Implement StreamingRelation.computeStats to fix explain ## How was this patch tested? - unit tests: `StreamingRelation.computeStats` and `StreamingExecutionRelation.computeStats`. - regression tests: `explain join with a normal source` and `explain join with MemoryStream`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark SPARK-21988 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19465.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19465
[GitHub] spark issue #19465: [SPARK-21988]Implement StreamingRelation.computeStats to...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19465 cc @joseph-torres
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143801968 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.{ScheduledFuture, TimeUnit} + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Failure + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId} +import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock} +import org.apache.spark.util.ThreadUtils + +/** + * Responsible for asynchronously replicating all of an executors cached blocks, and then shutting + * it down. 
+ */ +final private class CacheRecoveryManager( +state: CacheRecoveryManagerState, +conf: SparkConf) + extends Logging { + + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool") + private implicit val asyncExecutionContext: ExecutionContext = +ExecutionContext.fromExecutorService(threadPool) + + /** + * Start the recover cache shutdown process for these executors + * + * @param execIds the executors to start shutting down + */ + def startExecutorKill(execIds: Seq[String]): Unit = { +logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.") +checkForReplicableBlocks(execIds) + } + + /** + * Stops all thread pools + * + * @return + */ + def stop(): java.util.List[Runnable] = { +threadPool.shutdownNow() +state.stop() + } + + /** + * Get list of cached blocks from BlockManagerMaster. If there are cached blocks, replicate them, + * otherwise kill the executors + * + * @param execIds the executors to check + */ + private def checkForReplicableBlocks(execIds: Seq[String]) = state.getBlocks(execIds).foreach { +case (executorId, HasCachedBlocks) => replicateBlocks(executorId) +case (executorId, NoMoreBlocks | NotEnoughMemory) => state.killExecutor(executorId) + } + + /** + * Replicate one cached block on an executor. If there are more, repeat. If there are none, check + * with the block manager master again. If there is an error, go ahead and kill executor. + * + * @param execId the executor to save a block one + */ + private def replicateBlocks(execId: String): Unit = { +import scala.util.Success +val (response, blockId) = state.replicateFirstBlock(execId) +response.onComplete { + case Success(true) => +logTrace(s"Finished replicating block ${blockId.getOrElse("unknown")} on exec $execId.") --- End diff -- s/exec/executor/. That is, if you even want to keep this log around...
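The doc comment on `replicateBlocks` describes a loop: replicate one block, repeat while blocks remain, and kill the executor when none are left or an error occurs. Below is a simplified, synchronous Python sketch of that control flow, run per executor on a thread pool. `FakeRecoveryState`, `drain_then_kill`, and `start_executor_kill` are hypothetical stand-ins for `CacheRecoveryManagerState` and the manager's methods, and the sketch skips re-checking with the block manager master as well as any timeout handling.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeRecoveryState:
    """Hypothetical stand-in for CacheRecoveryManagerState, for illustration only."""

    def __init__(self, cached_blocks, failing_blocks=()):
        self.cached = {e: list(blocks) for e, blocks in cached_blocks.items()}
        self.failing = set(failing_blocks)  # block ids whose replication raises
        self.replicated = []
        self.killed = []
        self._lock = threading.Lock()

    def replicate_first_block(self, exec_id):
        """Replicate and drop the executor's first cached block; None if nothing is left."""
        if not self.cached[exec_id]:
            return None
        block_id = self.cached[exec_id].pop(0)
        if block_id in self.failing:
            raise RuntimeError("failed to replicate %s" % block_id)
        with self._lock:
            self.replicated.append((exec_id, block_id))
        return block_id

    def kill_executor(self, exec_id):
        with self._lock:
            self.killed.append(exec_id)


def drain_then_kill(state, exec_id):
    """Replicate blocks one at a time; kill the executor when done or on the first error."""
    try:
        while state.replicate_first_block(exec_id) is not None:
            pass
    except RuntimeError:
        pass  # per the doc comment: on an error, go ahead and kill the executor
    finally:
        state.kill_executor(exec_id)


def start_executor_kill(state, exec_ids):
    with ThreadPoolExecutor(max_workers=4) as pool:
        # list() waits for every executor and re-raises unexpected exceptions.
        list(pool.map(lambda exec_id: drain_then_kill(state, exec_id), exec_ids))
```

Each executor's block list is touched only by its own worker, so the lock guards just the shared result lists.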
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143802667 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.{ScheduledFuture, TimeUnit} + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Failure + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId} +import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock} +import org.apache.spark.util.ThreadUtils + +/** + * Responsible for asynchronously replicating all of an executors cached blocks, and then shutting + * it down. 
+ */ +final private class CacheRecoveryManager( +state: CacheRecoveryManagerState, +conf: SparkConf) + extends Logging { + + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool") + private implicit val asyncExecutionContext: ExecutionContext = +ExecutionContext.fromExecutorService(threadPool) + + /** + * Start the recover cache shutdown process for these executors + * + * @param execIds the executors to start shutting down + */ + def startExecutorKill(execIds: Seq[String]): Unit = { +logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.") +checkForReplicableBlocks(execIds) + } + + /** + * Stops all thread pools + * + * @return + */ + def stop(): java.util.List[Runnable] = { +threadPool.shutdownNow() +state.stop() + } + + /** + * Get list of cached blocks from BlockManagerMaster. If there are cached blocks, replicate them, + * otherwise kill the executors + * + * @param execIds the executors to check + */ + private def checkForReplicableBlocks(execIds: Seq[String]) = state.getBlocks(execIds).foreach { +case (executorId, HasCachedBlocks) => replicateBlocks(executorId) +case (executorId, NoMoreBlocks | NotEnoughMemory) => state.killExecutor(executorId) + } + + /** + * Replicate one cached block on an executor. If there are more, repeat. If there are none, check + * with the block manager master again. If there is an error, go ahead and kill executor. 
+ * + * @param execId the executor to save a block one + */ + private def replicateBlocks(execId: String): Unit = { +import scala.util.Success +val (response, blockId) = state.replicateFirstBlock(execId) +response.onComplete { + case Success(true) => +logTrace(s"Finished replicating block ${blockId.getOrElse("unknown")} on exec $execId.") +replicateBlocks(execId) + case Success(false) => +checkForReplicableBlocks(Seq(execId)) + case Failure(f) => +logWarning(s"Error trying to replicate block ${blockId.getOrElse("unknown")}.", f) +state.killExecutor(execId) +} + } +} + +private object CacheRecoveryManager { + def apply(eam: ExecutorAllocationManager, conf: SparkConf): CacheRecoveryManager = { +val bmme = SparkEnv.get.blockManager.master.driverEndpoint +val state = new CacheRecoveryManagerState(bmme, eam, conf) +new CacheRecoveryManager(state, conf) + } +} + +/** + * Private class that holds state for all the executors being shutdown. + * + * @param blockManagerMasterEndpoint blockManagerMasterEndpoint + * @param executorAllocationManager ExecutorAllocationManager + * @param conf spark conf + */ +final private class
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143817781 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -646,18 +648,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp Future.successful(true) } - val killExecutors: Boolean => Future[Boolean] = -if (!executorsToKill.isEmpty) { - _ => doKillExecutors(executorsToKill) -} else { - _ => Future.successful(false) -} - - val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread) + val killResponse = if (executorsToKill.nonEmpty) { +adjustTotalExecutors.flatMap(_ => doKillExecutors(executorsToKill))(ThreadUtils.sameThread) + } else { +Future.successful(false) + } - killResponse.flatMap(killSuccessful => -Future.successful (if (killSuccessful) executorsToKill else Seq.empty[String]) - )(ThreadUtils.sameThread) + killResponse.map { successful => if (successful) executorsToKill else Seq.empty --- End diff --
```
killResponse.map { successful =>
  if (successful) executorsToKill else Seq.empty
}(ThreadUtils.sameThread)
```
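The suggested rewrite replaces `flatMap(... Future.successful(...))` with a plain `map`. The callback only transforms a value and never starts new asynchronous work, so `map` is enough. Below is a self-contained sketch of the same shape. `sameThread` is a hand-written stand-in for Spark's internal `ThreadUtils.sameThread`, and `killedExecutors` is a hypothetical wrapper, not a Spark method.

```scala
import scala.concurrent.{ExecutionContext, Future}

object KillResponseSketch {
  // Stand-in for Spark's internal ThreadUtils.sameThread:
  // runs each callback on whichever thread completes the future.
  val sameThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  /** Returns the executors that were killed, or an empty Seq if nothing was killed. */
  def killedExecutors(
      executorsToKill: Seq[String],
      adjustTotalExecutors: Future[Boolean],
      doKillExecutors: Seq[String] => Future[Boolean]): Future[Seq[String]] = {
    val killResponse = if (executorsToKill.nonEmpty) {
      adjustTotalExecutors.flatMap(_ => doKillExecutors(executorsToKill))(sameThread)
    } else {
      Future.successful(false)
    }
    // `map` is enough: the callback just turns the Boolean into the result list.
    killResponse.map { successful =>
      if (successful) executorsToKill else Seq.empty[String]
    }(sameThread)
  }
}
```

Running these callbacks on the completing thread is reasonable because they are cheap and never block. A blocking callback, by contrast, would stall whichever thread completed the future.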
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143800800 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.{ScheduledFuture, TimeUnit} + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Failure + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId} +import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock} +import org.apache.spark.util.ThreadUtils + +/** + * Responsible for asynchronously replicating all of an executors cached blocks, and then shutting + * it down. 
+ */ +final private class CacheRecoveryManager( +state: CacheRecoveryManagerState, +conf: SparkConf) + extends Logging { + + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool") + private implicit val asyncExecutionContext: ExecutionContext = +ExecutionContext.fromExecutorService(threadPool) + + /** + * Start the recover cache shutdown process for these executors + * + * @param execIds the executors to start shutting down + */ + def startExecutorKill(execIds: Seq[String]): Unit = { +logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.") +checkForReplicableBlocks(execIds) + } + + /** + * Stops all thread pools + * + * @return + */ + def stop(): java.util.List[Runnable] = { +threadPool.shutdownNow() +state.stop() + } + + /** + * Get list of cached blocks from BlockManagerMaster. If there are cached blocks, replicate them, + * otherwise kill the executors + * + * @param execIds the executors to check + */ + private def checkForReplicableBlocks(execIds: Seq[String]) = state.getBlocks(execIds).foreach { --- End diff -- `: Unit =` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
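The `: Unit =` suggestion asks for the result type of `checkForReplicableBlocks` to be written out rather than inferred from the trailing `foreach`. Below is a small sketch of the annotated form. It uses hypothetical stand-ins for the executor states and records each decision in a buffer instead of actually replicating or killing anything.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-ins for the executor states matched in the quoted diff.
object ExplicitUnitSketch {
  sealed trait ExecutorState
  case object HasCachedBlocks extends ExecutorState
  case object NoMoreBlocks extends ExecutorState
  case object NotEnoughMemory extends ExecutorState

  val replicated = ListBuffer.empty[String]
  val killed = ListBuffer.empty[String]

  // `: Unit =` states up front that this method is called only for its side effects.
  def checkForReplicableBlocks(states: Map[String, ExecutorState]): Unit = states.foreach {
    case (executorId, HasCachedBlocks) => replicated += executorId
    case (executorId, NoMoreBlocks | NotEnoughMemory) => killed += executorId
  }
}
```

With the annotation, the method's signature stays `Unit` even if the body's last expression later changes type. Callers are therefore insulated from an inferred type drifting.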
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143813032 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.{ScheduledFuture, TimeUnit} + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Failure + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId} +import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock} +import org.apache.spark.util.ThreadUtils + +/** + * Responsible for asynchronously replicating all of an executors cached blocks, and then shutting + * it down. 
+ */ +final private class CacheRecoveryManager( +state: CacheRecoveryManagerState, +conf: SparkConf) + extends Logging { + + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool") + private implicit val asyncExecutionContext: ExecutionContext = +ExecutionContext.fromExecutorService(threadPool) + + /** + * Start the recover cache shutdown process for these executors + * + * @param execIds the executors to start shutting down + */ + def startExecutorKill(execIds: Seq[String]): Unit = { +logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.") +checkForReplicableBlocks(execIds) + } + + /** + * Stops all thread pools + * + * @return + */ + def stop(): java.util.List[Runnable] = { +threadPool.shutdownNow() +state.stop() + } + + /** + * Get list of cached blocks from BlockManagerMaster. If there are cached blocks, replicate them, + * otherwise kill the executors + * + * @param execIds the executors to check + */ + private def checkForReplicableBlocks(execIds: Seq[String]) = state.getBlocks(execIds).foreach { +case (executorId, HasCachedBlocks) => replicateBlocks(executorId) +case (executorId, NoMoreBlocks | NotEnoughMemory) => state.killExecutor(executorId) + } + + /** + * Replicate one cached block on an executor. If there are more, repeat. If there are none, check + * with the block manager master again. If there is an error, go ahead and kill executor. 
+ * + * @param execId the executor to save a block one + */ + private def replicateBlocks(execId: String): Unit = { +import scala.util.Success +val (response, blockId) = state.replicateFirstBlock(execId) +response.onComplete { + case Success(true) => +logTrace(s"Finished replicating block ${blockId.getOrElse("unknown")} on exec $execId.") +replicateBlocks(execId) + case Success(false) => +checkForReplicableBlocks(Seq(execId)) + case Failure(f) => +logWarning(s"Error trying to replicate block ${blockId.getOrElse("unknown")}.", f) +state.killExecutor(execId) +} + } +} + +private object CacheRecoveryManager { + def apply(eam: ExecutorAllocationManager, conf: SparkConf): CacheRecoveryManager = { +val bmme = SparkEnv.get.blockManager.master.driverEndpoint +val state = new CacheRecoveryManagerState(bmme, eam, conf) +new CacheRecoveryManager(state, conf) + } +} + +/** + * Private class that holds state for all the executors being shutdown. + * + * @param blockManagerMasterEndpoint blockManagerMasterEndpoint + * @param executorAllocationManager ExecutorAllocationManager + * @param conf spark conf + */ +final private class
[GitHub] spark issue #19082: [SPARK-21870][SQL] Split aggregation code into small fun...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19082 https://github.com/apache/spark/pull/18931 is not what we want, although it can partially resolve some issues. Simply disabling whole-stage codegen might trigger regressions like the one in Q66. The general goal is to maximize the advantages of both whole-stage codegen and JIT optimization. Now the question is where to cut the function boundaries without introducing an extra IR.
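One way to picture cutting the boundaries is to pack generated statements into helper methods that each stay below a size threshold. This keeps any single generated method from growing too large for the JIT. HotSpot, for example, skips compiling very large methods by default. The sketch below is a hypothetical greedy packer over code strings. It is not Spark's `CodeGenerator` API, and the `apply_N` / `InternalRow row` names in the emitted Java are illustrative only.

```scala
// Hypothetical greedy splitter for generated code; not Spark's CodeGenerator.
object SplitCodeSketch {
  /**
   * Packs `snippets` into groups whose total length stays within `maxChars`
   * (a single oversized snippet still gets its own group), then returns the
   * helper method definitions and the code that calls them in order.
   */
  def split(snippets: Seq[String], maxChars: Int): (Seq[String], String) = {
    val groups = snippets.foldLeft(Vector(Vector.empty[String])) { (acc, code) =>
      val current = acc.last
      if (current.nonEmpty && current.map(_.length).sum + code.length > maxChars) {
        acc :+ Vector(code) // start a new helper method
      } else {
        acc.init :+ (current :+ code) // keep filling the current one
      }
    }
    val helpers = groups.zipWithIndex.map { case (group, idx) =>
      s"private void apply_$idx(InternalRow row) {\n${group.mkString("\n")}\n}"
    }
    val calls = groups.indices.map(idx => s"apply_$idx(row);").mkString("\n")
    (helpers, calls)
  }
}
```

A real splitter must also pass any variables shared across helpers as parameters or class fields, which this sketch ignores.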
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143814841 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -88,6 +88,8 @@ private[spark] class ExecutorAllocationManager( import ExecutorAllocationManager._ + var cacheRecoveryManager: CacheRecoveryManager = _ --- End diff -- private?
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143802485 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.{ScheduledFuture, TimeUnit} + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Failure + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId} +import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock} +import org.apache.spark.util.ThreadUtils + +/** + * Responsible for asynchronously replicating all of an executors cached blocks, and then shutting + * it down. 
+ */ +final private class CacheRecoveryManager( +state: CacheRecoveryManagerState, +conf: SparkConf) + extends Logging { + + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool") + private implicit val asyncExecutionContext: ExecutionContext = +ExecutionContext.fromExecutorService(threadPool) + + /** + * Start the recover cache shutdown process for these executors + * + * @param execIds the executors to start shutting down + */ + def startExecutorKill(execIds: Seq[String]): Unit = { +logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.") +checkForReplicableBlocks(execIds) + } + + /** + * Stops all thread pools + * + * @return + */ + def stop(): java.util.List[Runnable] = { +threadPool.shutdownNow() +state.stop() + } + + /** + * Get list of cached blocks from BlockManagerMaster. If there are cached blocks, replicate them, + * otherwise kill the executors + * + * @param execIds the executors to check + */ + private def checkForReplicableBlocks(execIds: Seq[String]) = state.getBlocks(execIds).foreach { +case (executorId, HasCachedBlocks) => replicateBlocks(executorId) +case (executorId, NoMoreBlocks | NotEnoughMemory) => state.killExecutor(executorId) + } + + /** + * Replicate one cached block on an executor. If there are more, repeat. If there are none, check + * with the block manager master again. If there is an error, go ahead and kill executor. 
+ * + * @param execId the executor to save a block one + */ + private def replicateBlocks(execId: String): Unit = { +import scala.util.Success +val (response, blockId) = state.replicateFirstBlock(execId) +response.onComplete { + case Success(true) => +logTrace(s"Finished replicating block ${blockId.getOrElse("unknown")} on exec $execId.") +replicateBlocks(execId) + case Success(false) => +checkForReplicableBlocks(Seq(execId)) + case Failure(f) => +logWarning(s"Error trying to replicate block ${blockId.getOrElse("unknown")}.", f) +state.killExecutor(execId) +} + } +} + +private object CacheRecoveryManager { + def apply(eam: ExecutorAllocationManager, conf: SparkConf): CacheRecoveryManager = { +val bmme = SparkEnv.get.blockManager.master.driverEndpoint +val state = new CacheRecoveryManagerState(bmme, eam, conf) +new CacheRecoveryManager(state, conf) + } +} + +/** + * Private class that holds state for all the executors being shutdown. + * + * @param blockManagerMasterEndpoint blockManagerMasterEndpoint + * @param executorAllocationManager ExecutorAllocationManager + * @param conf spark conf + */ +final private class
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143820628 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala --- @@ -237,6 +246,43 @@ class BlockManagerMasterEndpoint( blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) } + private def replicateOneBlock( + execId: String, + blockId: BlockId, + excludeExecutors: Seq[String], + context: RpcCallContext): Unit = { +logDebug(s"replicating block $blockId") +val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get) +val response: Option[Future[Boolean]] = for { + blockManagerId <- blockManagerIdByExecutor.get(execId) + info <- blockManagerInfo.get(blockManagerId) + replicaSet <- blockLocations.asScala.get(blockId) + replicas = replicaSet.toSeq + maxReps = replicaSet.size + 2 +} yield info.slaveEndpoint.ask[Boolean](ReplicateBlock(blockId, replicas, excluded, maxReps)) + +response.getOrElse(Future.successful(false)).foreach(context.reply) + } + + private def getCachedBlocks(executorId: String): collection.Set[BlockId] = { +val cachedBlocks = for { + blockManagerId <- blockManagerIdByExecutor.get(executorId) + info <- blockManagerInfo.get(blockManagerId) +} yield info.cachedBlocks + +cachedBlocks.getOrElse(Set.empty) + } + + private def getSizeOfBlocks(blockMap: Map[String, Set[RDDBlockId]]): Map[String, Long] = for { --- End diff -- Put the body in `{ }` - looks kinda odd without it.
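The braces suggestion is about readability. A `for` expression used directly as a method body is easy to misread as the start of a separate statement. Below is a sketch of both lookup styles from this diff with the bodies braced. It makes these assumptions:

* Hypothetical in-memory maps with `String` IDs stand in for `blockManagerIdByExecutor`, `blockManagerInfo`, and the block sizes.
* The real body of `getSizeOfBlocks` is cut off in the diff, so the size computation here is only illustrative.

```scala
import scala.concurrent.Future

// Hypothetical lookups standing in for BlockManagerMasterEndpoint state.
object BracedForSketch {
  private val blockManagerIdByExecutor = Map("exec-1" -> "bm-1")
  private val blockManagerInfo = Map("bm-1" -> "endpoint-1")
  private val blockSizes = Map("rdd_0_0" -> 100L, "rdd_0_1" -> 50L, "rdd_1_0" -> 10L)

  /** Chains Option lookups; falls back to a completed Future if any lookup misses. */
  def replicateOne(execId: String, ask: String => Future[Boolean]): Future[Boolean] = {
    val response: Option[Future[Boolean]] = for {
      blockManagerId <- blockManagerIdByExecutor.get(execId)
      endpoint <- blockManagerInfo.get(blockManagerId)
    } yield ask(endpoint)
    response.getOrElse(Future.successful(false))
  }

  /** Braced body: total size of each executor's blocks (unknown blocks count as 0). */
  def getSizeOfBlocks(blockMap: Map[String, Set[String]]): Map[String, Long] = {
    for {
      (executorId, blockIds) <- blockMap
    } yield executorId -> blockIds.toSeq.map(id => blockSizes.getOrElse(id, 0L)).sum
  }
}
```

The `toSeq` before summing matters: mapping over the `Set` directly would collapse two blocks of equal size into one entry.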
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143812180 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.{ScheduledFuture, TimeUnit} + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Failure + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId} +import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock} +import org.apache.spark.util.ThreadUtils + +/** + * Responsible for asynchronously replicating all of an executors cached blocks, and then shutting + * it down. 
+ */ +final private class CacheRecoveryManager( +state: CacheRecoveryManagerState, +conf: SparkConf) + extends Logging { + + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool") + private implicit val asyncExecutionContext: ExecutionContext = +ExecutionContext.fromExecutorService(threadPool) + + /** + * Start the recover cache shutdown process for these executors + * + * @param execIds the executors to start shutting down + */ + def startExecutorKill(execIds: Seq[String]): Unit = { +logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.") +checkForReplicableBlocks(execIds) + } + + /** + * Stops all thread pools + * + * @return + */ + def stop(): java.util.List[Runnable] = { +threadPool.shutdownNow() +state.stop() + } + + /** + * Get list of cached blocks from BlockManagerMaster. If there are cached blocks, replicate them, + * otherwise kill the executors + * + * @param execIds the executors to check + */ + private def checkForReplicableBlocks(execIds: Seq[String]) = state.getBlocks(execIds).foreach { +case (executorId, HasCachedBlocks) => replicateBlocks(executorId) +case (executorId, NoMoreBlocks | NotEnoughMemory) => state.killExecutor(executorId) + } + + /** + * Replicate one cached block on an executor. If there are more, repeat. If there are none, check + * with the block manager master again. If there is an error, go ahead and kill executor. 
+ * + * @param execId the executor to save a block one + */ + private def replicateBlocks(execId: String): Unit = { +import scala.util.Success +val (response, blockId) = state.replicateFirstBlock(execId) +response.onComplete { + case Success(true) => +logTrace(s"Finished replicating block ${blockId.getOrElse("unknown")} on exec $execId.") +replicateBlocks(execId) + case Success(false) => +checkForReplicableBlocks(Seq(execId)) + case Failure(f) => +logWarning(s"Error trying to replicate block ${blockId.getOrElse("unknown")}.", f) +state.killExecutor(execId) +} + } +} + +private object CacheRecoveryManager { + def apply(eam: ExecutorAllocationManager, conf: SparkConf): CacheRecoveryManager = { +val bmme = SparkEnv.get.blockManager.master.driverEndpoint +val state = new CacheRecoveryManagerState(bmme, eam, conf) +new CacheRecoveryManager(state, conf) + } +} + +/** + * Private class that holds state for all the executors being shutdown. + * + * @param blockManagerMasterEndpoint blockManagerMasterEndpoint + * @param executorAllocationManager ExecutorAllocationManager + * @param conf spark conf + */ +final private class
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143810812 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.{ScheduledFuture, TimeUnit} + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Failure + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId} +import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock} +import org.apache.spark.util.ThreadUtils + +/** + * Responsible for asynchronously replicating all of an executors cached blocks, and then shutting + * it down. 
+ */ +final private class CacheRecoveryManager( +state: CacheRecoveryManagerState, +conf: SparkConf) + extends Logging { + + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool") + private implicit val asyncExecutionContext: ExecutionContext = +ExecutionContext.fromExecutorService(threadPool) + + /** + * Start the recover cache shutdown process for these executors + * + * @param execIds the executors to start shutting down + */ + def startExecutorKill(execIds: Seq[String]): Unit = { +logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.") +checkForReplicableBlocks(execIds) + } + + /** + * Stops all thread pools + * + * @return + */ + def stop(): java.util.List[Runnable] = { +threadPool.shutdownNow() +state.stop() + } + + /** + * Get list of cached blocks from BlockManagerMaster. If there are cached blocks, replicate them, + * otherwise kill the executors + * + * @param execIds the executors to check + */ + private def checkForReplicableBlocks(execIds: Seq[String]) = state.getBlocks(execIds).foreach { +case (executorId, HasCachedBlocks) => replicateBlocks(executorId) +case (executorId, NoMoreBlocks | NotEnoughMemory) => state.killExecutor(executorId) + } + + /** + * Replicate one cached block on an executor. If there are more, repeat. If there are none, check + * with the block manager master again. If there is an error, go ahead and kill executor. 
+ * + * @param execId the executor to save a block one + */ + private def replicateBlocks(execId: String): Unit = { +import scala.util.Success +val (response, blockId) = state.replicateFirstBlock(execId) +response.onComplete { + case Success(true) => +logTrace(s"Finished replicating block ${blockId.getOrElse("unknown")} on exec $execId.") +replicateBlocks(execId) + case Success(false) => +checkForReplicableBlocks(Seq(execId)) + case Failure(f) => +logWarning(s"Error trying to replicate block ${blockId.getOrElse("unknown")}.", f) +state.killExecutor(execId) +} + } +} + +private object CacheRecoveryManager { + def apply(eam: ExecutorAllocationManager, conf: SparkConf): CacheRecoveryManager = { +val bmme = SparkEnv.get.blockManager.master.driverEndpoint +val state = new CacheRecoveryManagerState(bmme, eam, conf) +new CacheRecoveryManager(state, conf) + } +} + +/** + * Private class that holds state for all the executors being shutdown. + * + * @param blockManagerMasterEndpoint blockManagerMasterEndpoint + * @param executorAllocationManager ExecutorAllocationManager + * @param conf spark conf + */ +final private class
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143810600 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.{ScheduledFuture, TimeUnit} + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Failure + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId} +import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock} +import org.apache.spark.util.ThreadUtils + +/** + * Responsible for asynchronously replicating all of an executors cached blocks, and then shutting + * it down. 
+ */ +final private class CacheRecoveryManager( +state: CacheRecoveryManagerState, +conf: SparkConf) + extends Logging { + + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool") + private implicit val asyncExecutionContext: ExecutionContext = +ExecutionContext.fromExecutorService(threadPool) + + /** + * Start the recover cache shutdown process for these executors + * + * @param execIds the executors to start shutting down + */ + def startExecutorKill(execIds: Seq[String]): Unit = { +logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.") +checkForReplicableBlocks(execIds) + } + + /** + * Stops all thread pools + * + * @return + */ + def stop(): java.util.List[Runnable] = { +threadPool.shutdownNow() +state.stop() + } + + /** + * Get list of cached blocks from BlockManagerMaster. If there are cached blocks, replicate them, + * otherwise kill the executors + * + * @param execIds the executors to check + */ + private def checkForReplicableBlocks(execIds: Seq[String]) = state.getBlocks(execIds).foreach { +case (executorId, HasCachedBlocks) => replicateBlocks(executorId) +case (executorId, NoMoreBlocks | NotEnoughMemory) => state.killExecutor(executorId) + } + + /** + * Replicate one cached block on an executor. If there are more, repeat. If there are none, check + * with the block manager master again. If there is an error, go ahead and kill executor. 
+ * + * @param execId the executor to save a block one + */ + private def replicateBlocks(execId: String): Unit = { +import scala.util.Success +val (response, blockId) = state.replicateFirstBlock(execId) +response.onComplete { + case Success(true) => +logTrace(s"Finished replicating block ${blockId.getOrElse("unknown")} on exec $execId.") +replicateBlocks(execId) + case Success(false) => +checkForReplicableBlocks(Seq(execId)) + case Failure(f) => +logWarning(s"Error trying to replicate block ${blockId.getOrElse("unknown")}.", f) +state.killExecutor(execId) +} + } +} + +private object CacheRecoveryManager { + def apply(eam: ExecutorAllocationManager, conf: SparkConf): CacheRecoveryManager = { +val bmme = SparkEnv.get.blockManager.master.driverEndpoint +val state = new CacheRecoveryManagerState(bmme, eam, conf) +new CacheRecoveryManager(state, conf) + } +} + +/** + * Private class that holds state for all the executors being shutdown. + * + * @param blockManagerMasterEndpoint blockManagerMasterEndpoint + * @param executorAllocationManager ExecutorAllocationManager + * @param conf spark conf + */ +final private class
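The first hunk drives recovery as an asynchronous loop. It replicates one block and recurses on `Success(true)`, re-checks with the block manager master on `Success(false)`, and kills the executor on `Failure`. The sketch below isolates that control flow so it runs without Spark. `ReplicationLoop`, `ReplicationLoopDemo` and the function parameters are hypothetical stand-ins for `CacheRecoveryManagerState` and the master endpoint, so read it as an illustration under those assumptions rather than the PR's code.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

/**
 * Hypothetical, Spark-free sketch of the replicate-then-recurse loop.
 *
 * @param replicateFirstBlock copies one cached block off the executor; true if one was copied
 * @param hasMoreBlocks       stand-in for asking the master whether cached blocks remain
 * @param killExecutor        invoked when no blocks remain or replication fails
 */
class ReplicationLoop(
    replicateFirstBlock: String => Future[Boolean],
    hasMoreBlocks: String => Boolean,
    killExecutor: String => Unit)(implicit ec: ExecutionContext) {

  /** Re-check the executor: keep replicating if blocks remain, otherwise kill it. */
  def run(execId: String): Unit =
    if (hasMoreBlocks(execId)) replicateBlocks(execId) else killExecutor(execId)

  private def replicateBlocks(execId: String): Unit =
    replicateFirstBlock(execId).onComplete {
      case Success(true)  => replicateBlocks(execId) // one block copied; try the next
      case Success(false) => run(execId)             // nothing copied; re-check first
      case Failure(_)     => killExecutor(execId)    // give up on errors
    }
}

object ReplicationLoopDemo {
  /** Runs the loop on an executor holding `blocks` blocks; returns (killed id, blocks left). */
  def runWithBlocks(blocks: Int): (String, Int) = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val lock = new Object
    var remaining = blocks
    val killed = Promise[String]()
    val loop = new ReplicationLoop(
      _ => Future(lock.synchronized {
        if (remaining > 0) { remaining -= 1; true } else false
      }),
      _ => lock.synchronized(remaining > 0),
      id => { killed.trySuccess(id); () })
    loop.run("exec-1")
    val killedId = Await.result(killed.future, 5.seconds)
    (killedId, lock.synchronized(remaining))
  }
}
```

Termination relies on the re-check eventually reporting no blocks; in the diff that answer comes from `state.getBlocks`.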
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143800993 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.{ScheduledFuture, TimeUnit} + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Failure + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId} +import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock} +import org.apache.spark.util.ThreadUtils + +/** + * Responsible for asynchronously replicating all of an executors cached blocks, and then shutting + * it down. 
+ */ +final private class CacheRecoveryManager( +state: CacheRecoveryManagerState, +conf: SparkConf) + extends Logging { + + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool") + private implicit val asyncExecutionContext: ExecutionContext = +ExecutionContext.fromExecutorService(threadPool) + + /** + * Start the recover cache shutdown process for these executors + * + * @param execIds the executors to start shutting down + */ + def startExecutorKill(execIds: Seq[String]): Unit = { +logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.") +checkForReplicableBlocks(execIds) + } + + /** + * Stops all thread pools + * + * @return + */ + def stop(): java.util.List[Runnable] = { +threadPool.shutdownNow() +state.stop() + } + + /** + * Get list of cached blocks from BlockManagerMaster. If there are cached blocks, replicate them, --- End diff -- The method doesn't return the list, so its short description should more accurately reflect what it actually does. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143819718 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala --- @@ -237,6 +246,43 @@ class BlockManagerMasterEndpoint( blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) } + private def replicateOneBlock( + execId: String, + blockId: BlockId, + excludeExecutors: Seq[String], + context: RpcCallContext): Unit = { +logDebug(s"replicating block $blockId") +val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get) +val response: Option[Future[Boolean]] = for { + blockManagerId <- blockManagerIdByExecutor.get(execId) + info <- blockManagerInfo.get(blockManagerId) + replicaSet <- blockLocations.asScala.get(blockId) + replicas = replicaSet.toSeq + maxReps = replicaSet.size + 2 --- End diff -- Why `+ 2`? It would be good to add a comment explaining it.
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143813769 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143815041 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -234,12 +239,14 @@ private[spark] class ExecutorAllocationManager( executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS) client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) +cacheRecoveryManager = CacheRecoveryManager(this, conf) --- End diff -- Gate on `recoverCachedData`?
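One way to gate creation on the flag is to hold the recovery manager in an `Option`, so that neither it nor its thread pool exists when the feature is off. The sketch below assumes that design; `RecoveryManagerLike` and `GatedAllocationManager` are hypothetical, trimmed-down stand-ins for `CacheRecoveryManager` and `ExecutorAllocationManager`. A nullable field guarded by the same flag would work too; `Option` just makes the "disabled" case explicit at each use site.

```scala
/** Hypothetical stand-in for CacheRecoveryManager: only needs to be stoppable. */
trait RecoveryManagerLike {
  def stop(): Unit
}

/**
 * Hypothetical, trimmed-down allocation manager showing the gate: the recovery
 * manager (and therefore its thread pool) is only created when the flag is on.
 */
class GatedAllocationManager(
    recoverCachedData: Boolean,
    newRecoveryManager: () => RecoveryManagerLike) {

  private var cacheRecoveryManager: Option[RecoveryManagerLike] = None

  def start(): Unit = {
    cacheRecoveryManager =
      if (recoverCachedData) Some(newRecoveryManager()) else None
  }

  // Stopping is a no-op when recovery was never enabled.
  def stop(): Unit = cacheRecoveryManager.foreach(_.stop())

  def recoveryEnabled: Boolean = cacheRecoveryManager.isDefined
}
```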
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143816789 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -76,6 +76,14 @@ package object config { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("3s") + private[spark] val DYN_ALLOCATION_RECOVER_CACHE = + ConfigBuilder("spark.dynamicAllocation.recoverCachedData").booleanConf.createWithDefault(false) --- End diff -- Use `.cacheRecovery.enabled` to follow the pattern of the other feature flags. I also prefer splitting these constant declarations across multiple lines, for readability.
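A sketch of the declaration with the suggested key and one builder call per line, assuming it sits inside `package object config` next to the existing entries, where `ConfigBuilder` is in scope. The constant name adopts the `CACHE_RECOVERY` spelling from the `s/RECOVER_CACHE/CACHE_RECOVERY` comment in this review; both names are proposals, not merged configuration, and the fragment does not compile on its own.

```scala
// Fragment: belongs inside `package object config` (org.apache.spark.internal).
private[spark] val DYN_ALLOCATION_CACHE_RECOVERY =
  ConfigBuilder("spark.dynamicAllocation.cacheRecovery.enabled")
    .booleanConf
    .createWithDefault(false)
```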
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143818189 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala --- @@ -97,6 +97,10 @@ class BlockManagerMasterEndpoint( case GetStorageStatus => context.reply(storageStatus) +case GetCachedBlocks(executorId) => context.reply(getCachedBlocks(executorId)) --- End diff -- Put the code on the next line to follow the pattern of the other cases.
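For reference, the layout being asked for puts each reply on the line after `=>`, as the existing `GetStorageStatus` case does in the real file (the email flattens it onto one line). The message types, stub lookups and `ReplyFormattingSketch` below are hypothetical stand-ins so the snippet runs outside Spark; only the case layout is the point.

```scala
/** Hypothetical stand-ins for the endpoint's messages. */
sealed trait EndpointMessage
case object GetStorageStatus extends EndpointMessage
final case class GetCachedBlocks(executorId: String) extends EndpointMessage

object ReplyFormattingSketch {
  // Stubs standing in for the endpoint's real lookups.
  private def storageStatus: Seq[String] = Seq.empty
  private def getCachedBlocks(executorId: String): Seq[String] = Seq(s"cached-on-$executorId")

  /** Each case puts its body on the line after `=>`, so new cases read like the old ones. */
  def receiveAndReply(reply: Any => Unit): PartialFunction[EndpointMessage, Unit] = {
    case GetStorageStatus =>
      reply(storageStatus)

    case GetCachedBlocks(executorId) =>
      reply(getCachedBlocks(executorId))
  }
}
```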
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143802914 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.{ScheduledFuture, TimeUnit} + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Failure + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT --- End diff -- s/RECOVER_CACHE/CACHE_RECOVERY
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143811908 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143807978 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143811393 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143816598 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -416,63 +423,52 @@ private[spark] class ExecutorAllocationManager( * Request the cluster manager to remove the given executors. * Returns the list of executors which are removed. */ - private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized { -val executorIdsToBeRemoved = new ArrayBuffer[String] - + private def removeExecutors(executors: Seq[String]): Unit = synchronized { logInfo("Request to remove executorIds: " + executors.mkString(", ")) -val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size - -var newExecutorTotal = numExistingExecutors -executors.foreach { executorIdToBeRemoved => - if (newExecutorTotal - 1 < minNumExecutors) { -logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " + - s"$newExecutorTotal executor(s) left (minimum number of executor limit $minNumExecutors)") - } else if (newExecutorTotal - 1 < numExecutorsTarget) { -logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " + - s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)") - } else if (canBeKilled(executorIdToBeRemoved)) { -executorIdsToBeRemoved += executorIdToBeRemoved -newExecutorTotal -= 1 - } -} +val numExistingExecs = allocationManager.executorIds.size - executorsPendingToRemove.size +val execCountFloor = math.max(minNumExecutors, numExecutorsTarget) +val (executorIdsToBeRemoved, dontRemove) = executors + .filter(canBeKilled) + .splitAt(numExistingExecs - execCountFloor) -if (executorIdsToBeRemoved.isEmpty) { - return Seq.empty[String] +dontRemove.foreach { execId => + logDebug(s"Not removing idle executor $execId because it " + +s"would put us below the minimum limit of $minNumExecutors executors" + +s"or number of 
target executors $numExecutorsTarget") } -// Send a request to the backend to kill this executor(s) -val executorsRemoved = if (testing) { - executorIdsToBeRemoved +if (executorIdsToBeRemoved.isEmpty) { + Seq.empty[String] +} else if (testing) { + recordExecutorKill(executorIdsToBeRemoved) +} else if (recoverCachedData) { + logDebug(s"Starting replicate process for $executorIdsToBeRemoved") + client.markPendingToRemove(executorIdsToBeRemoved) + recordExecutorKill(executorIdsToBeRemoved) + cacheRecoveryManager.startExecutorKill(executorIdsToBeRemoved) } else { - client.killExecutors(executorIdsToBeRemoved) + val killed = killExecutors(executorIdsToBeRemoved) + recordExecutorKill(killed) } -// [SPARK-21834] killExecutors api reduces the target number of executors. -// So we need to update the target with desired value. -client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) -// reset the newExecutorTotal to the existing number of executors -newExecutorTotal = numExistingExecutors -if (testing || executorsRemoved.nonEmpty) { - executorsRemoved.foreach { removedExecutorId => -newExecutorTotal -= 1 -logInfo(s"Removing executor $removedExecutorId because it has been idle for " + - s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)") -executorsPendingToRemove.add(removedExecutorId) - } - executorsRemoved -} else { + } + + def killExecutors(executorIds: Seq[String], forceIfPending: Boolean = false): Seq[String] = { +logDebug(s"Starting kill process for $executorIds") +val result = client.killExecutors(executorIds, forceIfPending = forceIfPending) +if (result.isEmpty) { logWarning(s"Unable to reach the cluster manager to kill executor/s " + -s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!") - Seq.empty[String] +s"${executorIds.mkString(",")} or no executor eligible to kill!") } +result } - /** - * Request the cluster manager to remove the given executor. 
- * Return whether the request is acknowledged. - */ - private def removeExecutor(executorId: String): Boolean = synchronized { -val executorsRemoved = removeExecutors(Seq(executorId)) -executorsRemoved.nonEmpty && executorsRemoved(0) ==
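The rewritten `removeExecutors` replaces the counting loop with a `filter` plus `splitAt`. This standalone sketch of just that selection step uses hypothetical names, with plain parameters in place of the manager's fields. It makes one property explicit: when `numExistingExecs - execCountFloor` is zero or negative, `splitAt` yields an empty first half, so nothing is removed.

```scala
/** Hypothetical, Spark-free version of the selection step in `removeExecutors`. */
object ExecutorRemovalSketch {

  /**
   * Splits the requested executors into (toRemove, kept).
   * Executors failing `canBeKilled` appear in neither half, as in the diff.
   *
   * @param executors          executors requested for removal, in priority order
   * @param canBeKilled        stand-in for the manager's eligibility check
   * @param numExistingExecs   live executors not already pending removal
   * @param minNumExecutors    configured lower bound
   * @param numExecutorsTarget current target number of executors
   */
  def select(
      executors: Seq[String],
      canBeKilled: String => Boolean,
      numExistingExecs: Int,
      minNumExecutors: Int,
      numExecutorsTarget: Int): (Seq[String], Seq[String]) = {
    // Never go below the larger of the configured minimum and the current target.
    val execCountFloor = math.max(minNumExecutors, numExecutorsTarget)
    // splitAt(n) is (take(n), drop(n)); for n <= 0 the first half is empty.
    executors.filter(canBeKilled).splitAt(numExistingExecs - execCountFloor)
  }
}
```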
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143810176 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.{ScheduledFuture, TimeUnit} + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Failure + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId} +import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock} +import org.apache.spark.util.ThreadUtils + +/** + * Responsible for asynchronously replicating all of an executors cached blocks, and then shutting + * it down. 
+ */ +final private class CacheRecoveryManager( +state: CacheRecoveryManagerState, +conf: SparkConf) + extends Logging { + + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool") + private implicit val asyncExecutionContext: ExecutionContext = +ExecutionContext.fromExecutorService(threadPool) + + /** + * Start the recover cache shutdown process for these executors + * + * @param execIds the executors to start shutting down + */ + def startExecutorKill(execIds: Seq[String]): Unit = { +logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.") +checkForReplicableBlocks(execIds) + } + + /** + * Stops all thread pools + * + * @return + */ + def stop(): java.util.List[Runnable] = { +threadPool.shutdownNow() +state.stop() + } + + /** + * Get list of cached blocks from BlockManagerMaster. If there are cached blocks, replicate them, + * otherwise kill the executors + * + * @param execIds the executors to check + */ + private def checkForReplicableBlocks(execIds: Seq[String]) = state.getBlocks(execIds).foreach { +case (executorId, HasCachedBlocks) => replicateBlocks(executorId) +case (executorId, NoMoreBlocks | NotEnoughMemory) => state.killExecutor(executorId) + } + + /** + * Replicate one cached block on an executor. If there are more, repeat. If there are none, check + * with the block manager master again. If there is an error, go ahead and kill executor. 
+ * + * @param execId the executor to save a block one + */ + private def replicateBlocks(execId: String): Unit = { +import scala.util.Success +val (response, blockId) = state.replicateFirstBlock(execId) +response.onComplete { + case Success(true) => +logTrace(s"Finished replicating block ${blockId.getOrElse("unknown")} on exec $execId.") +replicateBlocks(execId) + case Success(false) => +checkForReplicableBlocks(Seq(execId)) + case Failure(f) => +logWarning(s"Error trying to replicate block ${blockId.getOrElse("unknown")}.", f) +state.killExecutor(execId) +} + } +} + +private object CacheRecoveryManager { + def apply(eam: ExecutorAllocationManager, conf: SparkConf): CacheRecoveryManager = { +val bmme = SparkEnv.get.blockManager.master.driverEndpoint +val state = new CacheRecoveryManagerState(bmme, eam, conf) +new CacheRecoveryManager(state, conf) + } +} + +/** + * Private class that holds state for all the executors being shutdown. + * + * @param blockManagerMasterEndpoint blockManagerMasterEndpoint + * @param executorAllocationManager ExecutorAllocationManager + * @param conf spark conf + */ +final private class
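The `replicateBlocks` method in the diff above drives replication as a chain of asynchronous callbacks. It replicates one block, recurses on `Success(true)`, and otherwise stops. Below is a minimal sketch of that control flow. It assumes a hypothetical in-memory `FakeState` in place of `CacheRecoveryManagerState`, and already-completed futures in place of RPC calls. It also simplifies `Success(false)` to killing the executor directly, where the real code re-checks with the block manager master.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

// Hypothetical in-memory stand-in for CacheRecoveryManagerState.
final class FakeState(initial: Map[String, List[String]]) {
  private var remaining: Map[String, List[String]] = initial
  private var replicatedBlocks: Vector[String] = Vector.empty
  // Completed when an executor is "killed", so callers can wait for the loop to finish.
  val done: Map[String, Promise[Unit]] = initial.keys.map(id => id -> Promise[Unit]()).toMap

  // Pops the next cached block; `false` means nothing is left to replicate.
  def replicateFirstBlock(execId: String): (Future[Boolean], Option[String]) = synchronized {
    remaining.getOrElse(execId, Nil) match {
      case head :: tail =>
        remaining = remaining + (execId -> tail)
        replicatedBlocks = replicatedBlocks :+ head
        (Future.successful(true), Some(head))
      case Nil =>
        (Future.successful(false), None)
    }
  }

  def replicated: Vector[String] = synchronized { replicatedBlocks }

  def killExecutor(execId: String): Unit = done(execId).trySuccess(())
}

final class RecoveryLoop(state: FakeState)(implicit ec: ExecutionContext) {
  // Replicate one block; on success, recurse; otherwise kill the executor.
  def replicateBlocks(execId: String): Unit = {
    val (response, _) = state.replicateFirstBlock(execId)
    response.onComplete {
      case Success(true) => replicateBlocks(execId)
      // Simplification: the real code re-checks with the block manager master here.
      case Success(false) => state.killExecutor(execId)
      case Failure(_) => state.killExecutor(execId)
    }
  }
}
```

Scala dispatches each `onComplete` callback through the `ExecutionContext` after `replicateBlocks` has returned. The recursion therefore does not grow the stack.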
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143813871 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143814023 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143818025 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -698,6 +696,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp None } } + + override def markPendingToRemove(executorIds: Seq[String]): Unit = synchronized { +logDebug(s"marking $executorIds pending to remove") +executorIds.foreach(id => executorsPendingToRemove.put(id, true)) --- End diff -- `.foreach { id => ... }` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
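Here is the suggested brace style applied to `markPendingToRemove`. This is a sketch that uses a hypothetical `PendingTracker` class owning its own `executorsPendingToRemove` map, in place of the backend's field.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the pending-removal bookkeeping in the backend.
class PendingTracker {
  val executorsPendingToRemove = new mutable.HashMap[String, Boolean]

  def markPendingToRemove(executorIds: Seq[String]): Unit = synchronized {
    // Closure passed in braces, as the review suggests.
    executorIds.foreach { id =>
      executorsPendingToRemove.put(id, true)
    }
  }
}
```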
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143817365 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -646,18 +648,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp Future.successful(true) } - val killExecutors: Boolean => Future[Boolean] = -if (!executorsToKill.isEmpty) { - _ => doKillExecutors(executorsToKill) -} else { - _ => Future.successful(false) -} - - val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread) + val killResponse = if (executorsToKill.nonEmpty) { +adjustTotalExecutors.flatMap(_ => doKillExecutors(executorsToKill))(ThreadUtils.sameThread) --- End diff -- `.flatMap { _ => ... }`
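In the suggested `.flatMap { _ => ... }` form, the explicit execution-context argument list follows the closing brace. The sketch below is self-contained and rests on several assumptions. A hypothetical `sameThread` context stands in for Spark's internal `ThreadUtils.sameThread`, and a stub `doKillExecutors` stands in for the backend's method. The new `else` branch is cut off in the quoted diff, so the `else` branch here mirrors the removed lines instead: wait for the adjustment, then report `false`.

```scala
import java.util.concurrent.Executor
import scala.concurrent.{ExecutionContext, Future}

object KillSketch {
  // Hypothetical same-thread context: runs each callback on the calling thread.
  val sameThread: ExecutionContext = ExecutionContext.fromExecutor(new Executor {
    override def execute(command: Runnable): Unit = command.run()
  })

  // Hypothetical stub for the backend's doKillExecutors.
  def doKillExecutors(ids: Seq[String]): Future[Boolean] = Future.successful(ids.nonEmpty)

  def killResponse(adjustTotalExecutors: Future[Boolean], executorsToKill: Seq[String]): Future[Boolean] =
    if (executorsToKill.nonEmpty) {
      adjustTotalExecutors.flatMap { _ =>
        doKillExecutors(executorsToKill)
      }(sameThread)
    } else {
      // Mirrors the removed code: wait for the adjustment, then report false.
      adjustTotalExecutors.map { _ => false }(sameThread)
    }
}
```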
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143817162 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -601,40 +602,41 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * * @param executorIds identifiers of executors to kill * @param replace whether to replace the killed executors with new ones, default false - * @param force whether to force kill busy executors, default false + * @param forceIfBusy whether to force kill busy executors, default false + * @param forceIfPending whether to force kill executors that are pending to die, default false * @return the ids of the executors acknowledged by the cluster manager to be removed. */ final override def killExecutors( executorIds: Seq[String], replace: Boolean, - force: Boolean): Seq[String] = { + forceIfBusy: Boolean, + forceIfPending: Boolean): Seq[String] = { logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}") -val response = synchronized { +val response: Future[Seq[String]] = synchronized { val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains) - unknownExecutors.foreach { id => -logWarning(s"Executor to kill $id does not exist!") - } + unknownExecutors.foreach(id => logWarning(s"Executor to kill $id does not exist!")) // If an executor is already pending to be removed, do not kill it again (SPARK-9795) // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552) val executorsToKill = knownExecutors -.filter { id => !executorsPendingToRemove.contains(id) } -.filter { id => force || !scheduler.isExecutorBusy(id) } - executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace } +.filter { id => forceIfPending || !executorsPendingToRemove.contains(id) } +.filter { id => forceIfBusy || !scheduler.isExecutorBusy(id) } + executorsToKill.foreach(id => 
executorsPendingToRemove(id) = !replace) --- End diff -- Previous style was correct.
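The filters in the quoted `killExecutors` change are easier to reason about as a pure function. In the sketch below, hypothetical `pending` and `busy` parameters stand in for `executorsPendingToRemove` and `scheduler.isExecutorBusy`. The final `foreach` uses the brace style that the reviewer calls the previous, correct one.

```scala
import scala.collection.mutable

object KillFilter {
  // Mirrors the two .filter calls in the diff: pending executors are skipped
  // unless forceIfPending, busy executors are skipped unless forceIfBusy.
  def executorsToKill(
      knownExecutors: Seq[String],
      pending: collection.Set[String],
      busy: String => Boolean,
      forceIfBusy: Boolean,
      forceIfPending: Boolean): Seq[String] = {
    knownExecutors
      .filter { id => forceIfPending || !pending.contains(id) }
      .filter { id => forceIfBusy || !busy(id) }
  }

  // Record each executor as pending removal, in the brace style used before the change.
  def markPending(
      ids: Seq[String],
      replace: Boolean,
      pendingToRemove: mutable.HashMap[String, Boolean]): Unit = {
    ids.foreach { id => pendingToRemove(id) = !replace }
  }
}
```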
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143816092 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.{ScheduledFuture, TimeUnit} + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Failure + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId} +import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock} +import org.apache.spark.util.ThreadUtils + +/** + * Responsible for asynchronously replicating all of an executors cached blocks, and then shutting + * it down. 
+ */ +final private class CacheRecoveryManager( +state: CacheRecoveryManagerState, +conf: SparkConf) + extends Logging { + + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool") + private implicit val asyncExecutionContext: ExecutionContext = +ExecutionContext.fromExecutorService(threadPool) + + /** + * Start the recover cache shutdown process for these executors + * + * @param execIds the executors to start shutting down + */ + def startExecutorKill(execIds: Seq[String]): Unit = { --- End diff -- Probably better called `startCacheRecovery`.
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143819863 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala --- @@ -237,6 +246,43 @@ class BlockManagerMasterEndpoint( blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) } + private def replicateOneBlock( + execId: String, + blockId: BlockId, + excludeExecutors: Seq[String], + context: RpcCallContext): Unit = { +logDebug(s"replicating block $blockId") +val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get) +val response: Option[Future[Boolean]] = for { + blockManagerId <- blockManagerIdByExecutor.get(execId) + info <- blockManagerInfo.get(blockManagerId) + replicaSet <- blockLocations.asScala.get(blockId) + replicas = replicaSet.toSeq + maxReps = replicaSet.size + 2 +} yield info.slaveEndpoint.ask[Boolean](ReplicateBlock(blockId, replicas, excluded, maxReps)) + +response.getOrElse(Future.successful(false)).foreach(context.reply) + } + + private def getCachedBlocks(executorId: String): collection.Set[BlockId] = { --- End diff -- just `Set[BlockId]`?
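On the `Set[BlockId]` question: in Scala, the unqualified `Set` is `scala.collection.immutable.Set`. A method that hands back a mutable set directly must therefore be declared as `collection.Set`. Returning plain `Set` costs a copy via `toSet`. Here is a small sketch of that trade-off. It uses a hypothetical `SetTypes` object with string ids in place of `BlockId`, and the diff does not show whether the PR's method is backed by a mutable set.

```scala
import scala.collection.mutable

object SetTypes {
  // Hypothetical index of cached block ids per executor, backed by mutable sets.
  private val blocksByExecutor = mutable.HashMap[String, mutable.HashSet[String]]()

  def addBlock(execId: String, blockId: String): Unit = {
    blocksByExecutor.getOrElseUpdate(execId, mutable.HashSet.empty[String]) += blockId
  }

  // Returns the live mutable set without copying; only type-checks as collection.Set.
  def cachedBlocksView(execId: String): collection.Set[String] =
    blocksByExecutor.getOrElse(execId, mutable.HashSet.empty[String])

  // Returns the default (immutable) Set, which requires a copy via toSet.
  def cachedBlocks(execId: String): Set[String] =
    blocksByExecutor.get(execId).map(s => s.toSet[String]).getOrElse(Set.empty[String])
}
```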
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143802830 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143803799 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143801381 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.{ScheduledFuture, TimeUnit} + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Failure + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId} +import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock} +import org.apache.spark.util.ThreadUtils + +/** + * Responsible for asynchronously replicating all of an executors cached blocks, and then shutting + * it down. 
+ */ +final private class CacheRecoveryManager( +state: CacheRecoveryManagerState, +conf: SparkConf) + extends Logging { + + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool") + private implicit val asyncExecutionContext: ExecutionContext = +ExecutionContext.fromExecutorService(threadPool) + + /** + * Start the recover cache shutdown process for these executors + * + * @param execIds the executors to start shutting down + */ + def startExecutorKill(execIds: Seq[String]): Unit = { +logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.") +checkForReplicableBlocks(execIds) + } + + /** + * Stops all thread pools + * + * @return + */ + def stop(): java.util.List[Runnable] = { +threadPool.shutdownNow() +state.stop() + } + + /** + * Get list of cached blocks from BlockManagerMaster. If there are cached blocks, replicate them, + * otherwise kill the executors + * + * @param execIds the executors to check + */ + private def checkForReplicableBlocks(execIds: Seq[String]) = state.getBlocks(execIds).foreach { +case (executorId, HasCachedBlocks) => replicateBlocks(executorId) +case (executorId, NoMoreBlocks | NotEnoughMemory) => state.killExecutor(executorId) + } + + /** + * Replicate one cached block on an executor. If there are more, repeat. If there are none, check + * with the block manager master again. If there is an error, go ahead and kill executor. + * + * @param execId the executor to save a block one + */ + private def replicateBlocks(execId: String): Unit = { +import scala.util.Success --- End diff -- Why isn't this imported at the top? (And you know scala allows you to rename types on import, in case there's a conflict, right?)
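Here is how the rename-on-import that the reviewer mentions looks in practice. `scala.util.Success` is imported once, at the top of the file, under a different name, so it cannot clash with another `Success` in scope. The sketch uses a hypothetical local `Success` case class as the conflicting name.

```scala
import scala.util.{Failure, Try, Success => TrySuccess}

object ImportRename {
  // Hypothetical local type that would otherwise clash with scala.util.Success.
  case class Success(message: String)

  // The renamed extractor still works in pattern matches.
  def describe(t: Try[Int]): String = t match {
    case TrySuccess(v) => s"ok: $v"
    case Failure(e) => s"failed: ${e.getMessage}"
  }

  val local: Success = Success("local type, no clash")
}
```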
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143799387

--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.DYN_ALLOCATION_RECOVER_CACHE_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages.{GetCachedBlocks, GetMemoryStatus, GetSizeOfBlocks, ReplicateOneBlock}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executors cached blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+    state: CacheRecoveryManagerState,
+    conf: SparkConf)
+  extends Logging {
+
+  private val threadPool = ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
--- End diff --

Same thing as before with the weird name. "cache-recovery-manager-pool" is a much better name.
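The pool name matters because it becomes the thread-name prefix seen in thread dumps and logs. Spark's `ThreadUtils.newDaemonCachedThreadPool` is internal to Spark, so this sketch uses only the JDK; it assumes, as Spark's helper appears to do, a `<prefix>-<n>` naming scheme. `newNamedDaemonCachedPool` and `nameOfWorkerThread` are hypothetical helpers for illustration:

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object NamedPoolSketch {
  // Cached pool whose daemon threads are named "<prefix>-0", "<prefix>-1", ...
  def newNamedDaemonCachedPool(prefix: String): ExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        t.setDaemon(true) // daemon threads do not keep the JVM alive on exit
        t
      }
    }
    Executors.newCachedThreadPool(factory)
  }

  // Runs a trivial task on the pool and reports which thread executed it.
  def nameOfWorkerThread(pool: ExecutorService): String =
    pool.submit(new Callable[String] {
      override def call(): String = Thread.currentThread().getName
    }).get()

  def main(args: Array[String]): Unit = {
    val pool = newNamedDaemonCachedPool("cache-recovery-manager-pool")
    try println(nameOfWorkerThread(pool)) // cache-recovery-manager-pool-0
    finally pool.shutdown()
  }
}
```

With a name that matches the owning class, a stuck thread in a dump points straight at `CacheRecoveryManager`.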
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143816714

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -76,6 +76,14 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("3s")
 
+  private[spark] val DYN_ALLOCATION_RECOVER_CACHE =
--- End diff --

As with the other suggestion, `CACHE_RECOVERY`.
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r143815711

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -416,63 +423,52 @@ private[spark] class ExecutorAllocationManager(
    * Request the cluster manager to remove the given executors.
    * Returns the list of executors which are removed.
    */
-  private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
-    val executorIdsToBeRemoved = new ArrayBuffer[String]
-
+  private def removeExecutors(executors: Seq[String]): Unit = synchronized {
     logInfo("Request to remove executorIds: " + executors.mkString(", "))
-    val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size
-
-    var newExecutorTotal = numExistingExecutors
-    executors.foreach { executorIdToBeRemoved =>
-      if (newExecutorTotal - 1 < minNumExecutors) {
-        logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
-          s"$newExecutorTotal executor(s) left (minimum number of executor limit $minNumExecutors)")
-      } else if (newExecutorTotal - 1 < numExecutorsTarget) {
-        logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
-          s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)")
-      } else if (canBeKilled(executorIdToBeRemoved)) {
-        executorIdsToBeRemoved += executorIdToBeRemoved
-        newExecutorTotal -= 1
-      }
-    }
+    val numExistingExecs = allocationManager.executorIds.size - executorsPendingToRemove.size
+    val execCountFloor = math.max(minNumExecutors, numExecutorsTarget)
+    val (executorIdsToBeRemoved, dontRemove) = executors
+      .filter(canBeKilled)
+      .splitAt(numExistingExecs - execCountFloor)
-    if (executorIdsToBeRemoved.isEmpty) {
-      return Seq.empty[String]
+    dontRemove.foreach { execId =>
--- End diff --

minor: add `log.isDebugEnabled` check.
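A simplified, standalone model of the new selection logic in the diff, to make its behavior concrete. The names are hypothetical, and the real method also reads `allocationManager` state and logs through Spark's `Logging`. It shows that `splitAt` clamps a negative or oversized count, so no explicit bounds check is needed, and where the suggested `log.isDebugEnabled` guard would sit. Since Spark's `logDebug` already takes its message by name, the guard mainly skips iterating `dontRemove` when debug logging is off; a plain Boolean stands in for `log.isDebugEnabled` here:

```scala
object RemoveExecutorsSketch {
  // Keep only killable executors, then take at most as many as can go without
  // dropping below max(minNumExecutors, numExecutorsTarget).
  // Returns (toRemove, dontRemove).
  def selectForRemoval(
      candidates: Seq[String],
      canBeKilled: String => Boolean,
      numExistingExecs: Int,
      minNumExecutors: Int,
      numExecutorsTarget: Int): (Seq[String], Seq[String]) = {
    val execCountFloor = math.max(minNumExecutors, numExecutorsTarget)
    // splitAt(n): the first half is empty when n <= 0 and holds everything when n >= size.
    candidates.filter(canBeKilled).splitAt(numExistingExecs - execCountFloor)
  }

  def main(args: Array[String]): Unit = {
    val debugEnabled = true // stand-in for log.isDebugEnabled
    val (toRemove, dontRemove) = selectForRemoval(
      Seq("1", "2", "3", "4"), _ != "2",
      numExistingExecs = 5, minNumExecutors = 1, numExecutorsTarget = 3)
    println(s"Removing: ${toRemove.mkString(", ")}") // Removing: 1, 3
    // Only walk the leftover list when debug output would actually be emitted.
    if (debugEnabled) {
      dontRemove.foreach(id => println(s"Not removing idle executor $id"))
    }
  }
}
```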