spark git commit: [SPARK-22757][K8S] Enable spark.jars and spark.files in KUBERNETES mode
Repository: spark Updated Branches: refs/heads/branch-2.3 5b524cc0c -> f9dcdbcef [SPARK-22757][K8S] Enable spark.jars and spark.files in KUBERNETES mode ## What changes were proposed in this pull request? We missed enabling `spark.files` and `spark.jars` in https://github.com/apache/spark/pull/19954. The result is that remote dependencies specified through `spark.files` or `spark.jars` are not included in the list of remote dependencies to be downloaded by the init-container. This PR fixes it. ## How was this patch tested? Manual tests. vanzin This replaces https://github.com/apache/spark/pull/20157. foxish Author: Yinan LiCloses #20160 from liyinan926/SPARK-22757. (cherry picked from commit 6cff7d19f6a905fe425bd6892fe7ca014c0e696b) Signed-off-by: Felix Cheung Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9dcdbce Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9dcdbce Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9dcdbce Branch: refs/heads/branch-2.3 Commit: f9dcdbcefb545ced3f5b457e1e88c88a8e180f9f Parents: 5b524cc Author: Yinan Li Authored: Thu Jan 4 23:23:41 2018 -0800 Committer: Felix Cheung Committed: Thu Jan 4 23:23:59 2018 -0800 -- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f9dcdbce/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index cbe1f2c..1e38196 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -584,10 +584,11 @@ object SparkSubmit extends CommandLineUtils with Logging { confKey = "spark.executor.memory"), OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.cores.max"), - OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES, + OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.files"), OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"), - OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, confKey = "spark.jars"), + OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, +confKey = "spark.jars"), OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER, confKey = "spark.driver.memory"), OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER, - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-22757][K8S] Enable spark.jars and spark.files in KUBERNETES mode
Repository: spark Updated Branches: refs/heads/master cf0aa6557 -> 6cff7d19f [SPARK-22757][K8S] Enable spark.jars and spark.files in KUBERNETES mode ## What changes were proposed in this pull request? We missed enabling `spark.files` and `spark.jars` in https://github.com/apache/spark/pull/19954. The result is that remote dependencies specified through `spark.files` or `spark.jars` are not included in the list of remote dependencies to be downloaded by the init-container. This PR fixes it. ## How was this patch tested? Manual tests. vanzin This replaces https://github.com/apache/spark/pull/20157. foxish Author: Yinan LiCloses #20160 from liyinan926/SPARK-22757. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6cff7d19 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6cff7d19 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6cff7d19 Branch: refs/heads/master Commit: 6cff7d19f6a905fe425bd6892fe7ca014c0e696b Parents: cf0aa65 Author: Yinan Li Authored: Thu Jan 4 23:23:41 2018 -0800 Committer: Felix Cheung Committed: Thu Jan 4 23:23:41 2018 -0800 -- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6cff7d19/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index cbe1f2c..1e38196 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -584,10 +584,11 @@ object SparkSubmit extends CommandLineUtils with Logging { confKey = "spark.executor.memory"), OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.cores.max"), - OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES, + OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.files"), OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"), - OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, confKey = "spark.jars"), + OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, +confKey = "spark.jars"), OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER, confKey = "spark.driver.memory"), OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER, - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
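The fix above is a small change to the `OptionAssigner` table in `SparkSubmit`: adding `KUBERNETES` to the cluster-manager bitmask for `spark.files` and `spark.jars` lets `--files`/`--jars` arguments flow into the conf (and from there into the init-container's download list) when running against Kubernetes. Below is a minimal, self-contained Scala sketch of how such a bitmask-gated rule table behaves; the names, flag values, and URLs are illustrative, not Spark's actual internals.

```scala
// Illustrative sketch of a bitmask-gated rule table deciding which CLI arguments become confs.
object OptionAssignerSketch {
  // Cluster managers and deploy modes as bit flags, combined with bitwise OR.
  val LOCAL = 1; val STANDALONE = 2; val MESOS = 4; val YARN = 8; val KUBERNETES = 16
  val CLIENT = 1; val CLUSTER = 2; val ALL_DEPLOY_MODES: Int = CLIENT | CLUSTER

  case class OptionAssigner(value: String, clusterManager: Int, deployMode: Int, confKey: String)

  // Keep a rule only if its value is set and its masks match the current manager and mode.
  def applyRules(rules: Seq[OptionAssigner], manager: Int, mode: Int): Map[String, String] =
    rules.collect {
      case r if r.value != null && (r.clusterManager & manager) != 0 && (r.deployMode & mode) != 0 =>
        r.confKey -> r.value
    }.toMap

  def main(args: Array[String]): Unit = {
    val rules = Seq(
      // Before the patch, KUBERNETES was missing from these masks, so --jars/--files
      // were silently dropped in Kubernetes mode.
      OptionAssigner("https://repo.example.com/dep.jar", STANDALONE | MESOS | KUBERNETES,
        ALL_DEPLOY_MODES, "spark.jars"),
      OptionAssigner("https://repo.example.com/extra.conf", LOCAL | STANDALONE | MESOS | KUBERNETES,
        ALL_DEPLOY_MODES, "spark.files"))

    println(applyRules(rules, KUBERNETES, CLUSTER))
    // Map(spark.jars -> https://repo.example.com/dep.jar, spark.files -> https://repo.example.com/extra.conf)
  }
}
```

With `KUBERNETES` absent from a rule's mask, the corresponding argument never reaches the conf in Kubernetes mode, which is exactly the symptom the patch fixes.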
spark git commit: [SPARK-22949][ML] Apply CrossValidator approach to Driver/Distributed memory tradeoff for TrainValidationSplit
Repository: spark Updated Branches: refs/heads/branch-2.3 145820bda -> 5b524cc0c [SPARK-22949][ML] Apply CrossValidator approach to Driver/Distributed memory tradeoff for TrainValidationSplit ## What changes were proposed in this pull request? Avoid holding all models in memory for `TrainValidationSplit`. ## How was this patch tested? Existing tests. Author: Bago AmirbekianCloses #20143 from MrBago/trainValidMemoryFix. (cherry picked from commit cf0aa65576acbe0209c67f04c029058fd73555c1) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b524cc0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b524cc0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b524cc0 Branch: refs/heads/branch-2.3 Commit: 5b524cc0cd5a82e4fb0681363b6641e40b37075d Parents: 145820b Author: Bago Amirbekian Authored: Thu Jan 4 22:45:15 2018 -0800 Committer: Joseph K. Bradley Committed: Thu Jan 4 22:45:24 2018 -0800 -- .../apache/spark/ml/tuning/CrossValidator.scala | 4 +++- .../spark/ml/tuning/TrainValidationSplit.scala| 18 -- 2 files changed, 7 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5b524cc0/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index 095b54c..a0b507d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -160,8 +160,10 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) } (executionContext) } - // Wait for metrics to be calculated before unpersisting validation dataset + // Wait for metrics to be calculated val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf)) + + // Unpersist training & validation set once all metrics have been produced trainingDataset.unpersist() validationDataset.unpersist() foldMetrics http://git-wip-us.apache.org/repos/asf/spark/blob/5b524cc0/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala index c73bd18..8826ef3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala @@ -143,24 +143,13 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St // Fit models in a Future for training in parallel logDebug(s"Train split with multiple sets of parameters.") -val modelFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) => - Future[Model[_]] { +val metricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) => + Future[Double] { val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]] if (collectSubModelsParam) { subModels.get(paramIndex) = model } -model - } (executionContext) -} - -// Unpersist training data only when all models have trained -Future.sequence[Model[_], Iterable](modelFutures)(implicitly, executionContext) - .onComplete { _ => trainingDataset.unpersist() } (executionContext) - -// Evaluate models in a Future that will calulate a metric and allow model to be cleaned up -val metricFutures = modelFutures.zip(epm).map { case 
(modelFuture, paramMap) => - modelFuture.map { model => // TODO: duplicate evaluator to take extra params from input val metric = eval.evaluate(model.transform(validationDataset, paramMap)) logDebug(s"Got metric $metric for model trained with $paramMap.") @@ -171,7 +160,8 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St // Wait for all metrics to be calculated val metrics = metricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf)) -// Unpersist validation set once all metrics have been produced +// Unpersist training & validation set once all metrics have been produced +trainingDataset.unpersist() validationDataset.unpersist() logInfo(s"Train validation split metrics: ${metrics.toSeq}")
spark git commit: [SPARK-22949][ML] Apply CrossValidator approach to Driver/Distributed memory tradeoff for TrainValidationSplit
Repository: spark Updated Branches: refs/heads/master 52fc5c17d -> cf0aa6557 [SPARK-22949][ML] Apply CrossValidator approach to Driver/Distributed memory tradeoff for TrainValidationSplit ## What changes were proposed in this pull request? Avoid holding all models in memory for `TrainValidationSplit`. ## How was this patch tested? Existing tests. Author: Bago AmirbekianCloses #20143 from MrBago/trainValidMemoryFix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf0aa655 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf0aa655 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf0aa655 Branch: refs/heads/master Commit: cf0aa65576acbe0209c67f04c029058fd73555c1 Parents: 52fc5c1 Author: Bago Amirbekian Authored: Thu Jan 4 22:45:15 2018 -0800 Committer: Joseph K. Bradley Committed: Thu Jan 4 22:45:15 2018 -0800 -- .../apache/spark/ml/tuning/CrossValidator.scala | 4 +++- .../spark/ml/tuning/TrainValidationSplit.scala| 18 -- 2 files changed, 7 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cf0aa655/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index 095b54c..a0b507d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -160,8 +160,10 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) } (executionContext) } - // Wait for metrics to be calculated before unpersisting validation dataset + // Wait for metrics to be calculated val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf)) + + // Unpersist training & validation set once all metrics have been produced trainingDataset.unpersist() validationDataset.unpersist() foldMetrics http://git-wip-us.apache.org/repos/asf/spark/blob/cf0aa655/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala index c73bd18..8826ef3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala @@ -143,24 +143,13 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St // Fit models in a Future for training in parallel logDebug(s"Train split with multiple sets of parameters.") -val modelFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) => - Future[Model[_]] { +val metricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) => + Future[Double] { val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]] if (collectSubModelsParam) { subModels.get(paramIndex) = model } -model - } (executionContext) -} - -// Unpersist training data only when all models have trained -Future.sequence[Model[_], Iterable](modelFutures)(implicitly, executionContext) - .onComplete { _ => trainingDataset.unpersist() } (executionContext) - -// Evaluate models in a Future that will calulate a metric and allow model to be cleaned up -val metricFutures = modelFutures.zip(epm).map { case (modelFuture, paramMap) => - modelFuture.map { model => // TODO: duplicate evaluator to take extra params from input val 
metric = eval.evaluate(model.transform(validationDataset, paramMap)) logDebug(s"Got metric $metric for model trained with $paramMap.") @@ -171,7 +160,8 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St // Wait for all metrics to be calculated val metrics = metricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf)) -// Unpersist validation set once all metrics have been produced +// Unpersist training & validation set once all metrics have been produced +trainingDataset.unpersist() validationDataset.unpersist() logInfo(s"Train validation split metrics: ${metrics.toSeq}") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
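The change above collapses the old two-stage pipeline (one `Future[Model[_]]` per parameter map, followed by a second set of futures evaluating each model) into a single `Future[Double]` that fits and evaluates in one step, so each trained model becomes unreachable as soon as its metric is computed. A small stand-alone Scala sketch of that pattern, using placeholder `fit`/`evaluate` functions rather than the real ML classes:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration.Duration

object TuningMemorySketch {
  // Stand-ins for an estimator, a model and an evaluator; the real ML classes are not
  // needed to show the memory behaviour.
  type Model = Array[Double]
  def fit(param: Double): Model = Array.fill(1000000)(param)      // "expensive" model
  def evaluate(model: Model): Double = model.sum / model.length

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))
    val paramGrid = Seq(0.1, 0.5, 1.0, 2.0)

    // One Future[Double] per parameter setting: the model is created, scored and then
    // dropped inside the same future, so at most `parallelism` models are alive at once.
    // Keeping a Future[Model] per setting instead would pin every model until the end.
    val metricFutures = paramGrid.map { p =>
      Future { evaluate(fit(p)) }
    }
    val metrics = metricFutures.map(Await.result(_, Duration.Inf))
    println(metrics.mkString(", "))
    ec.shutdown()
  }
}
```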
svn commit: r24033 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_04_22_01-158f7e6-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Jan 5 06:14:49 2018 New Revision: 24033 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_04_22_01-158f7e6 docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-22825][SQL] Fix incorrect results of Casting Array to String
Repository: spark Updated Branches: refs/heads/branch-2.3 158f7e6a9 -> 145820bda [SPARK-22825][SQL] Fix incorrect results of Casting Array to String ## What changes were proposed in this pull request? This pr fixed the issue when casting arrays into strings; ``` scala> val df = spark.range(10).select('id.cast("integer")).agg(collect_list('id).as('ids)) scala> df.write.saveAsTable("t") scala> sql("SELECT cast(ids as String) FROM t").show(false) +--+ |ids | +--+ |org.apache.spark.sql.catalyst.expressions.UnsafeArrayData8bc285df| +--+ ``` This pr modified the result into; ``` +--+ |ids | +--+ |[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]| +--+ ``` ## How was this patch tested? Added tests in `CastSuite` and `SQLQuerySuite`. Author: Takeshi YamamuroCloses #20024 from maropu/SPARK-22825. (cherry picked from commit 52fc5c17d9d784b846149771b398e741621c0b5c) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/145820bd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/145820bd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/145820bd Branch: refs/heads/branch-2.3 Commit: 145820bda140d1385c4dd802fa79a871e6bf98be Parents: 158f7e6 Author: Takeshi Yamamuro Authored: Fri Jan 5 14:02:21 2018 +0800 Committer: Wenchen Fan Committed: Fri Jan 5 14:03:00 2018 +0800 -- .../expressions/codegen/UTF8StringBuilder.java | 78 .../spark/sql/catalyst/expressions/Cast.scala | 68 + .../sql/catalyst/expressions/CastSuite.scala| 25 +++ .../org/apache/spark/sql/SQLQuerySuite.scala| 2 - 4 files changed, 171 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/145820bd/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java -- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java new file mode 100644 index 000..f0f66ba --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen; + +import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.array.ByteArrayMethods; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A helper class to write {@link UTF8String}s to an internal buffer and build the concatenated + * {@link UTF8String} at the end. 
+ */ +public class UTF8StringBuilder { + + private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH; + + private byte[] buffer; + private int cursor = Platform.BYTE_ARRAY_OFFSET; + + public UTF8StringBuilder() { +// Since initial buffer size is 16 in `StringBuilder`, we set the same size here +this.buffer = new byte[16]; + } + + // Grows the buffer by at least `neededSize` + private void grow(int neededSize) { +if (neededSize > ARRAY_MAX - totalSize()) { + throw new UnsupportedOperationException( +"Cannot grow internal buffer by size " + neededSize + " because the size after growing " + + "exceeds size limitation " + ARRAY_MAX); +} +final int length = totalSize() + neededSize; +if (buffer.length < length) { + int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX; + final byte[] tmp = new
spark git commit: [SPARK-22825][SQL] Fix incorrect results of Casting Array to String
Repository: spark Updated Branches: refs/heads/master df7fc3ef3 -> 52fc5c17d [SPARK-22825][SQL] Fix incorrect results of Casting Array to String ## What changes were proposed in this pull request? This pr fixed the issue when casting arrays into strings; ``` scala> val df = spark.range(10).select('id.cast("integer")).agg(collect_list('id).as('ids)) scala> df.write.saveAsTable("t") scala> sql("SELECT cast(ids as String) FROM t").show(false) +--+ |ids | +--+ |org.apache.spark.sql.catalyst.expressions.UnsafeArrayData8bc285df| +--+ ``` This pr modified the result into; ``` +--+ |ids | +--+ |[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]| +--+ ``` ## How was this patch tested? Added tests in `CastSuite` and `SQLQuerySuite`. Author: Takeshi YamamuroCloses #20024 from maropu/SPARK-22825. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52fc5c17 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52fc5c17 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52fc5c17 Branch: refs/heads/master Commit: 52fc5c17d9d784b846149771b398e741621c0b5c Parents: df7fc3e Author: Takeshi Yamamuro Authored: Fri Jan 5 14:02:21 2018 +0800 Committer: Wenchen Fan Committed: Fri Jan 5 14:02:21 2018 +0800 -- .../expressions/codegen/UTF8StringBuilder.java | 78 .../spark/sql/catalyst/expressions/Cast.scala | 68 + .../sql/catalyst/expressions/CastSuite.scala| 25 +++ .../org/apache/spark/sql/SQLQuerySuite.scala| 2 - 4 files changed, 171 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/52fc5c17/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java -- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java new file mode 100644 index 000..f0f66ba --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen; + +import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.array.ByteArrayMethods; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A helper class to write {@link UTF8String}s to an internal buffer and build the concatenated + * {@link UTF8String} at the end. 
+ */ +public class UTF8StringBuilder { + + private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH; + + private byte[] buffer; + private int cursor = Platform.BYTE_ARRAY_OFFSET; + + public UTF8StringBuilder() { +// Since initial buffer size is 16 in `StringBuilder`, we set the same size here +this.buffer = new byte[16]; + } + + // Grows the buffer by at least `neededSize` + private void grow(int neededSize) { +if (neededSize > ARRAY_MAX - totalSize()) { + throw new UnsupportedOperationException( +"Cannot grow internal buffer by size " + neededSize + " because the size after growing " + + "exceeds size limitation " + ARRAY_MAX); +} +final int length = totalSize() + neededSize; +if (buffer.length < length) { + int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX; + final byte[] tmp = new byte[newLength]; + Platform.copyMemory( +buffer, +Platform.BYTE_ARRAY_OFFSET, +tmp, +
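The new `UTF8StringBuilder` accumulates the string form of each array element in a growable byte buffer, doubling the buffer until doubling would pass a hard array-length cap and then clamping to that cap. A simplified Scala sketch of that growth policy, using a plain `Array[Byte]` and `String` instead of Spark's `Platform`/`UTF8String` machinery, and an illustrative cap value:

```scala
// Simplified growable byte buffer: double on overflow, clamp at a hard limit.
class ByteStringBuilderSketch {
  private val ArrayMax = Int.MaxValue - 15          // stand-in for MAX_ROUNDED_ARRAY_LENGTH
  private var buffer = new Array[Byte](16)          // same initial size as java.lang.StringBuilder
  private var length = 0

  private def grow(neededSize: Int): Unit = {
    require(neededSize <= ArrayMax - length,
      s"Cannot grow by $neededSize: result would exceed limit $ArrayMax")
    val target = length + neededSize
    if (buffer.length < target) {
      val newLength = if (target < ArrayMax / 2) target * 2 else ArrayMax
      val tmp = new Array[Byte](newLength)
      System.arraycopy(buffer, 0, tmp, 0, length)
      buffer = tmp
    }
  }

  def append(s: String): this.type = {
    val bytes = s.getBytes("UTF-8")
    grow(bytes.length)
    System.arraycopy(bytes, 0, buffer, length, bytes.length)
    length += bytes.length
    this
  }

  def build(): String = new String(buffer, 0, length, "UTF-8")
}

object ByteStringBuilderSketch {
  def main(args: Array[String]): Unit = {
    val sb = new ByteStringBuilderSketch
    (0 to 9).foreach(i => sb.append(if (i == 0) s"[$i" else s", $i"))
    println(sb.append("]").build())   // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  }
}
```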
svn commit: r24031 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_04_20_01-df7fc3e-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Jan 5 04:14:56 2018 New Revision: 24031 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_04_20_01-df7fc3e docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-22957] ApproxQuantile breaks if the number of rows exceeds MaxInt
Repository: spark Updated Branches: refs/heads/branch-2.3 ea9da6152 -> 158f7e6a9 [SPARK-22957] ApproxQuantile breaks if the number of rows exceeds MaxInt ## What changes were proposed in this pull request? 32bit Int was used for row rank. That overflowed in a dataframe with more than 2B rows. ## How was this patch tested? Added test, but ignored, as it takes 4 minutes. Author: Juliusz SompolskiCloses #20152 from juliuszsompolski/SPARK-22957. (cherry picked from commit df7fc3ef3899cadd252d2837092bebe3442d6523) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/158f7e6a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/158f7e6a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/158f7e6a Branch: refs/heads/branch-2.3 Commit: 158f7e6a93b5acf4ce05c97b575124fd599cf927 Parents: ea9da61 Author: Juliusz Sompolski Authored: Fri Jan 5 10:16:34 2018 +0800 Committer: Wenchen Fan Committed: Fri Jan 5 10:16:53 2018 +0800 -- .../expressions/aggregate/ApproximatePercentile.scala | 12 ++-- .../spark/sql/catalyst/util/QuantileSummaries.scala | 8 .../scala/org/apache/spark/sql/DataFrameStatSuite.scala | 8 3 files changed, 18 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/158f7e6a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala index 149ac26..a45854a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala @@ -296,8 +296,8 @@ object ApproximatePercentile { Ints.BYTES + Doubles.BYTES + Longs.BYTES + // length of summary.sampled Ints.BYTES + - // summary.sampled, Array[Stat(value: Double, g: Int, delta: Int)] - summaries.sampled.length * (Doubles.BYTES + Ints.BYTES + Ints.BYTES) + // summary.sampled, Array[Stat(value: Double, g: Long, delta: Long)] + summaries.sampled.length * (Doubles.BYTES + Longs.BYTES + Longs.BYTES) } final def serialize(obj: PercentileDigest): Array[Byte] = { @@ -312,8 +312,8 @@ object ApproximatePercentile { while (i < summary.sampled.length) { val stat = summary.sampled(i) buffer.putDouble(stat.value) -buffer.putInt(stat.g) -buffer.putInt(stat.delta) +buffer.putLong(stat.g) +buffer.putLong(stat.delta) i += 1 } buffer.array() @@ -330,8 +330,8 @@ object ApproximatePercentile { var i = 0 while (i < sampledLength) { val value = buffer.getDouble() -val g = buffer.getInt() -val delta = buffer.getInt() +val g = buffer.getLong() +val delta = buffer.getLong() sampled(i) = Stats(value, g, delta) i += 1 } http://git-wip-us.apache.org/repos/asf/spark/blob/158f7e6a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala index eb7941c..b013add 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala @@ -105,7 +105,7 @@ class QuantileSummaries( if 
(newSamples.isEmpty || (sampleIdx == sampled.length && opsIdx == sorted.length - 1)) { 0 } else { - math.floor(2 * relativeError * currentCount).toInt + math.floor(2 * relativeError * currentCount).toLong } val tuple = Stats(currentSample, 1, delta) @@ -192,10 +192,10 @@ class QuantileSummaries( } // Target rank -val rank = math.ceil(quantile * count).toInt +val rank = math.ceil(quantile * count).toLong val targetError = relativeError * count // Minimum rank at current sample -var minRank = 0 +var minRank = 0L var i = 0 while (i < sampled.length - 1) { val curSample = sampled(i) @@ -235,7 +235,7 @@ object QuantileSummaries { * @param g the minimum rank jump from
spark git commit: [SPARK-22957] ApproxQuantile breaks if the number of rows exceeds MaxInt
Repository: spark Updated Branches: refs/heads/master 0428368c2 -> df7fc3ef3 [SPARK-22957] ApproxQuantile breaks if the number of rows exceeds MaxInt ## What changes were proposed in this pull request? 32bit Int was used for row rank. That overflowed in a dataframe with more than 2B rows. ## How was this patch tested? Added test, but ignored, as it takes 4 minutes. Author: Juliusz SompolskiCloses #20152 from juliuszsompolski/SPARK-22957. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df7fc3ef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df7fc3ef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df7fc3ef Branch: refs/heads/master Commit: df7fc3ef3899cadd252d2837092bebe3442d6523 Parents: 0428368 Author: Juliusz Sompolski Authored: Fri Jan 5 10:16:34 2018 +0800 Committer: Wenchen Fan Committed: Fri Jan 5 10:16:34 2018 +0800 -- .../expressions/aggregate/ApproximatePercentile.scala | 12 ++-- .../spark/sql/catalyst/util/QuantileSummaries.scala | 8 .../scala/org/apache/spark/sql/DataFrameStatSuite.scala | 8 3 files changed, 18 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/df7fc3ef/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala index 149ac26..a45854a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala @@ -296,8 +296,8 @@ object ApproximatePercentile { Ints.BYTES + Doubles.BYTES + Longs.BYTES + // length of summary.sampled Ints.BYTES + - // summary.sampled, Array[Stat(value: Double, g: Int, delta: Int)] - summaries.sampled.length * (Doubles.BYTES + Ints.BYTES + Ints.BYTES) + // summary.sampled, Array[Stat(value: Double, g: Long, delta: Long)] + summaries.sampled.length * (Doubles.BYTES + Longs.BYTES + Longs.BYTES) } final def serialize(obj: PercentileDigest): Array[Byte] = { @@ -312,8 +312,8 @@ object ApproximatePercentile { while (i < summary.sampled.length) { val stat = summary.sampled(i) buffer.putDouble(stat.value) -buffer.putInt(stat.g) -buffer.putInt(stat.delta) +buffer.putLong(stat.g) +buffer.putLong(stat.delta) i += 1 } buffer.array() @@ -330,8 +330,8 @@ object ApproximatePercentile { var i = 0 while (i < sampledLength) { val value = buffer.getDouble() -val g = buffer.getInt() -val delta = buffer.getInt() +val g = buffer.getLong() +val delta = buffer.getLong() sampled(i) = Stats(value, g, delta) i += 1 } http://git-wip-us.apache.org/repos/asf/spark/blob/df7fc3ef/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala index eb7941c..b013add 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala @@ -105,7 +105,7 @@ class QuantileSummaries( if (newSamples.isEmpty || (sampleIdx == sampled.length && opsIdx == sorted.length - 1)) { 0 } else { - 
math.floor(2 * relativeError * currentCount).toInt + math.floor(2 * relativeError * currentCount).toLong } val tuple = Stats(currentSample, 1, delta) @@ -192,10 +192,10 @@ class QuantileSummaries( } // Target rank -val rank = math.ceil(quantile * count).toInt +val rank = math.ceil(quantile * count).toLong val targetError = relativeError * count // Minimum rank at current sample -var minRank = 0 +var minRank = 0L var i = 0 while (i < sampled.length - 1) { val curSample = sampled(i) @@ -235,7 +235,7 @@ object QuantileSummaries { * @param g the minimum rank jump from the previous value's minimum rank * @param delta the maximum span of the rank. */ - case class Stats(value: Double, g:
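The root cause is that target ranks and minimum ranks were computed in `Int` space; once a DataFrame has more than `Int.MaxValue` (about 2.1 billion) rows, the `toInt` conversion saturates and the computed rank no longer corresponds to the requested quantile. A tiny Scala sketch of the difference:

```scala
// Sketch of the overflow the patch fixes: Int ranks saturate past ~2.1B rows, Long ranks stay exact.
object RankOverflowSketch {
  def main(args: Array[String]): Unit = {
    val count = 3000000000L            // 3 billion rows, larger than Int.MaxValue
    val quantile = 0.9

    val rankAsInt  = math.ceil(quantile * count).toInt   // saturates at 2147483647
    val rankAsLong = math.ceil(quantile * count).toLong  // exact: 2700000000

    println(s"toInt  -> $rankAsInt")
    println(s"toLong -> $rankAsLong")
  }
}
```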
svn commit: r24030 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_04_18_01-ea9da61-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Jan 5 02:14:49 2018 New Revision: 24030 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_04_18_01-ea9da61 docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-22960][K8S] Make build-push-docker-images.sh more dev-friendly.
Repository: spark Updated Branches: refs/heads/master e288fc87a -> 0428368c2 [SPARK-22960][K8S] Make build-push-docker-images.sh more dev-friendly. - Make it possible to build images from a git clone. - Make it easy to use minikube to test things. Also fixed what seemed like a bug: the base image wasn't getting the tag provided in the command line. Adding the tag allows users to use multiple Spark builds in the same kubernetes cluster. Tested by deploying images on minikube and running spark-submit from a dev environment; also by building the images with different tags and verifying "docker images" in minikube. Author: Marcelo VanzinCloses #20154 from vanzin/SPARK-22960. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0428368c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0428368c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0428368c Branch: refs/heads/master Commit: 0428368c2c5e135f99f62be20877bbbda43be310 Parents: e288fc8 Author: Marcelo Vanzin Authored: Thu Jan 4 16:34:56 2018 -0800 Committer: Marcelo Vanzin Committed: Thu Jan 4 16:34:56 2018 -0800 -- docs/running-on-kubernetes.md | 9 +- .../src/main/dockerfiles/driver/Dockerfile | 3 +- .../src/main/dockerfiles/executor/Dockerfile| 3 +- .../main/dockerfiles/init-container/Dockerfile | 3 +- .../src/main/dockerfiles/spark-base/Dockerfile | 7 +- sbin/build-push-docker-images.sh| 120 +++ 6 files changed, 117 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0428368c/docs/running-on-kubernetes.md -- diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index e491329..2d69f63 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -16,6 +16,9 @@ Kubernetes scheduler that has been added to Spark. you may setup a test cluster on your local machine using [minikube](https://kubernetes.io/docs/getting-started-guides/minikube/). * We recommend using the latest release of minikube with the DNS addon enabled. + * Be aware that the default minikube configuration is not enough for running Spark applications. + We recommend 3 CPUs and 4g of memory to be able to start a simple Spark application with a single + executor. * You must have appropriate permissions to list, create, edit and delete [pods](https://kubernetes.io/docs/user-guide/pods/) in your cluster. You can verify that you can list these resources by running `kubectl auth can-i pods`. @@ -197,7 +200,7 @@ kubectl port-forward 4040:4040 Then, the Spark driver UI can be accessed on `http://localhost:4040`. -### Debugging +### Debugging There may be several kinds of failures. If the Kubernetes API server rejects the request made from spark-submit, or the connection is refused for a different reason, the submission logic should indicate the error encountered. However, if there @@ -215,8 +218,8 @@ If the pod has encountered a runtime error, the status can be probed further usi kubectl logs ``` -Status and logs of failed executor pods can be checked in similar ways. Finally, deleting the driver pod will clean up the entire spark -application, includling all executors, associated service, etc. The driver pod can be thought of as the Kubernetes representation of +Status and logs of failed executor pods can be checked in similar ways. Finally, deleting the driver pod will clean up the entire spark +application, including all executors, associated service, etc. 
The driver pod can be thought of as the Kubernetes representation of the Spark application. ## Kubernetes Features http://git-wip-us.apache.org/repos/asf/spark/blob/0428368c/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile -- diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile index 45fbcd9..ff5289e 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile @@ -15,7 +15,8 @@ # limitations under the License. # -FROM spark-base +ARG base_image +FROM ${base_image} # Before building the docker image, first build and make a Spark distribution following # the instructions in http://spark.apache.org/docs/latest/building-spark.html.
spark git commit: [SPARK-22960][K8S] Make build-push-docker-images.sh more dev-friendly.
Repository: spark Updated Branches: refs/heads/branch-2.3 84707f0c6 -> ea9da6152 [SPARK-22960][K8S] Make build-push-docker-images.sh more dev-friendly. - Make it possible to build images from a git clone. - Make it easy to use minikube to test things. Also fixed what seemed like a bug: the base image wasn't getting the tag provided in the command line. Adding the tag allows users to use multiple Spark builds in the same kubernetes cluster. Tested by deploying images on minikube and running spark-submit from a dev environment; also by building the images with different tags and verifying "docker images" in minikube. Author: Marcelo VanzinCloses #20154 from vanzin/SPARK-22960. (cherry picked from commit 0428368c2c5e135f99f62be20877bbbda43be310) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea9da615 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea9da615 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea9da615 Branch: refs/heads/branch-2.3 Commit: ea9da6152af9223787cffd83d489741b4cc5aa34 Parents: 84707f0 Author: Marcelo Vanzin Authored: Thu Jan 4 16:34:56 2018 -0800 Committer: Marcelo Vanzin Committed: Thu Jan 4 16:35:07 2018 -0800 -- docs/running-on-kubernetes.md | 9 +- .../src/main/dockerfiles/driver/Dockerfile | 3 +- .../src/main/dockerfiles/executor/Dockerfile| 3 +- .../main/dockerfiles/init-container/Dockerfile | 3 +- .../src/main/dockerfiles/spark-base/Dockerfile | 7 +- sbin/build-push-docker-images.sh| 120 +++ 6 files changed, 117 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ea9da615/docs/running-on-kubernetes.md -- diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index e491329..2d69f63 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -16,6 +16,9 @@ Kubernetes scheduler that has been added to Spark. you may setup a test cluster on your local machine using [minikube](https://kubernetes.io/docs/getting-started-guides/minikube/). * We recommend using the latest release of minikube with the DNS addon enabled. + * Be aware that the default minikube configuration is not enough for running Spark applications. + We recommend 3 CPUs and 4g of memory to be able to start a simple Spark application with a single + executor. * You must have appropriate permissions to list, create, edit and delete [pods](https://kubernetes.io/docs/user-guide/pods/) in your cluster. You can verify that you can list these resources by running `kubectl auth can-i pods`. @@ -197,7 +200,7 @@ kubectl port-forward 4040:4040 Then, the Spark driver UI can be accessed on `http://localhost:4040`. -### Debugging +### Debugging There may be several kinds of failures. If the Kubernetes API server rejects the request made from spark-submit, or the connection is refused for a different reason, the submission logic should indicate the error encountered. However, if there @@ -215,8 +218,8 @@ If the pod has encountered a runtime error, the status can be probed further usi kubectl logs ``` -Status and logs of failed executor pods can be checked in similar ways. Finally, deleting the driver pod will clean up the entire spark -application, includling all executors, associated service, etc. The driver pod can be thought of as the Kubernetes representation of +Status and logs of failed executor pods can be checked in similar ways. 
Finally, deleting the driver pod will clean up the entire spark +application, including all executors, associated service, etc. The driver pod can be thought of as the Kubernetes representation of the Spark application. ## Kubernetes Features http://git-wip-us.apache.org/repos/asf/spark/blob/ea9da615/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile -- diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile index 45fbcd9..ff5289e 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile @@ -15,7 +15,8 @@ # limitations under the License. # -FROM spark-base +ARG base_image +FROM ${base_image} # Before building the docker image, first build and make a Spark distribution following # the instructions in
svn commit: r24029 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_04_16_01-e288fc8-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Jan 5 00:14:37 2018 New Revision: 24029 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_04_16_01-e288fc8 docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-22953][K8S] Avoids adding duplicated secret volumes when init-container is used
Repository: spark Updated Branches: refs/heads/branch-2.3 2ab4012ad -> 84707f0c6 [SPARK-22953][K8S] Avoids adding duplicated secret volumes when init-container is used ## What changes were proposed in this pull request? User-specified secrets are mounted into both the main container and init-container (when it is used) in a Spark driver/executor pod, using the `MountSecretsBootstrap`. Because `MountSecretsBootstrap` always adds new secret volumes for the secrets to the pod, the same secret volumes get added twice, one when mounting the secrets to the main container, and the other when mounting the secrets to the init-container. This PR fixes the issue by separating `MountSecretsBootstrap.mountSecrets` out into two methods: `addSecretVolumes` for adding secret volumes to a pod and `mountSecrets` for mounting secret volumes to a container, respectively. `addSecretVolumes` is only called once for each pod, whereas `mountSecrets` is called individually for the main container and the init-container (if it is used). Ref: https://github.com/apache-spark-on-k8s/spark/issues/594. ## How was this patch tested? Unit tested and manually tested. vanzin This replaces https://github.com/apache/spark/pull/20148. hex108 foxish kimoonkim Author: Yinan LiCloses #20159 from liyinan926/master. (cherry picked from commit e288fc87a027ec1e1a21401d1f151df20dbfecf3) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84707f0c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84707f0c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84707f0c Branch: refs/heads/branch-2.3 Commit: 84707f0c6afa9c5417e271657ff930930f82213c Parents: 2ab4012 Author: Yinan Li Authored: Thu Jan 4 15:35:20 2018 -0800 Committer: Marcelo Vanzin Committed: Thu Jan 4 15:35:34 2018 -0800 -- .../deploy/k8s/MountSecretsBootstrap.scala | 30 ++-- .../k8s/submit/DriverConfigOrchestrator.scala | 16 - .../steps/BasicDriverConfigurationStep.scala| 2 +- .../submit/steps/DriverMountSecretsStep.scala | 4 +-- .../InitContainerMountSecretsStep.scala | 11 +++--- .../cluster/k8s/ExecutorPodFactory.scala| 6 ++-- .../spark/deploy/k8s/SecretVolumeUtils.scala| 36 .../deploy/k8s/submit/SecretVolumeUtils.scala | 36 .../BasicDriverConfigurationStepSuite.scala | 4 +-- .../steps/DriverMountSecretsStepSuite.scala | 4 +-- .../InitContainerMountSecretsStepSuite.scala| 7 +--- .../cluster/k8s/ExecutorPodFactorySuite.scala | 14 12 files changed, 88 insertions(+), 82 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/84707f0c/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala -- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala index 8286546..c35e7db 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala @@ -24,26 +24,36 @@ import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBui private[spark] class MountSecretsBootstrap(secretNamesToMountPaths: Map[String, String]) { /** - * Mounts Kubernetes secrets as secret volumes into the given container in the given pod. 
+ * Add new secret volumes for the secrets specified in secretNamesToMountPaths into the given pod. * * @param pod the pod into which the secret volumes are being added. - * @param container the container into which the secret volumes are being mounted. - * @return the updated pod and container with the secrets mounted. + * @return the updated pod with the secret volumes added. */ - def mountSecrets(pod: Pod, container: Container): (Pod, Container) = { + def addSecretVolumes(pod: Pod): Pod = { var podBuilder = new PodBuilder(pod) secretNamesToMountPaths.keys.foreach { name => podBuilder = podBuilder .editOrNewSpec() .addNewVolume() - .withName(secretVolumeName(name)) - .withNewSecret() -.withSecretName(name) -.endSecret() - .endVolume() +.withName(secretVolumeName(name)) +.withNewSecret() + .withSecretName(name) + .endSecret() +.endVolume()
spark git commit: [SPARK-22953][K8S] Avoids adding duplicated secret volumes when init-container is used
Repository: spark Updated Branches: refs/heads/master 95f9659ab -> e288fc87a [SPARK-22953][K8S] Avoids adding duplicated secret volumes when init-container is used ## What changes were proposed in this pull request? User-specified secrets are mounted into both the main container and init-container (when it is used) in a Spark driver/executor pod, using the `MountSecretsBootstrap`. Because `MountSecretsBootstrap` always adds new secret volumes for the secrets to the pod, the same secret volumes get added twice, one when mounting the secrets to the main container, and the other when mounting the secrets to the init-container. This PR fixes the issue by separating `MountSecretsBootstrap.mountSecrets` out into two methods: `addSecretVolumes` for adding secret volumes to a pod and `mountSecrets` for mounting secret volumes to a container, respectively. `addSecretVolumes` is only called once for each pod, whereas `mountSecrets` is called individually for the main container and the init-container (if it is used). Ref: https://github.com/apache-spark-on-k8s/spark/issues/594. ## How was this patch tested? Unit tested and manually tested. vanzin This replaces https://github.com/apache/spark/pull/20148. hex108 foxish kimoonkim Author: Yinan LiCloses #20159 from liyinan926/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e288fc87 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e288fc87 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e288fc87 Branch: refs/heads/master Commit: e288fc87a027ec1e1a21401d1f151df20dbfecf3 Parents: 95f9659 Author: Yinan Li Authored: Thu Jan 4 15:35:20 2018 -0800 Committer: Marcelo Vanzin Committed: Thu Jan 4 15:35:20 2018 -0800 -- .../deploy/k8s/MountSecretsBootstrap.scala | 30 ++-- .../k8s/submit/DriverConfigOrchestrator.scala | 16 - .../steps/BasicDriverConfigurationStep.scala| 2 +- .../submit/steps/DriverMountSecretsStep.scala | 4 +-- .../InitContainerMountSecretsStep.scala | 11 +++--- .../cluster/k8s/ExecutorPodFactory.scala| 6 ++-- .../spark/deploy/k8s/SecretVolumeUtils.scala| 36 .../deploy/k8s/submit/SecretVolumeUtils.scala | 36 .../BasicDriverConfigurationStepSuite.scala | 4 +-- .../steps/DriverMountSecretsStepSuite.scala | 4 +-- .../InitContainerMountSecretsStepSuite.scala| 7 +--- .../cluster/k8s/ExecutorPodFactorySuite.scala | 14 12 files changed, 88 insertions(+), 82 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e288fc87/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala -- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala index 8286546..c35e7db 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala @@ -24,26 +24,36 @@ import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBui private[spark] class MountSecretsBootstrap(secretNamesToMountPaths: Map[String, String]) { /** - * Mounts Kubernetes secrets as secret volumes into the given container in the given pod. + * Add new secret volumes for the secrets specified in secretNamesToMountPaths into the given pod. * * @param pod the pod into which the secret volumes are being added. 
- * @param container the container into which the secret volumes are being mounted. - * @return the updated pod and container with the secrets mounted. + * @return the updated pod with the secret volumes added. */ - def mountSecrets(pod: Pod, container: Container): (Pod, Container) = { + def addSecretVolumes(pod: Pod): Pod = { var podBuilder = new PodBuilder(pod) secretNamesToMountPaths.keys.foreach { name => podBuilder = podBuilder .editOrNewSpec() .addNewVolume() - .withName(secretVolumeName(name)) - .withNewSecret() -.withSecretName(name) -.endSecret() - .endVolume() +.withName(secretVolumeName(name)) +.withNewSecret() + .withSecretName(name) + .endSecret() +.endVolume() .endSpec() } +podBuilder.build() + } + + /** + * Mounts Kubernetes secret volumes of the secrets specified in
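The key point of the refactoring is the split into `addSecretVolumes` (called once per pod, declaring one volume per secret) and `mountSecrets` (called once per container, adding the volume mounts), so the same volume is never declared twice when an init-container is present. Below is a simplified sketch of that pattern using plain case classes instead of the fabric8 Kubernetes model; all names are illustrative.

```scala
// Two-step pattern: declare secret volumes on the pod once, mount them per container.
object MountSecretsSketch {
  case class VolumeMount(name: String, mountPath: String)
  case class Volume(name: String, secretName: String)
  case class Container(name: String, mounts: Seq[VolumeMount] = Nil)
  case class Pod(volumes: Seq[Volume] = Nil)

  class MountSecretsBootstrapSketch(secretNamesToMountPaths: Map[String, String]) {
    private def volumeName(secret: String) = s"$secret-volume"

    // Called once per pod: one volume per secret, so no duplicates even with an init-container.
    def addSecretVolumes(pod: Pod): Pod =
      pod.copy(volumes = pod.volumes ++
        secretNamesToMountPaths.keys.map(n => Volume(volumeName(n), n)))

    // Called once per container: mount the already-declared volumes at their paths.
    def mountSecrets(container: Container): Container =
      container.copy(mounts = container.mounts ++
        secretNamesToMountPaths.map { case (n, path) => VolumeMount(volumeName(n), path) })
  }

  def main(args: Array[String]): Unit = {
    val bootstrap = new MountSecretsBootstrapSketch(Map("spark-secret" -> "/etc/secrets"))
    val pod = bootstrap.addSecretVolumes(Pod())                      // volumes declared once
    val driver = bootstrap.mountSecrets(Container("driver"))         // mounted in the main container
    val init = bootstrap.mountSecrets(Container("init-container"))   // and in the init-container
    println(pod)
    println(driver)
    println(init)
  }
}
```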
spark git commit: [SPARK-22948][K8S] Move SparkPodInitContainer to correct package.
Repository: spark Updated Branches: refs/heads/master d2cddc88e -> 95f9659ab [SPARK-22948][K8S] Move SparkPodInitContainer to correct package. Author: Marcelo VanzinCloses #20156 from vanzin/SPARK-22948. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95f9659a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95f9659a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95f9659a Branch: refs/heads/master Commit: 95f9659abe8845f9f3f42fd7ababd79e55c52489 Parents: d2cddc8 Author: Marcelo Vanzin Authored: Thu Jan 4 15:00:09 2018 -0800 Committer: Marcelo Vanzin Committed: Thu Jan 4 15:00:09 2018 -0800 -- dev/sparktestsupport/modules.py | 2 +- .../deploy/k8s/SparkPodInitContainer.scala | 116 +++ .../deploy/rest/k8s/SparkPodInitContainer.scala | 116 --- .../deploy/k8s/SparkPodInitContainerSuite.scala | 86 ++ .../rest/k8s/SparkPodInitContainerSuite.scala | 86 -- .../main/dockerfiles/init-container/Dockerfile | 2 +- 6 files changed, 204 insertions(+), 204 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/95f9659a/dev/sparktestsupport/modules.py -- diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index f834563..7164180 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -539,7 +539,7 @@ mesos = Module( kubernetes = Module( name="kubernetes", dependencies=[], -source_file_regexes=["resource-managers/kubernetes/core"], +source_file_regexes=["resource-managers/kubernetes"], build_profile_flags=["-Pkubernetes"], sbt_test_goals=["kubernetes/test"] ) http://git-wip-us.apache.org/repos/asf/spark/blob/95f9659a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala -- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala new file mode 100644 index 000..c0f0878 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import java.io.File +import java.util.concurrent.TimeUnit + +import scala.concurrent.{ExecutionContext, Future} + +import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.internal.Logging +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * Process that fetches files from a resource staging server and/or arbitrary remote locations. 
+ * + * The init-container can handle fetching files from any of those sources, but not all of the + * sources need to be specified. This allows for composing multiple instances of this container + * with different configurations for different download sources, or using the same container to + * download everything at once. + */ +private[spark] class SparkPodInitContainer( +sparkConf: SparkConf, +fileFetcher: FileFetcher) extends Logging { + + private val maxThreadPoolSize = sparkConf.get(INIT_CONTAINER_MAX_THREAD_POOL_SIZE) + private implicit val downloadExecutor = ExecutionContext.fromExecutorService( +ThreadUtils.newDaemonCachedThreadPool("download-executor", maxThreadPoolSize)) + + private val jarsDownloadDir = new File(sparkConf.get(JARS_DOWNLOAD_LOCATION)) + private val filesDownloadDir = new File(sparkConf.get(FILES_DOWNLOAD_LOCATION)) + + private val remoteJars = sparkConf.get(INIT_CONTAINER_REMOTE_JARS) + private val remoteFiles =
spark git commit: [SPARK-22948][K8S] Move SparkPodInitContainer to correct package.
Repository: spark Updated Branches: refs/heads/branch-2.3 bc4bef472 -> 2ab4012ad [SPARK-22948][K8S] Move SparkPodInitContainer to correct package. Author: Marcelo VanzinCloses #20156 from vanzin/SPARK-22948. (cherry picked from commit 95f9659abe8845f9f3f42fd7ababd79e55c52489) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ab4012a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ab4012a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ab4012a Branch: refs/heads/branch-2.3 Commit: 2ab4012adda941ebd637bd248f65cefdf4aaf110 Parents: bc4bef4 Author: Marcelo Vanzin Authored: Thu Jan 4 15:00:09 2018 -0800 Committer: Marcelo Vanzin Committed: Thu Jan 4 15:00:21 2018 -0800 -- dev/sparktestsupport/modules.py | 2 +- .../deploy/k8s/SparkPodInitContainer.scala | 116 +++ .../deploy/rest/k8s/SparkPodInitContainer.scala | 116 --- .../deploy/k8s/SparkPodInitContainerSuite.scala | 86 ++ .../rest/k8s/SparkPodInitContainerSuite.scala | 86 -- .../main/dockerfiles/init-container/Dockerfile | 2 +- 6 files changed, 204 insertions(+), 204 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2ab4012a/dev/sparktestsupport/modules.py -- diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index f834563..7164180 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -539,7 +539,7 @@ mesos = Module( kubernetes = Module( name="kubernetes", dependencies=[], -source_file_regexes=["resource-managers/kubernetes/core"], +source_file_regexes=["resource-managers/kubernetes"], build_profile_flags=["-Pkubernetes"], sbt_test_goals=["kubernetes/test"] ) http://git-wip-us.apache.org/repos/asf/spark/blob/2ab4012a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala -- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala new file mode 100644 index 000..c0f0878 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s + +import java.io.File +import java.util.concurrent.TimeUnit + +import scala.concurrent.{ExecutionContext, Future} + +import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.internal.Logging +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * Process that fetches files from a resource staging server and/or arbitrary remote locations. + * + * The init-container can handle fetching files from any of those sources, but not all of the + * sources need to be specified. This allows for composing multiple instances of this container + * with different configurations for different download sources, or using the same container to + * download everything at once. + */ +private[spark] class SparkPodInitContainer( +sparkConf: SparkConf, +fileFetcher: FileFetcher) extends Logging { + + private val maxThreadPoolSize = sparkConf.get(INIT_CONTAINER_MAX_THREAD_POOL_SIZE) + private implicit val downloadExecutor = ExecutionContext.fromExecutorService( +ThreadUtils.newDaemonCachedThreadPool("download-executor", maxThreadPoolSize)) + + private val jarsDownloadDir = new File(sparkConf.get(JARS_DOWNLOAD_LOCATION)) + private val filesDownloadDir = new
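For the SPARK-22948 move above, the relocated class is the init-container entry point whose scaladoc appears in the diff. The following is a minimal sketch of the parallel-download pattern that scaladoc describes, with hypothetical helper names (downloadAll, fetch); it is not the SparkPodInitContainer code itself, which additionally reads its thread-pool size, download directories, and remote jar/file lists from SparkConf entries such as JARS_DOWNLOAD_LOCATION and INIT_CONTAINER_REMOTE_JARS shown in the diff.

```scala
import java.io.File
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object InitContainerSketch {
  // Fetch every remote URI into targetDir on a small thread pool and block until
  // all downloads finish, failing fast if any single fetch throws.
  def downloadAll(uris: Seq[String], targetDir: File)(fetch: (String, File) => Unit): Unit = {
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val downloads = uris.map(uri => Future(fetch(uri, targetDir)))
      Await.result(Future.sequence(downloads), 5.minutes)
    } finally {
      pool.shutdown()
    }
  }
}
```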
spark git commit: [SPARK-22850][CORE] Ensure queued events are delivered to all event queues.
Repository: spark Updated Branches: refs/heads/branch-2.3 cd92913f3 -> bc4bef472 [SPARK-22850][CORE] Ensure queued events are delivered to all event queues. The code in LiveListenerBus was queueing events before start in the queues themselves; so in situations like the following: bus.post(someEvent) bus.addToEventLogQueue(listener) bus.start() "someEvent" would not be delivered to "listener" if that was the first listener in the queue, because the queue wouldn't exist when the event was posted. This change buffers the events before starting the bus in the bus itself, so that they can be delivered to all registered queues when the bus is started. Also tweaked the unit tests to cover the behavior above. Author: Marcelo VanzinCloses #20039 from vanzin/SPARK-22850. (cherry picked from commit d2cddc88eac32f26b18ec26bb59e85c6f09a8c88) Signed-off-by: Imran Rashid Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc4bef47 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc4bef47 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc4bef47 Branch: refs/heads/branch-2.3 Commit: bc4bef472de0e99f74a80954d694c3d1744afe3a Parents: cd92913 Author: Marcelo Vanzin Authored: Thu Jan 4 16:19:00 2018 -0600 Committer: Imran Rashid Committed: Thu Jan 4 16:19:22 2018 -0600 -- .../spark/scheduler/LiveListenerBus.scala | 45 +--- .../spark/scheduler/SparkListenerSuite.scala| 21 + 2 files changed, 52 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bc4bef47/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 2312140..ba6387a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -62,6 +62,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) { private val queues = new CopyOnWriteArrayList[AsyncEventQueue]() + // Visible for testing. + @volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]() + /** Add a listener to queue shared by all non-internal listeners. */ def addToSharedQueue(listener: SparkListenerInterface): Unit = { addToQueue(listener, SHARED_QUEUE) @@ -125,13 +128,39 @@ private[spark] class LiveListenerBus(conf: SparkConf) { /** Post an event to all queues. */ def post(event: SparkListenerEvent): Unit = { -if (!stopped.get()) { - metrics.numEventsPosted.inc() - val it = queues.iterator() - while (it.hasNext()) { -it.next().post(event) +if (stopped.get()) { + return +} + +metrics.numEventsPosted.inc() + +// If the event buffer is null, it means the bus has been started and we can avoid +// synchronization and post events directly to the queues. This should be the most +// common case during the life of the bus. +if (queuedEvents == null) { + postToQueues(event) + return +} + +// Otherwise, need to synchronize to check whether the bus is started, to make sure the thread +// calling start() picks up the new event. +synchronized { + if (!started.get()) { +queuedEvents += event +return } } + +// If the bus was already started when the check above was made, just post directly to the +// queues. 
+postToQueues(event) + } + + private def postToQueues(event: SparkListenerEvent): Unit = { +val it = queues.iterator() +while (it.hasNext()) { + it.next().post(event) +} } /** @@ -149,7 +178,11 @@ private[spark] class LiveListenerBus(conf: SparkConf) { } this.sparkContext = sc -queues.asScala.foreach(_.start(sc)) +queues.asScala.foreach { q => + q.start(sc) + queuedEvents.foreach(q.post) +} +queuedEvents = null metricsSystem.registerSource(metrics) } http://git-wip-us.apache.org/repos/asf/spark/blob/bc4bef47/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 1beb36a..da6ecb8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -48,7 +48,7 @@ class
spark git commit: [SPARK-22850][CORE] Ensure queued events are delivered to all event queues.
Repository: spark Updated Branches: refs/heads/master 93f92c0ed -> d2cddc88e [SPARK-22850][CORE] Ensure queued events are delivered to all event queues. The code in LiveListenerBus was queueing events before start in the queues themselves; so in situations like the following: bus.post(someEvent) bus.addToEventLogQueue(listener) bus.start() "someEvent" would not be delivered to "listener" if that was the first listener in the queue, because the queue wouldn't exist when the event was posted. This change buffers the events before starting the bus in the bus itself, so that they can be delivered to all registered queues when the bus is started. Also tweaked the unit tests to cover the behavior above. Author: Marcelo VanzinCloses #20039 from vanzin/SPARK-22850. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2cddc88 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2cddc88 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2cddc88 Branch: refs/heads/master Commit: d2cddc88eac32f26b18ec26bb59e85c6f09a8c88 Parents: 93f92c0 Author: Marcelo Vanzin Authored: Thu Jan 4 16:19:00 2018 -0600 Committer: Imran Rashid Committed: Thu Jan 4 16:19:00 2018 -0600 -- .../spark/scheduler/LiveListenerBus.scala | 45 +--- .../spark/scheduler/SparkListenerSuite.scala| 21 + 2 files changed, 52 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d2cddc88/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 2312140..ba6387a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -62,6 +62,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) { private val queues = new CopyOnWriteArrayList[AsyncEventQueue]() + // Visible for testing. + @volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]() + /** Add a listener to queue shared by all non-internal listeners. */ def addToSharedQueue(listener: SparkListenerInterface): Unit = { addToQueue(listener, SHARED_QUEUE) @@ -125,13 +128,39 @@ private[spark] class LiveListenerBus(conf: SparkConf) { /** Post an event to all queues. */ def post(event: SparkListenerEvent): Unit = { -if (!stopped.get()) { - metrics.numEventsPosted.inc() - val it = queues.iterator() - while (it.hasNext()) { -it.next().post(event) +if (stopped.get()) { + return +} + +metrics.numEventsPosted.inc() + +// If the event buffer is null, it means the bus has been started and we can avoid +// synchronization and post events directly to the queues. This should be the most +// common case during the life of the bus. +if (queuedEvents == null) { + postToQueues(event) + return +} + +// Otherwise, need to synchronize to check whether the bus is started, to make sure the thread +// calling start() picks up the new event. +synchronized { + if (!started.get()) { +queuedEvents += event +return } } + +// If the bus was already started when the check above was made, just post directly to the +// queues. 
+postToQueues(event) + } + + private def postToQueues(event: SparkListenerEvent): Unit = { +val it = queues.iterator() +while (it.hasNext()) { + it.next().post(event) +} } /** @@ -149,7 +178,11 @@ private[spark] class LiveListenerBus(conf: SparkConf) { } this.sparkContext = sc -queues.asScala.foreach(_.start(sc)) +queues.asScala.foreach { q => + q.start(sc) + queuedEvents.foreach(q.post) +} +queuedEvents = null metricsSystem.registerSource(metrics) } http://git-wip-us.apache.org/repos/asf/spark/blob/d2cddc88/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 1beb36a..da6ecb8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -48,7 +48,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
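Both copies of SPARK-22850 above describe the same ordering bug, so a single sketch suffices. This is a minimal illustration of the buffer-until-start pattern the patch adds, using plain Scala collections rather than the real LiveListenerBus/AsyncEventQueue types: events posted before start() are held in the bus itself and replayed into every registered queue once the bus starts, after which posts go straight to the queues.

```scala
import scala.collection.mutable

class BufferingBus[E] {
  private val queues = mutable.ListBuffer.empty[mutable.Queue[E]]
  // Non-null while the bus is still buffering; set to null by start().
  @volatile private var queuedEvents: mutable.ListBuffer[E] = mutable.ListBuffer.empty[E]

  def addQueue(queue: mutable.Queue[E]): Unit = synchronized { queues += queue }

  def post(event: E): Unit = {
    // Fast path after start(): no synchronization, deliver directly to the queues.
    if (queuedEvents == null) {
      queues.foreach(_.enqueue(event))
      return
    }
    // Re-check under the lock so a concurrent start() cannot miss this event.
    synchronized {
      if (queuedEvents != null) {
        queuedEvents += event
        return
      }
    }
    // start() ran between the two checks; deliver directly after all.
    queues.foreach(_.enqueue(event))
  }

  def start(): Unit = synchronized {
    // Replay everything buffered before start, then switch to direct delivery.
    queues.foreach(q => queuedEvents.foreach(e => q.enqueue(e)))
    queuedEvents = null
  }
}
```

The collections here are not themselves thread-safe; the sketch only captures the ordering guarantee the fix provides: an event posted before start() reaches every queue registered before start(), regardless of registration order.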
svn commit: r24025 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_04_14_01-cd92913-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Jan 4 22:16:06 2018 New Revision: 24025 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_04_14_01-cd92913 docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50 parts, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r24018 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_04_12_01-93f92c0-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Jan 4 20:17:26 2018 New Revision: 24018 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_04_12_01-93f92c0 docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50 parts, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21475][CORE][2ND ATTEMPT] Change to use NIO's Files API for external shuffle service
Repository: spark Updated Branches: refs/heads/master 6f68316e9 -> 93f92c0ed [SPARK-21475][CORE][2ND ATTEMPT] Change to use NIO's Files API for external shuffle service ## What changes were proposed in this pull request? This PR is the second attempt of #18684 , NIO's Files API doesn't override `skip` method for `InputStream`, so it will bring in performance issue (mentioned in #20119). But using `FileInputStream`/`FileOutputStream` will also bring in memory issue (https://dzone.com/articles/fileinputstream-fileoutputstream-considered-harmful), which is severe for long running external shuffle service. So here in this proposal, only fixing the external shuffle service related code. ## How was this patch tested? Existing tests. Author: jerryshaoCloses #20144 from jerryshao/SPARK-21475-v2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93f92c0e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93f92c0e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93f92c0e Branch: refs/heads/master Commit: 93f92c0ed7442a4382e97254307309977ff676f8 Parents: 6f68316 Author: jerryshao Authored: Thu Jan 4 11:39:42 2018 -0800 Committer: Shixiong Zhu Committed: Thu Jan 4 11:39:42 2018 -0800 -- .../apache/spark/network/buffer/FileSegmentManagedBuffer.java| 3 ++- .../apache/spark/network/shuffle/ShuffleIndexInformation.java| 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/93f92c0e/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java -- diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java index c20fab8..8b8f989 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java @@ -24,6 +24,7 @@ import java.io.InputStream; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; import com.google.common.base.Objects; import com.google.common.io.ByteStreams; @@ -132,7 +133,7 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer { if (conf.lazyFileDescriptor()) { return new DefaultFileRegion(file, offset, length); } else { - FileChannel fileChannel = new FileInputStream(file).getChannel(); + FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ); return new DefaultFileRegion(fileChannel, offset, length); } } http://git-wip-us.apache.org/repos/asf/spark/blob/93f92c0e/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java -- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java index eacf485..386738e 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java @@ -19,10 +19,10 @@ package org.apache.spark.network.shuffle; import java.io.DataInputStream; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import 
java.nio.ByteBuffer; import java.nio.LongBuffer; +import java.nio.file.Files; /** * Keeps the index information for a particular map output @@ -39,7 +39,7 @@ public class ShuffleIndexInformation { offsets = buffer.asLongBuffer(); DataInputStream dis = null; try { - dis = new DataInputStream(new FileInputStream(indexFile)); + dis = new DataInputStream(Files.newInputStream(indexFile.toPath())); dis.readFully(buffer.array()); } finally { if (dis != null) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21475][CORE][2ND ATTEMPT] Change to use NIO's Files API for external shuffle service
Repository: spark Updated Branches: refs/heads/branch-2.3 bcfeef5a9 -> cd92913f3 [SPARK-21475][CORE][2ND ATTEMPT] Change to use NIO's Files API for external shuffle service ## What changes were proposed in this pull request? This PR is the second attempt of #18684 , NIO's Files API doesn't override `skip` method for `InputStream`, so it will bring in performance issue (mentioned in #20119). But using `FileInputStream`/`FileOutputStream` will also bring in memory issue (https://dzone.com/articles/fileinputstream-fileoutputstream-considered-harmful), which is severe for long running external shuffle service. So here in this proposal, only fixing the external shuffle service related code. ## How was this patch tested? Existing tests. Author: jerryshaoCloses #20144 from jerryshao/SPARK-21475-v2. (cherry picked from commit 93f92c0ed7442a4382e97254307309977ff676f8) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd92913f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd92913f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd92913f Branch: refs/heads/branch-2.3 Commit: cd92913f345c8d932d3c651626c7f803e6abdcdb Parents: bcfeef5 Author: jerryshao Authored: Thu Jan 4 11:39:42 2018 -0800 Committer: Shixiong Zhu Committed: Thu Jan 4 11:39:54 2018 -0800 -- .../apache/spark/network/buffer/FileSegmentManagedBuffer.java| 3 ++- .../apache/spark/network/shuffle/ShuffleIndexInformation.java| 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cd92913f/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java -- diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java index c20fab8..8b8f989 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java @@ -24,6 +24,7 @@ import java.io.InputStream; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; import com.google.common.base.Objects; import com.google.common.io.ByteStreams; @@ -132,7 +133,7 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer { if (conf.lazyFileDescriptor()) { return new DefaultFileRegion(file, offset, length); } else { - FileChannel fileChannel = new FileInputStream(file).getChannel(); + FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ); return new DefaultFileRegion(fileChannel, offset, length); } } http://git-wip-us.apache.org/repos/asf/spark/blob/cd92913f/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java -- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java index eacf485..386738e 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java @@ -19,10 +19,10 @@ package org.apache.spark.network.shuffle; import java.io.DataInputStream; 
import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.LongBuffer; +import java.nio.file.Files; /** * Keeps the index information for a particular map output @@ -39,7 +39,7 @@ public class ShuffleIndexInformation { offsets = buffer.asLongBuffer(); DataInputStream dis = null; try { - dis = new DataInputStream(new FileInputStream(indexFile)); + dis = new DataInputStream(Files.newInputStream(indexFile.toPath())); dis.readFully(buffer.array()); } finally { if (dis != null) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
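For SPARK-21475 above (both branches), the change is a substitution of stream factories rather than an algorithmic one. A hedged sketch of the two java.nio calls the patch switches to, shown from Scala: they avoid the finalize() overhead that makes FileInputStream/FileOutputStream problematic in a long-running external shuffle service, while the rest of the code base keeps java.io streams because the stream returned by Files.newInputStream does not override skip and is slower on that path.

```scala
import java.io.{DataInputStream, File}
import java.nio.channels.FileChannel
import java.nio.file.{Files, StandardOpenOption}

object NioOpenSketch {
  // Files.newInputStream returns a stream without a finalize() override, unlike
  // new FileInputStream(indexFile), so it adds no finalizer pressure on the JVM.
  def openIndex(indexFile: File): DataInputStream =
    new DataInputStream(Files.newInputStream(indexFile.toPath))

  // Same idea for the data file: open the channel directly instead of going
  // through FileInputStream#getChannel.
  def openData(dataFile: File): FileChannel =
    FileChannel.open(dataFile.toPath, StandardOpenOption.READ)
}
```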
svn commit: r24013 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_04_08_01-6f68316-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Jan 4 16:19:45 2018 New Revision: 24013 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_04_08_01-6f68316 docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50 parts, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark-website git commit: Change company to Oath from Yahoo
Repository: spark-website Updated Branches: refs/heads/asf-site 13d8bd58a -> 8c5354f29 Change company to Oath from Yahoo Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/8c5354f2 Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/8c5354f2 Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/8c5354f2 Branch: refs/heads/asf-site Commit: 8c5354f29badb9803a6903bf3aa74855eeaa69df Parents: 13d8bd5 Author: Thomas GravesAuthored: Wed Jan 3 15:37:22 2018 -0600 Committer: Sean Owen Committed: Thu Jan 4 09:50:38 2018 -0600 -- committers.md| 4 ++-- site/committers.html | 4 ++-- site/sitemap.xml | 14 +++--- 3 files changed, 11 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/8c5354f2/committers.md -- diff --git a/committers.md b/committers.md index 90d09ac..6c6b8ab 100644 --- a/committers.md +++ b/committers.md @@ -20,10 +20,10 @@ navigation: |Ankur Dave|UC Berkeley| |Aaron Davidson|Databricks| |Thomas Dudziak|Facebook| -|Robert Evans|Yahoo!| +|Robert Evans|Oath| |Wenchen Fan|Databricks| |Joseph Gonzalez|UC Berkeley| -|Thomas Graves|Yahoo!| +|Thomas Graves|Oath| |Stephen Haberman|Bizo| |Mark Hamstra|ClearStory Data| |Herman van Hovell|QuestTec B.V.| http://git-wip-us.apache.org/repos/asf/spark-website/blob/8c5354f2/site/committers.html -- diff --git a/site/committers.html b/site/committers.html index 84bb1fd..1a00435 100644 --- a/site/committers.html +++ b/site/committers.html @@ -246,7 +246,7 @@ Robert Evans - Yahoo! + Oath Wenchen Fan @@ -258,7 +258,7 @@ Thomas Graves - Yahoo! + Oath Stephen Haberman http://git-wip-us.apache.org/repos/asf/spark-website/blob/8c5354f2/site/sitemap.xml -- diff --git a/site/sitemap.xml b/site/sitemap.xml index a5e16e9..00ce7ef 100644 --- a/site/sitemap.xml +++ b/site/sitemap.xml @@ -672,31 +672,31 @@ weekly - https://spark.apache.org/mllib/ + https://spark.apache.org/graphx/ weekly - https://spark.apache.org/ + https://spark.apache.org/mllib/ weekly - https://spark.apache.org/sql/ + https://spark.apache.org/streaming/ weekly - https://spark.apache.org/streaming/ + https://spark.apache.org/news/ weekly - https://spark.apache.org/news/ + https://spark.apache.org/screencasts/ weekly - https://spark.apache.org/graphx/ + https://spark.apache.org/sql/ weekly - https://spark.apache.org/screencasts/ + https://spark.apache.org/ weekly - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r24012 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_04_06_01-bcfeef5-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Jan 4 14:14:49 2018 New Revision: 24012 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_04_06_01-bcfeef5 docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50 parts, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-22771][SQL] Add a missing return statement in Concat.checkInputDataTypes
Repository: spark Updated Branches: refs/heads/branch-2.3 1f5e3540c -> bcfeef5a9 [SPARK-22771][SQL] Add a missing return statement in Concat.checkInputDataTypes ## What changes were proposed in this pull request? This pr is a follow-up to fix a bug left in #19977. ## How was this patch tested? Added tests in `StringExpressionsSuite`. Author: Takeshi YamamuroCloses #20149 from maropu/SPARK-22771-FOLLOWUP. (cherry picked from commit 6f68316e98fad72b171df422566e1fc9a7bbfcde) Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bcfeef5a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bcfeef5a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bcfeef5a Branch: refs/heads/branch-2.3 Commit: bcfeef5a944d56af1a5106f5c07296ea2c262991 Parents: 1f5e354 Author: Takeshi Yamamuro Authored: Thu Jan 4 21:15:10 2018 +0800 Committer: gatorsmile Committed: Thu Jan 4 21:15:38 2018 +0800 -- .../sql/catalyst/expressions/stringExpressions.scala| 2 +- .../catalyst/expressions/StringExpressionsSuite.scala | 12 2 files changed, 13 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bcfeef5a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index b0da55a..41dc762 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -58,7 +58,7 @@ case class Concat(children: Seq[Expression]) extends Expression { } else { val childTypes = children.map(_.dataType) if (childTypes.exists(tpe => !Seq(StringType, BinaryType).contains(tpe))) { -TypeCheckResult.TypeCheckFailure( +return TypeCheckResult.TypeCheckFailure( s"input to function $prettyName should have StringType or BinaryType, but it's " + childTypes.map(_.simpleString).mkString("[", ", ", "]")) } http://git-wip-us.apache.org/repos/asf/spark/blob/bcfeef5a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala index 54cde77..97ddbeb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala @@ -51,6 +51,18 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(Concat(strs.map(Literal.create(_, StringType))), strs.mkString, EmptyRow) } + test("SPARK-22771 Check Concat.checkInputDataTypes results") { +assert(Concat(Seq.empty[Expression]).checkInputDataTypes().isSuccess) +assert(Concat(Literal.create("a") :: Literal.create("b") :: Nil) + .checkInputDataTypes().isSuccess) +assert(Concat(Literal.create("a".getBytes) :: Literal.create("b".getBytes) :: Nil) + .checkInputDataTypes().isSuccess) +assert(Concat(Literal.create(1) :: Literal.create(2) :: Nil) + .checkInputDataTypes().isFailure) +assert(Concat(Literal.create("a") :: Literal.create("b".getBytes) :: Nil) + .checkInputDataTypes().isFailure) + } + 
test("concat_ws") { def testConcatWs(expected: String, sep: String, inputs: Any*): Unit = { val inputExprs = inputs.map { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-22771][SQL] Add a missing return statement in Concat.checkInputDataTypes
Repository: spark Updated Branches: refs/heads/master 5aadbc929 -> 6f68316e9 [SPARK-22771][SQL] Add a missing return statement in Concat.checkInputDataTypes ## What changes were proposed in this pull request? This pr is a follow-up to fix a bug left in #19977. ## How was this patch tested? Added tests in `StringExpressionsSuite`. Author: Takeshi YamamuroCloses #20149 from maropu/SPARK-22771-FOLLOWUP. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f68316e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f68316e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f68316e Branch: refs/heads/master Commit: 6f68316e98fad72b171df422566e1fc9a7bbfcde Parents: 5aadbc9 Author: Takeshi Yamamuro Authored: Thu Jan 4 21:15:10 2018 +0800 Committer: gatorsmile Committed: Thu Jan 4 21:15:10 2018 +0800 -- .../sql/catalyst/expressions/stringExpressions.scala| 2 +- .../catalyst/expressions/StringExpressionsSuite.scala | 12 2 files changed, 13 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6f68316e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index b0da55a..41dc762 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -58,7 +58,7 @@ case class Concat(children: Seq[Expression]) extends Expression { } else { val childTypes = children.map(_.dataType) if (childTypes.exists(tpe => !Seq(StringType, BinaryType).contains(tpe))) { -TypeCheckResult.TypeCheckFailure( +return TypeCheckResult.TypeCheckFailure( s"input to function $prettyName should have StringType or BinaryType, but it's " + childTypes.map(_.simpleString).mkString("[", ", ", "]")) } http://git-wip-us.apache.org/repos/asf/spark/blob/6f68316e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala index 54cde77..97ddbeb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala @@ -51,6 +51,18 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(Concat(strs.map(Literal.create(_, StringType))), strs.mkString, EmptyRow) } + test("SPARK-22771 Check Concat.checkInputDataTypes results") { +assert(Concat(Seq.empty[Expression]).checkInputDataTypes().isSuccess) +assert(Concat(Literal.create("a") :: Literal.create("b") :: Nil) + .checkInputDataTypes().isSuccess) +assert(Concat(Literal.create("a".getBytes) :: Literal.create("b".getBytes) :: Nil) + .checkInputDataTypes().isSuccess) +assert(Concat(Literal.create(1) :: Literal.create(2) :: Nil) + .checkInputDataTypes().isFailure) +assert(Concat(Literal.create("a") :: Literal.create("b".getBytes) :: Nil) + .checkInputDataTypes().isFailure) + } + test("concat_ws") { def testConcatWs(expected: String, sep: String, inputs: Any*): Unit = { val 
inputExprs = inputs.map { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
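The missing `return` that SPARK-22771 above restores is a classic Scala pitfall: an if-expression that is not the last expression in a method body is evaluated and its value silently discarded. A small illustrative sketch with hypothetical types (Ok/Failed rather than Spark's TypeCheckResult):

```scala
object ConcatCheckSketch {
  sealed trait TypeCheck
  case object Ok extends TypeCheck
  final case class Failed(msg: String) extends TypeCheck

  def checkTypes(childTypes: Seq[String]): TypeCheck = {
    if (childTypes.exists(t => t != "string" && t != "binary")) {
      // Without the explicit `return`, this Failed(...) value would be computed and
      // then dropped, and the method would fall through to Ok below -- the same bug
      // the follow-up commit fixes in Concat.checkInputDataTypes.
      return Failed("unexpected input types: " + childTypes.mkString("[", ", ", "]"))
    }
    Ok
  }
}
```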
spark git commit: [SPARK-22939][PYSPARK] Support Spark UDF in registerFunction
Repository: spark Updated Branches: refs/heads/branch-2.3 eb99b8ade -> 1f5e3540c [SPARK-22939][PYSPARK] Support Spark UDF in registerFunction ## What changes were proposed in this pull request? ```Python import random from pyspark.sql.functions import udf from pyspark.sql.types import IntegerType, StringType random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic() spark.catalog.registerFunction("random_udf", random_udf, StringType()) spark.sql("SELECT random_udf()").collect() ``` We will get the following error. ``` Py4JError: An error occurred while calling o29.__getnewargs__. Trace: py4j.Py4JException: Method __getnewargs__([]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) at py4j.Gateway.invoke(Gateway.java:274) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:745) ``` This PR is to support it. ## How was this patch tested? WIP Author: gatorsmileCloses #20137 from gatorsmile/registerFunction. (cherry picked from commit 5aadbc929cb194e06dbd3bab054a161569289af5) Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f5e3540 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f5e3540 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f5e3540 Branch: refs/heads/branch-2.3 Commit: 1f5e3540c7535ceaea66ebd5ee2f598e8b3ba1a5 Parents: eb99b8a Author: gatorsmile Authored: Thu Jan 4 21:07:31 2018 +0800 Committer: gatorsmile Committed: Thu Jan 4 21:07:56 2018 +0800 -- python/pyspark/sql/catalog.py | 27 + python/pyspark/sql/context.py | 16 ++--- python/pyspark/sql/tests.py | 49 +++--- python/pyspark/sql/udf.py | 21 ++-- 4 files changed, 84 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1f5e3540/python/pyspark/sql/catalog.py -- diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index 659bc65..1566031 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -227,15 +227,15 @@ class Catalog(object): @ignore_unicode_prefix @since(2.0) def registerFunction(self, name, f, returnType=StringType()): -"""Registers a python function (including lambda function) as a UDF -so it can be used in SQL statements. +"""Registers a Python function (including lambda function) or a :class:`UserDefinedFunction` +as a UDF. The registered UDF can be used in SQL statement. In addition to a name and the function itself, the return type can be optionally specified. When the return type is not given it default to a string and conversion will automatically be done. For any other return type, the produced object must match the specified type. 
:param name: name of the UDF -:param f: python function +:param f: a Python function, or a wrapped/native UserDefinedFunction :param returnType: a :class:`pyspark.sql.types.DataType` object :return: a wrapped :class:`UserDefinedFunction` @@ -255,9 +255,26 @@ class Catalog(object): >>> _ = spark.udf.register("stringLengthInt", len, IntegerType()) >>> spark.sql("SELECT stringLengthInt('test')").collect() [Row(stringLengthInt(test)=4)] + +>>> import random +>>> from pyspark.sql.functions import udf +>>> from pyspark.sql.types import IntegerType, StringType +>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic() +>>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType()) +>>> spark.sql("SELECT random_udf()").collect() # doctest: +SKIP +[Row(random_udf()=u'82')] +>>> spark.range(1).select(newRandom_udf()).collect() # doctest: +SKIP +[Row(random_udf()=u'62')] """ -udf = UserDefinedFunction(f, returnType=returnType, name=name, - evalType=PythonEvalType.SQL_BATCHED_UDF) + +# This is to check whether the input function is a wrapped/native UserDefinedFunction +if hasattr(f, 'asNondeterministic'): +udf = UserDefinedFunction(f.func, returnType=returnType, name=name, +
spark git commit: [SPARK-22939][PYSPARK] Support Spark UDF in registerFunction
Repository: spark Updated Branches: refs/heads/master d5861aba9 -> 5aadbc929 [SPARK-22939][PYSPARK] Support Spark UDF in registerFunction ## What changes were proposed in this pull request? ```Python import random from pyspark.sql.functions import udf from pyspark.sql.types import IntegerType, StringType random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic() spark.catalog.registerFunction("random_udf", random_udf, StringType()) spark.sql("SELECT random_udf()").collect() ``` We will get the following error. ``` Py4JError: An error occurred while calling o29.__getnewargs__. Trace: py4j.Py4JException: Method __getnewargs__([]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) at py4j.Gateway.invoke(Gateway.java:274) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:745) ``` This PR is to support it. ## How was this patch tested? WIP Author: gatorsmileCloses #20137 from gatorsmile/registerFunction. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5aadbc92 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5aadbc92 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5aadbc92 Branch: refs/heads/master Commit: 5aadbc929cb194e06dbd3bab054a161569289af5 Parents: d5861ab Author: gatorsmile Authored: Thu Jan 4 21:07:31 2018 +0800 Committer: gatorsmile Committed: Thu Jan 4 21:07:31 2018 +0800 -- python/pyspark/sql/catalog.py | 27 + python/pyspark/sql/context.py | 16 ++--- python/pyspark/sql/tests.py | 49 +++--- python/pyspark/sql/udf.py | 21 ++-- 4 files changed, 84 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5aadbc92/python/pyspark/sql/catalog.py -- diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index 659bc65..1566031 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -227,15 +227,15 @@ class Catalog(object): @ignore_unicode_prefix @since(2.0) def registerFunction(self, name, f, returnType=StringType()): -"""Registers a python function (including lambda function) as a UDF -so it can be used in SQL statements. +"""Registers a Python function (including lambda function) or a :class:`UserDefinedFunction` +as a UDF. The registered UDF can be used in SQL statement. In addition to a name and the function itself, the return type can be optionally specified. When the return type is not given it default to a string and conversion will automatically be done. For any other return type, the produced object must match the specified type. 
:param name: name of the UDF -:param f: python function +:param f: a Python function, or a wrapped/native UserDefinedFunction :param returnType: a :class:`pyspark.sql.types.DataType` object :return: a wrapped :class:`UserDefinedFunction` @@ -255,9 +255,26 @@ class Catalog(object): >>> _ = spark.udf.register("stringLengthInt", len, IntegerType()) >>> spark.sql("SELECT stringLengthInt('test')").collect() [Row(stringLengthInt(test)=4)] + +>>> import random +>>> from pyspark.sql.functions import udf +>>> from pyspark.sql.types import IntegerType, StringType +>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic() +>>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType()) +>>> spark.sql("SELECT random_udf()").collect() # doctest: +SKIP +[Row(random_udf()=u'82')] +>>> spark.range(1).select(newRandom_udf()).collect() # doctest: +SKIP +[Row(random_udf()=u'62')] """ -udf = UserDefinedFunction(f, returnType=returnType, name=name, - evalType=PythonEvalType.SQL_BATCHED_UDF) + +# This is to check whether the input function is a wrapped/native UserDefinedFunction +if hasattr(f, 'asNondeterministic'): +udf = UserDefinedFunction(f.func, returnType=returnType, name=name, + evalType=PythonEvalType.SQL_BATCHED_UDF, + deterministic=f.deterministic) +else: +
svn commit: r24008 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_04_04_01-d5861ab-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Jan 4 12:19:15 2018 New Revision: 24008 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_04_04_01-d5861ab docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50 parts, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-22945][SQL] add java UDF APIs in the functions object
Repository: spark Updated Branches: refs/heads/branch-2.3 a7cfd6bea -> eb99b8ade [SPARK-22945][SQL] add java UDF APIs in the functions object ## What changes were proposed in this pull request? Currently Scala users can use UDF like ``` val foo = udf((i: Int) => Math.random() + i).asNondeterministic df.select(foo('a)) ``` Python users can also do it with similar APIs. However Java users can't do it, we should add Java UDF APIs in the functions object. ## How was this patch tested? new tests Author: Wenchen FanCloses #20141 from cloud-fan/udf. (cherry picked from commit d5861aba9d80ca15ad3f22793b79822e470d6913) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb99b8ad Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb99b8ad Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb99b8ad Branch: refs/heads/branch-2.3 Commit: eb99b8adecc050240ce9d5e0b92a20f018df465e Parents: a7cfd6b Author: Wenchen Fan Authored: Thu Jan 4 19:17:22 2018 +0800 Committer: Wenchen Fan Committed: Thu Jan 4 19:17:39 2018 +0800 -- .../org/apache/spark/sql/UDFRegistration.scala | 90 +++--- .../sql/expressions/UserDefinedFunction.scala | 1 + .../scala/org/apache/spark/sql/functions.scala | 313 +++ .../apache/spark/sql/JavaDataFrameSuite.java| 11 + .../scala/org/apache/spark/sql/UDFSuite.scala | 12 +- 5 files changed, 315 insertions(+), 112 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/eb99b8ad/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala index dc2468a..f94baef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import java.lang.reflect.{ParameterizedType, Type} +import java.lang.reflect.ParameterizedType import scala.reflect.runtime.universe.TypeTag import scala.util.Try @@ -110,29 +110,29 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends /* register 0-22 were generated by this script -(0 to 22).map { x => +(0 to 22).foreach { x => val types = (1 to x).foldRight("RT")((i, s) => {s"A$i, $s"}) - val typeTags = (1 to x).map(i => s"A${i}: TypeTag").foldLeft("RT: TypeTag")(_ + ", " + _) + val typeTags = (1 to x).map(i => s"A$i: TypeTag").foldLeft("RT: TypeTag")(_ + ", " + _) val inputTypes = (1 to x).foldRight("Nil")((i, s) => {s"ScalaReflection.schemaFor[A$i].dataType :: $s"}) println(s""" -/** - * Registers a deterministic Scala closure of ${x} arguments as user-defined function (UDF). - * @tparam RT return type of UDF. - * @since 1.3.0 - */ -def register[$typeTags](name: String, func: Function$x[$types]): UserDefinedFunction = { - val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT] - val inputTypes = Try($inputTypes).toOption - def builder(e: Seq[Expression]) = if (e.length == $x) { -ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true) - } else { - throw new AnalysisException("Invalid number of arguments for function " + name + - ". 
Expected: $x; Found: " + e.length) - } - functionRegistry.createOrReplaceTempFunction(name, builder) - val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name) - if (nullable) udf else udf.asNonNullable() -}""") +|/** +| * Registers a deterministic Scala closure of $x arguments as user-defined function (UDF). +| * @tparam RT return type of UDF. +| * @since 1.3.0 +| */ +|def register[$typeTags](name: String, func: Function$x[$types]): UserDefinedFunction = { +| val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT] +| val inputTypes = Try($inputTypes).toOption +| def builder(e: Seq[Expression]) = if (e.length == $x) { +|ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true) +| } else { +|throw new AnalysisException("Invalid number of arguments for function " + name + +| ". Expected: $x; Found: " + e.length) +| } +|
spark git commit: [SPARK-22945][SQL] add java UDF APIs in the functions object
Repository: spark Updated Branches: refs/heads/master 9fa703e89 -> d5861aba9 [SPARK-22945][SQL] add java UDF APIs in the functions object ## What changes were proposed in this pull request? Currently Scala users can use UDF like ``` val foo = udf((i: Int) => Math.random() + i).asNondeterministic df.select(foo('a)) ``` Python users can also do it with similar APIs. However Java users can't do it, we should add Java UDF APIs in the functions object. ## How was this patch tested? new tests Author: Wenchen FanCloses #20141 from cloud-fan/udf. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5861aba Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5861aba Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5861aba Branch: refs/heads/master Commit: d5861aba9d80ca15ad3f22793b79822e470d6913 Parents: 9fa703e Author: Wenchen Fan Authored: Thu Jan 4 19:17:22 2018 +0800 Committer: Wenchen Fan Committed: Thu Jan 4 19:17:22 2018 +0800 -- .../org/apache/spark/sql/UDFRegistration.scala | 90 +++--- .../sql/expressions/UserDefinedFunction.scala | 1 + .../scala/org/apache/spark/sql/functions.scala | 313 +++ .../apache/spark/sql/JavaDataFrameSuite.java| 11 + .../scala/org/apache/spark/sql/UDFSuite.scala | 12 +- 5 files changed, 315 insertions(+), 112 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d5861aba/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala index dc2468a..f94baef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import java.lang.reflect.{ParameterizedType, Type} +import java.lang.reflect.ParameterizedType import scala.reflect.runtime.universe.TypeTag import scala.util.Try @@ -110,29 +110,29 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends /* register 0-22 were generated by this script -(0 to 22).map { x => +(0 to 22).foreach { x => val types = (1 to x).foldRight("RT")((i, s) => {s"A$i, $s"}) - val typeTags = (1 to x).map(i => s"A${i}: TypeTag").foldLeft("RT: TypeTag")(_ + ", " + _) + val typeTags = (1 to x).map(i => s"A$i: TypeTag").foldLeft("RT: TypeTag")(_ + ", " + _) val inputTypes = (1 to x).foldRight("Nil")((i, s) => {s"ScalaReflection.schemaFor[A$i].dataType :: $s"}) println(s""" -/** - * Registers a deterministic Scala closure of ${x} arguments as user-defined function (UDF). - * @tparam RT return type of UDF. - * @since 1.3.0 - */ -def register[$typeTags](name: String, func: Function$x[$types]): UserDefinedFunction = { - val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT] - val inputTypes = Try($inputTypes).toOption - def builder(e: Seq[Expression]) = if (e.length == $x) { -ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true) - } else { - throw new AnalysisException("Invalid number of arguments for function " + name + - ". Expected: $x; Found: " + e.length) - } - functionRegistry.createOrReplaceTempFunction(name, builder) - val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name) - if (nullable) udf else udf.asNonNullable() -}""") +|/** +| * Registers a deterministic Scala closure of $x arguments as user-defined function (UDF). 
+| * @tparam RT return type of UDF. +| * @since 1.3.0 +| */ +|def register[$typeTags](name: String, func: Function$x[$types]): UserDefinedFunction = { +| val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT] +| val inputTypes = Try($inputTypes).toOption +| def builder(e: Seq[Expression]) = if (e.length == $x) { +|ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true) +| } else { +|throw new AnalysisException("Invalid number of arguments for function " + name + +| ". Expected: $x; Found: " + e.length) +| } +| functionRegistry.createOrReplaceTempFunction(name, builder) +| val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name) +| if
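A usage sketch for the Scala side of the UDF story in SPARK-22945 above; the new Java overloads added by this commit follow the same shape but take a UDFn instance together with an explicit return DataType. This assumes a local SparkSession, and the names used are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object UdfUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("udf-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Column-expression style: wrap a Scala closure and use it directly in select().
    val plusOne = udf((i: Int) => i + 1)

    // Nondeterministic variant from the commit message: mark the UDF so the optimizer
    // does not assume repeated evaluations return the same value.
    val noisy = udf((i: Int) => Math.random() + i).asNondeterministic()

    // SQL style: register a closure under a name usable from spark.sql().
    spark.udf.register("plus_one", (i: Int) => i + 1)

    val df = Seq(1, 2, 3).toDF("a")
    df.select(plusOne($"a").as("b"), noisy($"a").as("noisy")).show()

    df.createOrReplaceTempView("t")
    spark.sql("SELECT plus_one(a) AS b FROM t").show()

    spark.stop()
  }
}
```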
spark git commit: [SPARK-22950][SQL] Handle ChildFirstURLClassLoader's parent
Repository: spark Updated Branches: refs/heads/branch-2.3 1860a43e9 -> a7cfd6bea [SPARK-22950][SQL] Handle ChildFirstURLClassLoader's parent ## What changes were proposed in this pull request? ChildFirstClassLoader's parent is set to null, so we can't get jars from its parent. This will cause ClassNotFoundException during HiveClient initialization with builtin hive jars, where we may should use spark context loader instead. ## How was this patch tested? add new ut cc cloud-fan gatorsmile Author: Kent YaoCloses #20145 from yaooqinn/SPARK-22950. (cherry picked from commit 9fa703e89318922393bae03c0db4575f4f4b4c56) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a7cfd6be Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a7cfd6be Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a7cfd6be Branch: refs/heads/branch-2.3 Commit: a7cfd6beaf35f79a744047a4a09714ef1da60293 Parents: 1860a43 Author: Kent Yao Authored: Thu Jan 4 19:10:10 2018 +0800 Committer: Wenchen Fan Committed: Thu Jan 4 19:10:36 2018 +0800 -- .../org/apache/spark/sql/hive/HiveUtils.scala | 4 +++- .../apache/spark/sql/hive/HiveUtilsSuite.scala | 20 2 files changed, 23 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a7cfd6be/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index c489690..c7717d7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf._ import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, WAREHOUSE_PATH} import org.apache.spark.sql.types._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{ChildFirstURLClassLoader, Utils} private[spark] object HiveUtils extends Logging { @@ -312,6 +312,8 @@ private[spark] object HiveUtils extends Logging { // starting from the given classLoader. 
def allJars(classLoader: ClassLoader): Array[URL] = classLoader match { case null => Array.empty[URL] +case childFirst: ChildFirstURLClassLoader => + childFirst.getURLs() ++ allJars(Utils.getSparkClassLoader) case urlClassLoader: URLClassLoader => urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent) case other => allJars(other.getParent) http://git-wip-us.apache.org/repos/asf/spark/blob/a7cfd6be/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala index fdbfcf1..8697d47 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala @@ -17,11 +17,16 @@ package org.apache.spark.sql.hive +import java.net.URL + import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.QueryTest import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader} class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { @@ -42,4 +47,19 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton assert(hiveConf("foo") === "bar") } } + + test("ChildFirstURLClassLoader's parent is null, get spark classloader instead") { +val conf = new SparkConf +val contextClassLoader = Thread.currentThread().getContextClassLoader +val loader = new ChildFirstURLClassLoader(Array(), contextClassLoader) +try { + Thread.currentThread().setContextClassLoader(loader) + HiveUtils.newClientForMetadata( +conf, +SparkHadoopUtil.newConfiguration(conf), +HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)) +} finally { + Thread.currentThread().setContextClassLoader(contextClassLoader) +} + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For
spark git commit: [SPARK-22950][SQL] Handle ChildFirstURLClassLoader's parent
Repository: spark Updated Branches: refs/heads/master df95a908b -> 9fa703e89 [SPARK-22950][SQL] Handle ChildFirstURLClassLoader's parent ## What changes were proposed in this pull request? ChildFirstClassLoader's parent is set to null, so we can't get jars from its parent. This will cause ClassNotFoundException during HiveClient initialization with builtin hive jars, where we may should use spark context loader instead. ## How was this patch tested? add new ut cc cloud-fan gatorsmile Author: Kent YaoCloses #20145 from yaooqinn/SPARK-22950. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fa703e8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fa703e8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fa703e8 Branch: refs/heads/master Commit: 9fa703e89318922393bae03c0db4575f4f4b4c56 Parents: df95a90 Author: Kent Yao Authored: Thu Jan 4 19:10:10 2018 +0800 Committer: Wenchen Fan Committed: Thu Jan 4 19:10:10 2018 +0800 -- .../org/apache/spark/sql/hive/HiveUtils.scala | 4 +++- .../apache/spark/sql/hive/HiveUtilsSuite.scala | 20 2 files changed, 23 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9fa703e8/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index c489690..c7717d7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf._ import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, WAREHOUSE_PATH} import org.apache.spark.sql.types._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{ChildFirstURLClassLoader, Utils} private[spark] object HiveUtils extends Logging { @@ -312,6 +312,8 @@ private[spark] object HiveUtils extends Logging { // starting from the given classLoader. 
def allJars(classLoader: ClassLoader): Array[URL] = classLoader match { case null => Array.empty[URL] +case childFirst: ChildFirstURLClassLoader => + childFirst.getURLs() ++ allJars(Utils.getSparkClassLoader) case urlClassLoader: URLClassLoader => urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent) case other => allJars(other.getParent) http://git-wip-us.apache.org/repos/asf/spark/blob/9fa703e8/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala index fdbfcf1..8697d47 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala @@ -17,11 +17,16 @@ package org.apache.spark.sql.hive +import java.net.URL + import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.QueryTest import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader} class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { @@ -42,4 +47,19 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton assert(hiveConf("foo") === "bar") } } + + test("ChildFirstURLClassLoader's parent is null, get spark classloader instead") { +val conf = new SparkConf +val contextClassLoader = Thread.currentThread().getContextClassLoader +val loader = new ChildFirstURLClassLoader(Array(), contextClassLoader) +try { + Thread.currentThread().setContextClassLoader(loader) + HiveUtils.newClientForMetadata( +conf, +SparkHadoopUtil.newConfiguration(conf), +HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)) +} finally { + Thread.currentThread().setContextClassLoader(contextClassLoader) +} + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
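SPARK-22950 above turns on how jar URLs are gathered by walking the classloader chain during HiveClient initialization. Below is a simplified sketch of that walk using only java.net.URLClassLoader plus a hypothetical ChildFirstLoader stand-in for Spark's ChildFirstURLClassLoader (which keeps its real parent in a field and reports null from getParent()): before the fix the walk stopped at that null and lost the builtin Hive jars; after it, the walk continues from a known-good fallback loader.

```scala
import java.net.{URL, URLClassLoader}

object AllJarsSketch {
  // Hypothetical stand-in for Spark's ChildFirstURLClassLoader: the real parent is
  // held in a field while getParent() reports null (the superclass was given a null
  // parent), so a naive parent traversal stops here and misses every jar above it.
  class ChildFirstLoader(urls: Array[URL], val realParent: ClassLoader)
    extends URLClassLoader(urls, null)

  // The walk after the fix: a child-first loader contributes its own URLs and the
  // traversal restarts from a known-good fallback loader instead of stopping at null.
  def allJars(loader: ClassLoader, fallback: URLClassLoader): Array[URL] = loader match {
    case null => Array.empty[URL]
    case childFirst: ChildFirstLoader =>
      childFirst.getURLs ++ allJars(fallback, fallback)
    case url: URLClassLoader =>
      url.getURLs ++ allJars(url.getParent, fallback)
    case other =>
      allJars(other.getParent, fallback)
  }
}
```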
svn commit: r24006 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_04_00_01-df95a90-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Jan 4 08:19:58 2018 New Revision: 24006 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_04_00_01-df95a90 docs [This commit notification would consist of 1439 parts, which exceeds the limit of 50 parts, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org