svn commit: r24148 - in /dev/spark/v2.3.0-rc1-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _site/api/java/org/apache/spark
Author: sameerag
Date: Fri Jan 12 07:52:52 2018
New Revision: 24148

Log: Apache Spark v2.3.0-rc1 docs

[This commit notification would consist of 1430 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-22986][CORE] Use a cache to avoid instantiating multiple instances of broadcast variable values
Repository: spark
Updated Branches: refs/heads/branch-2.3 55695c712 -> 3ae3e1bb7

[SPARK-22986][CORE] Use a cache to avoid instantiating multiple instances of broadcast variable values

When resources happen to be constrained on an executor, the first time a broadcast variable is instantiated it is persisted to disk by the BlockManager. Consequently, every subsequent call to TorrentBroadcast::readBroadcastBlock from other instances of that broadcast variable spawns another instance of the underlying value. That is, broadcast variables are spawned once per executor **unless** memory is constrained, in which case every instance of a broadcast variable is provided with a unique copy of the underlying value.

This patch fixes the above by explicitly caching the underlying values using weak references in a ReferenceMap.

Author: ho3rexqj

Closes #20183 from ho3rexqj/fix/cache-broadcast-values.

(cherry picked from commit cbe7c6fbf9dc2fc422b93b3644c40d449a869eea)
Signed-off-by: Wenchen Fan

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ae3e1bb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ae3e1bb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ae3e1bb
Branch: refs/heads/branch-2.3
Commit: 3ae3e1bb71aa88be1c963b4416986ef679d7c8a2
Parents: 55695c7
Author: ho3rexqj
Authored: Fri Jan 12 15:27:00 2018 +0800
Committer: Wenchen Fan
Committed: Fri Jan 12 15:27:31 2018 +0800

----------------------------------------------------------------------
 .../spark/broadcast/BroadcastManager.scala      |  6 ++
 .../spark/broadcast/TorrentBroadcast.scala      | 72
 .../apache/spark/broadcast/BroadcastSuite.scala | 34 +
 3 files changed, 83 insertions(+), 29 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/3ae3e1bb/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala

diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
index e88988f..8d7a4a3 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicLong

 import scala.reflect.ClassTag

+import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap}
+
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging

@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(

   private val nextBroadcastId = new AtomicLong(0)

+  private[broadcast] val cachedValues = {
+    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
+  }
+
   def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
     broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3ae3e1bb/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 7aecd3c..e125095 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)

   private def readBroadcastBlock(): T = Utils.tryOrIOException {
     TorrentBroadcast.synchronized {
-      setConf(SparkEnv.get.conf)
-      val blockManager = SparkEnv.get.blockManager
-      blockManager.getLocalValues(broadcastId) match {
-        case Some(blockResult) =>
-          if (blockResult.data.hasNext) {
-            val x = blockResult.data.next().asInstanceOf[T]
-            releaseLock(broadcastId)
-            x
-          } else {
-            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
-          }
-        case None =>
-          logInfo("Started reading broadcast variable " + id)
-          val startTimeMs = System.currentTimeMillis()
-          val blocks = readBlocks()
-          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
-
-          try {
-            val obj = TorrentBroadcast.unBlockifyObject[T](
-              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
-            // Store the merged copy in BlockManager so other tasks on this executor don't
-            // need to re-fetch it.
spark git commit: [SPARK-22986][CORE] Use a cache to avoid instantiating multiple instances of broadcast variable values
Repository: spark
Updated Branches: refs/heads/master b5042d75c -> cbe7c6fbf

[SPARK-22986][CORE] Use a cache to avoid instantiating multiple instances of broadcast variable values

When resources happen to be constrained on an executor, the first time a broadcast variable is instantiated it is persisted to disk by the BlockManager. Consequently, every subsequent call to TorrentBroadcast::readBroadcastBlock from other instances of that broadcast variable spawns another instance of the underlying value. That is, broadcast variables are spawned once per executor **unless** memory is constrained, in which case every instance of a broadcast variable is provided with a unique copy of the underlying value.

This patch fixes the above by explicitly caching the underlying values using weak references in a ReferenceMap.

Author: ho3rexqj

Closes #20183 from ho3rexqj/fix/cache-broadcast-values.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbe7c6fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbe7c6fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbe7c6fb
Branch: refs/heads/master
Commit: cbe7c6fbf9dc2fc422b93b3644c40d449a869eea
Parents: b5042d7
Author: ho3rexqj
Authored: Fri Jan 12 15:27:00 2018 +0800
Committer: Wenchen Fan
Committed: Fri Jan 12 15:27:00 2018 +0800

----------------------------------------------------------------------
 .../spark/broadcast/BroadcastManager.scala      |  6 ++
 .../spark/broadcast/TorrentBroadcast.scala      | 72
 .../apache/spark/broadcast/BroadcastSuite.scala | 34 +
 3 files changed, 83 insertions(+), 29 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/cbe7c6fb/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala

diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
index e88988f..8d7a4a3 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicLong

 import scala.reflect.ClassTag

+import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap}
+
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging

@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(

   private val nextBroadcastId = new AtomicLong(0)

+  private[broadcast] val cachedValues = {
+    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
+  }
+
   def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
     broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cbe7c6fb/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 7aecd3c..e125095 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)

   private def readBroadcastBlock(): T = Utils.tryOrIOException {
     TorrentBroadcast.synchronized {
-      setConf(SparkEnv.get.conf)
-      val blockManager = SparkEnv.get.blockManager
-      blockManager.getLocalValues(broadcastId) match {
-        case Some(blockResult) =>
-          if (blockResult.data.hasNext) {
-            val x = blockResult.data.next().asInstanceOf[T]
-            releaseLock(broadcastId)
-            x
-          } else {
-            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
-          }
-        case None =>
-          logInfo("Started reading broadcast variable " + id)
-          val startTimeMs = System.currentTimeMillis()
-          val blocks = readBlocks()
-          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
-
-          try {
-            val obj = TorrentBroadcast.unBlockifyObject[T](
-              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
-            // Store the merged copy in BlockManager so other tasks on this executor don't
-            // need to re-fetch it.
-            val storageLevel = StorageLevel.MEMORY_AND_DISK
-            if (!blockManager.putSingle(broadcastId, obj, storageLevel,
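To make the caching scheme in the two messages above concrete, here is a minimal, self-contained sketch of the same idea — not the patch itself, and `BroadcastValueCache`/`getOrElseUpdate` are hypothetical names, not Spark API. It shows a map with hard references to keys and weak references to values, so every reader on an executor shares one deserialized value until the JVM garbage-collects it:

```scala
import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap}

// Hypothetical stand-in for the cache the patch adds to BroadcastManager:
// hard key references, weak value references, so an entry disappears once
// no task strongly references the cached value any more.
object BroadcastValueCache {
  private val cachedValues =
    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)

  // ReferenceMap is not thread-safe, so serialize access on the map itself.
  def getOrElseUpdate[T](broadcastId: Long)(materialize: => T): T =
    cachedValues.synchronized {
      Option(cachedValues.get(broadcastId)) match {
        case Some(value) => value.asInstanceOf[T] // every caller shares this instance
        case None =>
          val value = materialize // e.g. fetch blocks and deserialize exactly once
          cachedValues.put(broadcastId, value)
          value
      }
    }
}
```

In the patch itself the map lives on BroadcastManager (the `cachedValues` field in the diff above), and TorrentBroadcast.readBroadcastBlock consults it before falling back to the BlockManager.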
svn commit: r24147 - /dev/spark/v2.3.0-rc1-bin/
Author: sameerag
Date: Fri Jan 12 07:25:00 2018
New Revision: 24147

Log: Apache Spark v2.3.0-rc1

Added:
    dev/spark/v2.3.0-rc1-bin/
    dev/spark/v2.3.0-rc1-bin/SparkR_2.3.0.tar.gz   (with props)
    dev/spark/v2.3.0-rc1-bin/SparkR_2.3.0.tar.gz.asc
    dev/spark/v2.3.0-rc1-bin/SparkR_2.3.0.tar.gz.md5
    dev/spark/v2.3.0-rc1-bin/SparkR_2.3.0.tar.gz.sha512
    dev/spark/v2.3.0-rc1-bin/pyspark-2.3.0.tar.gz   (with props)
    dev/spark/v2.3.0-rc1-bin/pyspark-2.3.0.tar.gz.asc
    dev/spark/v2.3.0-rc1-bin/pyspark-2.3.0.tar.gz.md5
    dev/spark/v2.3.0-rc1-bin/pyspark-2.3.0.tar.gz.sha512
    dev/spark/v2.3.0-rc1-bin/spark-2.3.0-bin-hadoop2.6.tgz   (with props)
    dev/spark/v2.3.0-rc1-bin/spark-2.3.0-bin-hadoop2.6.tgz.asc
    dev/spark/v2.3.0-rc1-bin/spark-2.3.0-bin-hadoop2.6.tgz.md5
    dev/spark/v2.3.0-rc1-bin/spark-2.3.0-bin-hadoop2.6.tgz.sha512
    dev/spark/v2.3.0-rc1-bin/spark-2.3.0-bin-hadoop2.7.tgz   (with props)
    dev/spark/v2.3.0-rc1-bin/spark-2.3.0-bin-hadoop2.7.tgz.asc
    dev/spark/v2.3.0-rc1-bin/spark-2.3.0-bin-hadoop2.7.tgz.md5
    dev/spark/v2.3.0-rc1-bin/spark-2.3.0-bin-hadoop2.7.tgz.sha512
    dev/spark/v2.3.0-rc1-bin/spark-2.3.0-bin-without-hadoop.tgz   (with props)
    dev/spark/v2.3.0-rc1-bin/spark-2.3.0-bin-without-hadoop.tgz.asc
    dev/spark/v2.3.0-rc1-bin/spark-2.3.0-bin-without-hadoop.tgz.md5
    dev/spark/v2.3.0-rc1-bin/spark-2.3.0-bin-without-hadoop.tgz.sha512
    dev/spark/v2.3.0-rc1-bin/spark-2.3.0.tgz   (with props)
    dev/spark/v2.3.0-rc1-bin/spark-2.3.0.tgz.asc
    dev/spark/v2.3.0-rc1-bin/spark-2.3.0.tgz.md5
    dev/spark/v2.3.0-rc1-bin/spark-2.3.0.tgz.sha512
    dev/spark/v2.3.0-rc1-bin/spark-parent_2.11.iml

Added: dev/spark/v2.3.0-rc1-bin/SparkR_2.3.0.tar.gz
==============================================================================
Binary file - no diff available.

Propchange: dev/spark/v2.3.0-rc1-bin/SparkR_2.3.0.tar.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: dev/spark/v2.3.0-rc1-bin/SparkR_2.3.0.tar.gz.asc
==============================================================================
--- dev/spark/v2.3.0-rc1-bin/SparkR_2.3.0.tar.gz.asc (added)
+++ dev/spark/v2.3.0-rc1-bin/SparkR_2.3.0.tar.gz.asc Fri Jan 12 07:25:00 2018
@@ -0,0 +1,11 @@
+-----BEGIN PGP SIGNATURE-----
+
+iQEzBAABCAAdFiEE+nV7jWSrvCH8Arwcoc7bqK0MAioFAlpYYbMACgkQoc7bqK0M
+AirvPAf9Gsj1RKJiy01H4T7QJkTReJ0/0Qz3EBzqa2+7xC5AH+MZ1eH3RLBn3Rws
+UgXNQChjCcx0r5dYRYQfa2FHLUHKPxI4Ax6As9mrtW4D0iLuWhZ50Wjn44rHVjQs
+Vud4iclkvtBNe+qWW86ipLDz7U/2AInfmb8F2wwFih//5vuJNvSvc3biTR4dJos/
+2AIjOis/Rx05G+kULHQSrC25mXtJWEBqxBpOITuYii0x8S2e0LbD0zg2voTN8oVM
+PoQ8s6UYN5/QEih180bmLvw9GgdT+e39xqiin3vohCXGS7AboSNCLoKGCmhmKhCa
+M8PvdHlk4ffuJNYhbHV4/bhftAgdaw==
+=d5v6
+-----END PGP SIGNATURE-----

Added: dev/spark/v2.3.0-rc1-bin/SparkR_2.3.0.tar.gz.md5
==============================================================================
--- dev/spark/v2.3.0-rc1-bin/SparkR_2.3.0.tar.gz.md5 (added)
+++ dev/spark/v2.3.0-rc1-bin/SparkR_2.3.0.tar.gz.md5 Fri Jan 12 07:25:00 2018
@@ -0,0 +1 @@
+SparkR_2.3.0.tar.gz: 19 43 B2 0C E9 07 3C 93 7C 92 D9 DD 47 F5 50 1B

Added: dev/spark/v2.3.0-rc1-bin/SparkR_2.3.0.tar.gz.sha512
==============================================================================
--- dev/spark/v2.3.0-rc1-bin/SparkR_2.3.0.tar.gz.sha512 (added)
+++ dev/spark/v2.3.0-rc1-bin/SparkR_2.3.0.tar.gz.sha512 Fri Jan 12 07:25:00 2018
@@ -0,0 +1,3 @@
+SparkR_2.3.0.tar.gz: 2F303B2B A379A3AC 445B1D8F CFC76985 4DA7116B F2640E7D
+                     001B78BF F309FE5B 89799209 872F3051 D097F8EE EEF8A77D
+                     753BDB0A 2BA7D95E CAD7D01D 4EA8FF39

Added: dev/spark/v2.3.0-rc1-bin/pyspark-2.3.0.tar.gz
==============================================================================
Binary file - no diff available.

Propchange: dev/spark/v2.3.0-rc1-bin/pyspark-2.3.0.tar.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: dev/spark/v2.3.0-rc1-bin/pyspark-2.3.0.tar.gz.asc
==============================================================================
--- dev/spark/v2.3.0-rc1-bin/pyspark-2.3.0.tar.gz.asc (added)
+++ dev/spark/v2.3.0-rc1-bin/pyspark-2.3.0.tar.gz.asc Fri Jan 12 07:25:00 2018
@@ -0,0 +1,11 @@
+-----BEGIN PGP SIGNATURE-----
+
+iQEzBAABCAAdFiEE+nV7jWSrvCH8Arwcoc7bqK0MAioFAlpYYRkACgkQoc7bqK0M
+AiqaDwf/ZHZmj9SDqcd9Lh+jqqusa+l9kspNKQSbxxOSzX+6TSz3bqMap2UMrpva
+BG8Mf42HwMVsuRLuHFFTpHdcHkWSWdAvU4/N2Zo/cfsYBhQ/mJPYlKVVuSTaAJ2t
+//86APZxXDMJlPtvgtgwlixChuunNuGN7B5fQ+0ANLIZvD18hs1ppOY2Yth8jA43
+yifmDrj3tZ6IRJGY4XVx4pyPRTB8pHuJn+U/U2XRvUNN+eL7epb02A4tivyS3lH9
+idDAa8d1rjZKpPXuiQ0lFOnUg/sQHaqCoBqHGzjfqV3H2uPUbQkBxP3074fRNjBp
++Fynj4rlA/Zn2+LwOQ82Cmp9okVl4Q==
+=BJkd
+-----END PGP SIGNATURE-----

Added: dev/spark/v2.3.0-rc1-bin/pyspark-2.3.0.tar.gz.md5
==============================================================================
---
svn commit: r24145 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_11_20_01-b5042d7-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Fri Jan 12 04:15:08 2018
New Revision: 24145

Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_11_20_01-b5042d7 docs

[This commit notification would consist of 1440 parts, which exceeds the limit of 50, so it was shortened to this summary.]
svn commit: r24143 - in /dev/spark/2.3.1-SNAPSHOT-2018_01_11_18_01-55695c7-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Fri Jan 12 02:15:01 2018
New Revision: 24143

Log: Apache Spark 2.3.1-SNAPSHOT-2018_01_11_18_01-55695c7 docs

[This commit notification would consist of 1440 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-23008][ML] OnehotEncoderEstimator python API
Repository: spark
Updated Branches: refs/heads/branch-2.3 6bb22961c -> 55695c712

[SPARK-23008][ML] OnehotEncoderEstimator python API

## What changes were proposed in this pull request?

OneHotEncoderEstimator Python API.

## How was this patch tested?

doctest

Author: WeichenXu

Closes #20209 from WeichenXu123/ohe_py.

(cherry picked from commit b5042d75c2faa5f15bc1e160d75f06dfdd6eea37)
Signed-off-by: Joseph K. Bradley

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55695c71
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55695c71
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55695c71
Branch: refs/heads/branch-2.3
Commit: 55695c7127cb2f357dfdf677cab4d21fc840aa3d
Parents: 6bb2296
Author: WeichenXu
Authored: Thu Jan 11 16:20:30 2018 -0800
Committer: Joseph K. Bradley
Committed: Thu Jan 11 16:20:41 2018 -0800

----------------------------------------------------------------------
 python/pyspark/ml/feature.py                    | 113 +++
 .../pyspark/ml/param/_shared_params_code_gen.py |   1 +
 python/pyspark/ml/param/shared.py               |  23
 3 files changed, 137 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/55695c71/python/pyspark/ml/feature.py

diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 13bf95c..b963e45 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -45,6 +45,7 @@ __all__ = ['Binarizer',
            'NGram',
            'Normalizer',
            'OneHotEncoder',
+           'OneHotEncoderEstimator', 'OneHotEncoderModel',
            'PCA', 'PCAModel',
            'PolynomialExpansion',
            'QuantileDiscretizer',
@@ -1642,6 +1643,118 @@ class OneHotEncoder(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable,

 @inherit_doc
+class OneHotEncoderEstimator(JavaEstimator, HasInputCols, HasOutputCols, HasHandleInvalid,
+                             JavaMLReadable, JavaMLWritable):
+    """
+    A one-hot encoder that maps a column of category indices to a column of binary vectors, with
+    at most a single one-value per row that indicates the input category index.
+    For example with 5 categories, an input value of 2.0 would map to an output vector of
+    `[0.0, 0.0, 1.0, 0.0]`.
+    The last category is not included by default (configurable via `dropLast`),
+    because it makes the vector entries sum up to one, and hence linearly dependent.
+    So an input value of 4.0 maps to `[0.0, 0.0, 0.0, 0.0]`.
+
+    Note: This is different from scikit-learn's OneHotEncoder, which keeps all categories.
+    The output vectors are sparse.
+
+    When `handleInvalid` is configured to 'keep', an extra "category" indicating invalid values is
+    added as last category. So when `dropLast` is true, invalid values are encoded as all-zeros
+    vector.
+
+    Note: When encoding multi-column by using `inputCols` and `outputCols` params, input/output
+    cols come in pairs, specified by the order in the arrays, and each pair is treated
+    independently.
+
+    See `StringIndexer` for converting categorical values into category indices
+
+    >>> from pyspark.ml.linalg import Vectors
+    >>> df = spark.createDataFrame([(0.0,), (1.0,), (2.0,)], ["input"])
+    >>> ohe = OneHotEncoderEstimator(inputCols=["input"], outputCols=["output"])
+    >>> model = ohe.fit(df)
+    >>> model.transform(df).head().output
+    SparseVector(2, {0: 1.0})
+    >>> ohePath = temp_path + "/oheEstimator"
+    >>> ohe.save(ohePath)
+    >>> loadedOHE = OneHotEncoderEstimator.load(ohePath)
+    >>> loadedOHE.getInputCols() == ohe.getInputCols()
+    True
+    >>> modelPath = temp_path + "/ohe-model"
+    >>> model.save(modelPath)
+    >>> loadedModel = OneHotEncoderModel.load(modelPath)
+    >>> loadedModel.categorySizes == model.categorySizes
+    True
+
+    .. versionadded:: 2.3.0
+    """
+
+    handleInvalid = Param(Params._dummy(), "handleInvalid", "How to handle invalid data during " +
+                          "transform(). Options are 'keep' (invalid data presented as an extra " +
+                          "categorical feature) or error (throw an error). Note that this Param " +
+                          "is only used during transform; during fitting, invalid data will " +
+                          "result in an error.",
+                          typeConverter=TypeConverters.toString)
+
+    dropLast = Param(Params._dummy(), "dropLast", "whether to drop the last category",
+                     typeConverter=TypeConverters.toBoolean)
+
+    @keyword_only
+    def __init__(self, inputCols=None, outputCols=None, handleInvalid="error",
spark git commit: [SPARK-23008][ML] OnehotEncoderEstimator python API
Repository: spark
Updated Branches: refs/heads/master 186bf8fb2 -> b5042d75c

[SPARK-23008][ML] OnehotEncoderEstimator python API

## What changes were proposed in this pull request?

OneHotEncoderEstimator Python API.

## How was this patch tested?

doctest

Author: WeichenXu

Closes #20209 from WeichenXu123/ohe_py.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b5042d75
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b5042d75
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b5042d75
Branch: refs/heads/master
Commit: b5042d75c2faa5f15bc1e160d75f06dfdd6eea37
Parents: 186bf8f
Author: WeichenXu
Authored: Thu Jan 11 16:20:30 2018 -0800
Committer: Joseph K. Bradley
Committed: Thu Jan 11 16:20:30 2018 -0800

----------------------------------------------------------------------
 python/pyspark/ml/feature.py                    | 113 +++
 .../pyspark/ml/param/_shared_params_code_gen.py |   1 +
 python/pyspark/ml/param/shared.py               |  23
 3 files changed, 137 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/b5042d75/python/pyspark/ml/feature.py

diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 13bf95c..b963e45 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -45,6 +45,7 @@ __all__ = ['Binarizer',
            'NGram',
            'Normalizer',
            'OneHotEncoder',
+           'OneHotEncoderEstimator', 'OneHotEncoderModel',
            'PCA', 'PCAModel',
            'PolynomialExpansion',
            'QuantileDiscretizer',
@@ -1642,6 +1643,118 @@ class OneHotEncoder(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable,

 @inherit_doc
+class OneHotEncoderEstimator(JavaEstimator, HasInputCols, HasOutputCols, HasHandleInvalid,
+                             JavaMLReadable, JavaMLWritable):
+    """
+    A one-hot encoder that maps a column of category indices to a column of binary vectors, with
+    at most a single one-value per row that indicates the input category index.
+    For example with 5 categories, an input value of 2.0 would map to an output vector of
+    `[0.0, 0.0, 1.0, 0.0]`.
+    The last category is not included by default (configurable via `dropLast`),
+    because it makes the vector entries sum up to one, and hence linearly dependent.
+    So an input value of 4.0 maps to `[0.0, 0.0, 0.0, 0.0]`.
+
+    Note: This is different from scikit-learn's OneHotEncoder, which keeps all categories.
+    The output vectors are sparse.
+
+    When `handleInvalid` is configured to 'keep', an extra "category" indicating invalid values is
+    added as last category. So when `dropLast` is true, invalid values are encoded as all-zeros
+    vector.
+
+    Note: When encoding multi-column by using `inputCols` and `outputCols` params, input/output
+    cols come in pairs, specified by the order in the arrays, and each pair is treated
+    independently.
+
+    See `StringIndexer` for converting categorical values into category indices
+
+    >>> from pyspark.ml.linalg import Vectors
+    >>> df = spark.createDataFrame([(0.0,), (1.0,), (2.0,)], ["input"])
+    >>> ohe = OneHotEncoderEstimator(inputCols=["input"], outputCols=["output"])
+    >>> model = ohe.fit(df)
+    >>> model.transform(df).head().output
+    SparseVector(2, {0: 1.0})
+    >>> ohePath = temp_path + "/oheEstimator"
+    >>> ohe.save(ohePath)
+    >>> loadedOHE = OneHotEncoderEstimator.load(ohePath)
+    >>> loadedOHE.getInputCols() == ohe.getInputCols()
+    True
+    >>> modelPath = temp_path + "/ohe-model"
+    >>> model.save(modelPath)
+    >>> loadedModel = OneHotEncoderModel.load(modelPath)
+    >>> loadedModel.categorySizes == model.categorySizes
+    True
+
+    .. versionadded:: 2.3.0
+    """
+
+    handleInvalid = Param(Params._dummy(), "handleInvalid", "How to handle invalid data during " +
+                          "transform(). Options are 'keep' (invalid data presented as an extra " +
+                          "categorical feature) or error (throw an error). Note that this Param " +
+                          "is only used during transform; during fitting, invalid data will " +
+                          "result in an error.",
+                          typeConverter=TypeConverters.toString)
+
+    dropLast = Param(Params._dummy(), "dropLast", "whether to drop the last category",
+                     typeConverter=TypeConverters.toBoolean)
+
+    @keyword_only
+    def __init__(self, inputCols=None, outputCols=None, handleInvalid="error", dropLast=True):
+        """
+        __init__(self, inputCols=None, outputCols=None, handleInvalid="error", dropLast=True)
+        """
+
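The doctest in the diff above exercises the new Python wrapper. For readers following the Scala side, a roughly equivalent sketch against the underlying Scala estimator (already present in 2.3) might look like the following; the column names are arbitrary and `spark` is assumed to be an active SparkSession:

```scala
import org.apache.spark.ml.feature.OneHotEncoderEstimator

// Three category indices in a single input column.
val df = spark.createDataFrame(Seq(Tuple1(0.0), Tuple1(1.0), Tuple1(2.0))).toDF("input")

val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array("input"))
  .setOutputCols(Array("output"))

// Fitting learns the category sizes; transform emits sparse vectors.
// With the default dropLast=true, three categories become 2-element vectors,
// e.g. input 0.0 -> (2,[0],[1.0]).
val model = encoder.fit(df)
model.transform(df).show(false)
```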
svn commit: r24140 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_11_16_01-186bf8f-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Fri Jan 12 00:14:59 2018
New Revision: 24140

Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_11_16_01-186bf8f docs

[This commit notification would consist of 1440 parts, which exceeds the limit of 50, so it was shortened to this summary.]
[2/2] spark git commit: Preparing development version 2.3.1-SNAPSHOT
Preparing development version 2.3.1-SNAPSHOT

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6bb22961
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6bb22961
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6bb22961
Branch: refs/heads/branch-2.3
Commit: 6bb22961c0c9df1a1f22e9491894895b297f5288
Parents: 964cc2e
Author: Sameer Agarwal
Authored: Thu Jan 11 15:23:17 2018 -0800
Committer: Sameer Agarwal
Committed: Thu Jan 11 15:23:17 2018 -0800

----------------------------------------------------------------------
 R/pkg/DESCRIPTION                          | 2 +-
 assembly/pom.xml                           | 2 +-
 common/kvstore/pom.xml                     | 2 +-
 common/network-common/pom.xml              | 2 +-
 common/network-shuffle/pom.xml             | 2 +-
 common/network-yarn/pom.xml                | 2 +-
 common/sketch/pom.xml                      | 2 +-
 common/tags/pom.xml                        | 2 +-
 common/unsafe/pom.xml                      | 2 +-
 core/pom.xml                               | 2 +-
 docs/_config.yml                           | 4 ++--
 examples/pom.xml                           | 2 +-
 external/docker-integration-tests/pom.xml  | 2 +-
 external/flume-assembly/pom.xml            | 2 +-
 external/flume-sink/pom.xml                | 2 +-
 external/flume/pom.xml                     | 2 +-
 external/kafka-0-10-assembly/pom.xml       | 2 +-
 external/kafka-0-10-sql/pom.xml            | 2 +-
 external/kafka-0-10/pom.xml                | 2 +-
 external/kafka-0-8-assembly/pom.xml        | 2 +-
 external/kafka-0-8/pom.xml                 | 2 +-
 external/kinesis-asl-assembly/pom.xml      | 2 +-
 external/kinesis-asl/pom.xml               | 2 +-
 external/spark-ganglia-lgpl/pom.xml        | 2 +-
 graphx/pom.xml                             | 2 +-
 hadoop-cloud/pom.xml                       | 2 +-
 launcher/pom.xml                           | 2 +-
 mllib-local/pom.xml                        | 2 +-
 mllib/pom.xml                              | 2 +-
 pom.xml                                    | 2 +-
 python/pyspark/version.py                  | 2 +-
 repl/pom.xml                               | 2 +-
 resource-managers/kubernetes/core/pom.xml  | 2 +-
 resource-managers/mesos/pom.xml            | 2 +-
 resource-managers/yarn/pom.xml             | 2 +-
 sql/catalyst/pom.xml                       | 2 +-
 sql/core/pom.xml                           | 2 +-
 sql/hive-thriftserver/pom.xml              | 2 +-
 sql/hive/pom.xml                           | 2 +-
 streaming/pom.xml                          | 2 +-
 tools/pom.xml                              | 2 +-
 41 files changed, 42 insertions(+), 42 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/6bb22961/R/pkg/DESCRIPTION

diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 6d46c31..29a8a00 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,6 +1,6 @@
 Package: SparkR
 Type: Package
-Version: 2.3.0
+Version: 2.3.1
 Title: R Frontend for Apache Spark
 Description: Provides an R Frontend for Apache Spark.
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),

http://git-wip-us.apache.org/repos/asf/spark/blob/6bb22961/assembly/pom.xml

diff --git a/assembly/pom.xml b/assembly/pom.xml
index 2ca9ab6..5c5a8e9 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.3.0</version>
+    <version>2.3.1-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>

http://git-wip-us.apache.org/repos/asf/spark/blob/6bb22961/common/kvstore/pom.xml

diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml
index 404c744..2a625da 100644
--- a/common/kvstore/pom.xml
+++ b/common/kvstore/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.3.0</version>
+    <version>2.3.1-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

http://git-wip-us.apache.org/repos/asf/spark/blob/6bb22961/common/network-common/pom.xml

diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 3c0b528..adb1890 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.3.0</version>
+    <version>2.3.1-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

http://git-wip-us.apache.org/repos/asf/spark/blob/6bb22961/common/network-shuffle/pom.xml

diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index fe3bcfd..4cdcfa2 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
[1/2] spark git commit: Preparing Spark release v2.3.0-rc1
Repository: spark
Updated Branches: refs/heads/branch-2.3 2ec302658 -> 6bb22961c

Preparing Spark release v2.3.0-rc1

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/964cc2e3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/964cc2e3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/964cc2e3
Branch: refs/heads/branch-2.3
Commit: 964cc2e31b2862bca0bd968b3e9e2cbf8d3ba5ea
Parents: 2ec3026
Author: Sameer Agarwal
Authored: Thu Jan 11 15:23:10 2018 -0800
Committer: Sameer Agarwal
Committed: Thu Jan 11 15:23:10 2018 -0800

----------------------------------------------------------------------
 assembly/pom.xml                           | 2 +-
 common/kvstore/pom.xml                     | 2 +-
 common/network-common/pom.xml              | 2 +-
 common/network-shuffle/pom.xml             | 2 +-
 common/network-yarn/pom.xml                | 2 +-
 common/sketch/pom.xml                      | 2 +-
 common/tags/pom.xml                        | 2 +-
 common/unsafe/pom.xml                      | 2 +-
 core/pom.xml                               | 2 +-
 docs/_config.yml                           | 2 +-
 examples/pom.xml                           | 2 +-
 external/docker-integration-tests/pom.xml  | 2 +-
 external/flume-assembly/pom.xml            | 2 +-
 external/flume-sink/pom.xml                | 2 +-
 external/flume/pom.xml                     | 2 +-
 external/kafka-0-10-assembly/pom.xml       | 2 +-
 external/kafka-0-10-sql/pom.xml            | 2 +-
 external/kafka-0-10/pom.xml                | 2 +-
 external/kafka-0-8-assembly/pom.xml        | 2 +-
 external/kafka-0-8/pom.xml                 | 2 +-
 external/kinesis-asl-assembly/pom.xml      | 2 +-
 external/kinesis-asl/pom.xml               | 2 +-
 external/spark-ganglia-lgpl/pom.xml        | 2 +-
 graphx/pom.xml                             | 2 +-
 hadoop-cloud/pom.xml                       | 2 +-
 launcher/pom.xml                           | 2 +-
 mllib-local/pom.xml                        | 2 +-
 mllib/pom.xml                              | 2 +-
 pom.xml                                    | 2 +-
 python/pyspark/version.py                  | 2 +-
 repl/pom.xml                               | 2 +-
 resource-managers/kubernetes/core/pom.xml  | 2 +-
 resource-managers/mesos/pom.xml            | 2 +-
 resource-managers/yarn/pom.xml             | 2 +-
 sql/catalyst/pom.xml                       | 2 +-
 sql/core/pom.xml                           | 2 +-
 sql/hive-thriftserver/pom.xml              | 2 +-
 sql/hive/pom.xml                           | 2 +-
 streaming/pom.xml                          | 2 +-
 tools/pom.xml                              | 2 +-
 40 files changed, 40 insertions(+), 40 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/964cc2e3/assembly/pom.xml

diff --git a/assembly/pom.xml b/assembly/pom.xml
index b3b4239..2ca9ab6 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.3.0-SNAPSHOT</version>
+    <version>2.3.0</version>
    <relativePath>../pom.xml</relativePath>
  </parent>

http://git-wip-us.apache.org/repos/asf/spark/blob/964cc2e3/common/kvstore/pom.xml

diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml
index cf93d41..404c744 100644
--- a/common/kvstore/pom.xml
+++ b/common/kvstore/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.3.0-SNAPSHOT</version>
+    <version>2.3.0</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

http://git-wip-us.apache.org/repos/asf/spark/blob/964cc2e3/common/network-common/pom.xml

diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 18cbdad..3c0b528 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.3.0-SNAPSHOT</version>
+    <version>2.3.0</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

http://git-wip-us.apache.org/repos/asf/spark/blob/964cc2e3/common/network-shuffle/pom.xml

diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 9968480..fe3bcfd 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.3.0-SNAPSHOT</version>
+    <version>2.3.0</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

http://git-wip-us.apache.org/repos/asf/spark/blob/964cc2e3/common/network-yarn/pom.xml

diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index ec2db6e..90ca401 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-
[spark] Git Push Summary
Repository: spark
Updated Tags: refs/tags/v2.3.0-rc1 [created] 964cc2e31
svn commit: r24138 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_11_14_01-2ec3026-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Thu Jan 11 22:15:09 2018
New Revision: 24138

Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_11_14_01-2ec3026 docs

[This commit notification would consist of 1440 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-23046][ML][SPARKR] Have RFormula include VectorSizeHint in pipeline
Repository: spark
Updated Branches: refs/heads/branch-2.3 f891ee324 -> 2ec302658

[SPARK-23046][ML][SPARKR] Have RFormula include VectorSizeHint in pipeline

## What changes were proposed in this pull request?

Including VectorSizeHint in RFormula pipelines will allow them to be applied to streaming dataframes.

## How was this patch tested?

Unit tests.

Author: Bago Amirbekian

Closes #20238 from MrBago/rFormulaVectorSize.

(cherry picked from commit 186bf8fb2e9ff8a80f3f6bcb5f2a0327fa79a1c9)
Signed-off-by: Joseph K. Bradley

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ec30265
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ec30265
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ec30265
Branch: refs/heads/branch-2.3
Commit: 2ec302658c98038962c9b7a90fd2cff751a35ffa
Parents: f891ee3
Author: Bago Amirbekian
Authored: Thu Jan 11 13:57:15 2018 -0800
Committer: Joseph K. Bradley
Committed: Thu Jan 11 13:57:27 2018 -0800

----------------------------------------------------------------------
 R/pkg/R/mllib_utils.R                           |  1 +
 .../org/apache/spark/ml/feature/RFormula.scala  | 18 --
 .../apache/spark/ml/feature/RFormulaSuite.scala | 37 +---
 3 files changed, 48 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/2ec30265/R/pkg/R/mllib_utils.R

diff --git a/R/pkg/R/mllib_utils.R b/R/pkg/R/mllib_utils.R
index a53c92c..23dda42 100644
--- a/R/pkg/R/mllib_utils.R
+++ b/R/pkg/R/mllib_utils.R
@@ -130,3 +130,4 @@ read.ml <- function(path) {
     stop("Unsupported model: ", jobj)
   }
 }
+

http://git-wip-us.apache.org/repos/asf/spark/blob/2ec30265/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
index 7da3339..f384ffb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.{Estimator, Model, Pipeline, PipelineModel, PipelineStage, Transformer}
 import org.apache.spark.ml.attribute.AttributeGroup
-import org.apache.spark.ml.linalg.VectorUDT
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
 import org.apache.spark.ml.param.{BooleanParam, Param, ParamMap, ParamValidators}
 import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasHandleInvalid, HasLabelCol}
 import org.apache.spark.ml.util._
@@ -210,8 +210,8 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override val uid: String)
     // First we index each string column referenced by the input terms.
     val indexed: Map[String, String] = resolvedFormula.terms.flatten.distinct.map { term =>
-      dataset.schema(term) match {
-        case column if column.dataType == StringType =>
+      dataset.schema(term).dataType match {
+        case _: StringType =>
           val indexCol = tmpColumn("stridx")
           encoderStages += new StringIndexer()
             .setInputCol(term)
@@ -220,6 +220,18 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override val uid: String)
             .setHandleInvalid($(handleInvalid))
           prefixesToRewrite(indexCol + "_") = term + "_"
           (term, indexCol)
+        case _: VectorUDT =>
+          val group = AttributeGroup.fromStructField(dataset.schema(term))
+          val size = if (group.size < 0) {
+            dataset.select(term).first().getAs[Vector](0).size
+          } else {
+            group.size
+          }
+          encoderStages += new VectorSizeHint(uid)
+            .setHandleInvalid("optimistic")
+            .setInputCol(term)
+            .setSize(size)
+          (term, term)
         case _ =>
           (term, term)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/2ec30265/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala

diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
index 5d09c90..f3f4b5a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
@@ -17,15 +17,15 @@

 package org.apache.spark.ml.feature

-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.SparkException
 import
spark git commit: [SPARK-23046][ML][SPARKR] Have RFormula include VectorSizeHint in pipeline
Repository: spark
Updated Branches: refs/heads/master 6f7aaed80 -> 186bf8fb2

[SPARK-23046][ML][SPARKR] Have RFormula include VectorSizeHint in pipeline

## What changes were proposed in this pull request?

Including VectorSizeHint in RFormula pipelines will allow them to be applied to streaming dataframes.

## How was this patch tested?

Unit tests.

Author: Bago Amirbekian

Closes #20238 from MrBago/rFormulaVectorSize.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/186bf8fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/186bf8fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/186bf8fb
Branch: refs/heads/master
Commit: 186bf8fb2e9ff8a80f3f6bcb5f2a0327fa79a1c9
Parents: 6f7aaed
Author: Bago Amirbekian
Authored: Thu Jan 11 13:57:15 2018 -0800
Committer: Joseph K. Bradley
Committed: Thu Jan 11 13:57:15 2018 -0800

----------------------------------------------------------------------
 R/pkg/R/mllib_utils.R                           |  1 +
 .../org/apache/spark/ml/feature/RFormula.scala  | 18 --
 .../apache/spark/ml/feature/RFormulaSuite.scala | 37 +---
 3 files changed, 48 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/186bf8fb/R/pkg/R/mllib_utils.R

diff --git a/R/pkg/R/mllib_utils.R b/R/pkg/R/mllib_utils.R
index a53c92c..23dda42 100644
--- a/R/pkg/R/mllib_utils.R
+++ b/R/pkg/R/mllib_utils.R
@@ -130,3 +130,4 @@ read.ml <- function(path) {
     stop("Unsupported model: ", jobj)
   }
 }
+

http://git-wip-us.apache.org/repos/asf/spark/blob/186bf8fb/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
index 7da3339..f384ffb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.{Estimator, Model, Pipeline, PipelineModel, PipelineStage, Transformer}
 import org.apache.spark.ml.attribute.AttributeGroup
-import org.apache.spark.ml.linalg.VectorUDT
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
 import org.apache.spark.ml.param.{BooleanParam, Param, ParamMap, ParamValidators}
 import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasHandleInvalid, HasLabelCol}
 import org.apache.spark.ml.util._
@@ -210,8 +210,8 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override val uid: String)
     // First we index each string column referenced by the input terms.
     val indexed: Map[String, String] = resolvedFormula.terms.flatten.distinct.map { term =>
-      dataset.schema(term) match {
-        case column if column.dataType == StringType =>
+      dataset.schema(term).dataType match {
+        case _: StringType =>
           val indexCol = tmpColumn("stridx")
           encoderStages += new StringIndexer()
             .setInputCol(term)
@@ -220,6 +220,18 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override val uid: String)
             .setHandleInvalid($(handleInvalid))
           prefixesToRewrite(indexCol + "_") = term + "_"
           (term, indexCol)
+        case _: VectorUDT =>
+          val group = AttributeGroup.fromStructField(dataset.schema(term))
+          val size = if (group.size < 0) {
+            dataset.select(term).first().getAs[Vector](0).size
+          } else {
+            group.size
+          }
+          encoderStages += new VectorSizeHint(uid)
+            .setHandleInvalid("optimistic")
+            .setInputCol(term)
+            .setSize(size)
+          (term, term)
         case _ =>
           (term, term)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/186bf8fb/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala

diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
index 5d09c90..f3f4b5a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
@@ -17,15 +17,15 @@

 package org.apache.spark.ml.feature

-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.SparkException
 import org.apache.spark.ml.attribute._
-import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.linalg.{Vector, Vectors}
 import
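Some context on why the hint matters: a streaming DataFrame cannot be scanned up front to discover the size of a vector column, so size-dependent stages fail unless the size is declared. The diff above makes RFormula insert such a stage itself for vector-typed terms; a hand-rolled version of the same arrangement — column names and the size are placeholders, not from the patch — looks roughly like this:

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{RFormula, VectorSizeHint}

// Declare the size of a vector column up front ("features_raw" and size 3
// are placeholders) so the pipeline can run on a streaming DataFrame.
val sizeHint = new VectorSizeHint()
  .setInputCol("features_raw")
  .setSize(3)
  .setHandleInvalid("optimistic") // same policy the patch uses: trust the rows

val formula = new RFormula()
  .setFormula("label ~ features_raw + age")

// Before this change the hint had to be wired in manually ahead of RFormula;
// with it, RFormula adds an equivalent stage for vector terms on its own.
val pipeline = new Pipeline().setStages(Array(sizeHint, formula))
```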
svn commit: r24135 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_11_12_01-6f7aaed-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Thu Jan 11 20:15:08 2018
New Revision: 24135

Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_11_12_01-6f7aaed docs

[This commit notification would consist of 1440 parts, which exceeds the limit of 50, so it was shortened to this summary.]
[1/2] spark git commit: [SPARK-22908] Add kafka source and sink for continuous processing.
Repository: spark
Updated Branches: refs/heads/branch-2.3 b94debd2b -> f891ee324

http://git-wip-us.apache.org/repos/asf/spark/blob/f891ee32/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3304f36..97f12ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -255,17 +255,24 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         }
       }

-      case _ => throw new AnalysisException(s"$cls does not support data writing.")
+      // Streaming also uses the data source V2 API. So it may be that the data source implements
+      // v2, but has no v2 implementation for batch writes. In that case, we fall back to saving
+      // as though it's a V1 source.
+      case _ => saveToV1Source()
     }
   } else {
-    // Code path for data source v1.
-    runCommand(df.sparkSession, "save") {
-      DataSource(
-        sparkSession = df.sparkSession,
-        className = source,
-        partitionColumns = partitioningColumns.getOrElse(Nil),
-        options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan))
-    }
+    saveToV1Source()
   }
 }

+  private def saveToV1Source(): Unit = {
+    // Code path for data source v1.
+    runCommand(df.sparkSession, "save") {
+      DataSource(
+        sparkSession = df.sparkSession,
+        className = source,
+        partitionColumns = partitioningColumns.getOrElse(Nil),
+        options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan))
+    }
+  }

http://git-wip-us.apache.org/repos/asf/spark/blob/f891ee32/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index f0bdf84..a4a857f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -81,9 +81,11 @@ case class WriteToDataSourceV2Exec(writer: DataSourceV2Writer, query: SparkPlan)
         (index, message: WriterCommitMessage) => messages(index) = message
       )

-      logInfo(s"Data source writer $writer is committing.")
-      writer.commit(messages)
-      logInfo(s"Data source writer $writer committed.")
+      if (!writer.isInstanceOf[ContinuousWriter]) {
+        logInfo(s"Data source writer $writer is committing.")
+        writer.commit(messages)
+        logInfo(s"Data source writer $writer committed.")
+      }
     } catch {
       case _: InterruptedException if writer.isInstanceOf[ContinuousWriter] =>
         // Interruption is how continuous queries are ended, so accept and ignore the exception.

http://git-wip-us.apache.org/repos/asf/spark/blob/f891ee32/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 24a8b00..cf27e1a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -142,7 +142,8 @@ abstract class StreamExecution(

   override val id: UUID = UUID.fromString(streamMetadata.id)

-  override val runId: UUID = UUID.randomUUID
+  override def runId: UUID = currentRunId
+  protected var currentRunId = UUID.randomUUID

   /**
    * Pretty identified string of printing in logs. Format is
@@ -418,11 +419,17 @@ abstract class StreamExecution(
    * Blocks the current thread until processing for data from the given `source` has reached at
    * least the given `Offset`. This method is intended for use primarily when writing tests.
    */
-  private[sql] def awaitOffset(source: BaseStreamingSource, newOffset: Offset): Unit = {
+  private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset): Unit = {
     assertAwaitThread()
     def notDone = {
       val localCommittedOffsets = committedOffsets
-      !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
+      if (sources == null) {
+        // sources might not be initialized yet
+        false
+      } else {
+
[2/2] spark git commit: [SPARK-22908] Add kafka source and sink for continuous processing.
[SPARK-22908] Add kafka source and sink for continuous processing.

## What changes were proposed in this pull request?

Add kafka source and sink for continuous processing. This involves two small changes to the execution engine:

* Bring data reader close() into the normal data reader thread to avoid thread safety issues.
* Fix up the semantics of the RECONFIGURING StreamExecution state. State updates are now atomic, and we don't have to deal with swallowing an exception.

## How was this patch tested?

new unit tests

Author: Jose Torres

Closes #20096 from jose-torres/continuous-kafka.

(cherry picked from commit 6f7aaed805070d29dcba32e04ca7a1f581fa54b9)
Signed-off-by: Tathagata Das

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f891ee32
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f891ee32
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f891ee32
Branch: refs/heads/branch-2.3
Commit: f891ee3249e04576dd579cbab6f8f1632550e6bd
Parents: b94debd
Author: Jose Torres
Authored: Thu Jan 11 10:52:12 2018 -0800
Committer: Tathagata Das
Committed: Thu Jan 11 10:52:26 2018 -0800

----------------------------------------------------------------------
 .../sql/kafka010/KafkaContinuousReader.scala    | 232 +
 .../sql/kafka010/KafkaContinuousWriter.scala    | 119 +
 .../spark/sql/kafka010/KafkaOffsetReader.scala  |  21 +-
 .../apache/spark/sql/kafka010/KafkaSource.scala |  17 +-
 .../spark/sql/kafka010/KafkaSourceOffset.scala  |   7 +-
 .../sql/kafka010/KafkaSourceProvider.scala      | 105 +++-
 .../spark/sql/kafka010/KafkaWriteTask.scala     |  71 +--
 .../apache/spark/sql/kafka010/KafkaWriter.scala |   5 +-
 .../sql/kafka010/KafkaContinuousSinkSuite.scala | 474 +++
 .../kafka010/KafkaContinuousSourceSuite.scala   |  96
 .../sql/kafka010/KafkaContinuousTest.scala      |  64 +++
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 470 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |  32 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  25 +-
 .../datasources/v2/WriteToDataSourceV2.scala    |   8 +-
 .../execution/streaming/StreamExecution.scala   |  15 +-
 .../ContinuousDataSourceRDDIter.scala           |   3 +-
 .../continuous/ContinuousExecution.scala        |  67 +--
 .../streaming/continuous/EpochCoordinator.scala |  21 +-
 .../spark/sql/streaming/DataStreamWriter.scala  |  26 +-
 .../apache/spark/sql/streaming/StreamTest.scala |  36 +-
 21 files changed, 1531 insertions(+), 383 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/f891ee32/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
new file mode 100644
index 000..9283795
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import
[2/2] spark git commit: [SPARK-22908] Add kafka source and sink for continuous processing.
[SPARK-22908] Add kafka source and sink for continuous processing.

## What changes were proposed in this pull request?

Add kafka source and sink for continuous processing. This involves two small changes to the execution engine:

* Bring data reader close() into the normal data reader thread to avoid thread safety issues.
* Fix up the semantics of the RECONFIGURING StreamExecution state. State updates are now atomic, and we don't have to deal with swallowing an exception.

## How was this patch tested?

new unit tests

Author: Jose Torres

Closes #20096 from jose-torres/continuous-kafka.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f7aaed8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f7aaed8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f7aaed8
Branch: refs/heads/master
Commit: 6f7aaed805070d29dcba32e04ca7a1f581fa54b9
Parents: 0b2eefb
Author: Jose Torres
Authored: Thu Jan 11 10:52:12 2018 -0800
Committer: Tathagata Das
Committed: Thu Jan 11 10:52:12 2018 -0800

----------------------------------------------------------------------
 .../sql/kafka010/KafkaContinuousReader.scala    | 232 +
 .../sql/kafka010/KafkaContinuousWriter.scala    | 119 +
 .../spark/sql/kafka010/KafkaOffsetReader.scala  |  21 +-
 .../apache/spark/sql/kafka010/KafkaSource.scala |  17 +-
 .../spark/sql/kafka010/KafkaSourceOffset.scala  |   7 +-
 .../sql/kafka010/KafkaSourceProvider.scala      | 105 +++-
 .../spark/sql/kafka010/KafkaWriteTask.scala     |  71 +--
 .../apache/spark/sql/kafka010/KafkaWriter.scala |   5 +-
 .../sql/kafka010/KafkaContinuousSinkSuite.scala | 474 +++
 .../kafka010/KafkaContinuousSourceSuite.scala   |  96
 .../sql/kafka010/KafkaContinuousTest.scala      |  64 +++
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 470 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |  32 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  25 +-
 .../datasources/v2/WriteToDataSourceV2.scala    |   8 +-
 .../execution/streaming/StreamExecution.scala   |  15 +-
 .../ContinuousDataSourceRDDIter.scala           |   3 +-
 .../continuous/ContinuousExecution.scala        |  67 +--
 .../streaming/continuous/EpochCoordinator.scala |  21 +-
 .../spark/sql/streaming/DataStreamWriter.scala  |  26 +-
 .../apache/spark/sql/streaming/StreamTest.scala |  36 +-
 21 files changed, 1531 insertions(+), 383 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/6f7aaed8/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
new file mode 100644
index 000..9283795
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A [[ContinuousReader]] for data from kafka.
+ *
+ *
[1/2] spark git commit: [SPARK-22908] Add kafka source and sink for continuous processing.
Repository: spark Updated Branches: refs/heads/master 0b2eefb67 -> 6f7aaed80 http://git-wip-us.apache.org/repos/asf/spark/blob/6f7aaed8/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 3304f36..97f12ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -255,17 +255,24 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } } -case _ => throw new AnalysisException(s"$cls does not support data writing.") +// Streaming also uses the data source V2 API. So it may be that the data source implements +// v2, but has no v2 implementation for batch writes. In that case, we fall back to saving +// as though it's a V1 source. +case _ => saveToV1Source() } } else { - // Code path for data source v1. - runCommand(df.sparkSession, "save") { -DataSource( - sparkSession = df.sparkSession, - className = source, - partitionColumns = partitioningColumns.getOrElse(Nil), - options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan)) - } + saveToV1Source() +} + } + + private def saveToV1Source(): Unit = { +// Code path for data source v1. +runCommand(df.sparkSession, "save") { + DataSource( +sparkSession = df.sparkSession, +className = source, +partitionColumns = partitioningColumns.getOrElse(Nil), +options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/6f7aaed8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala index f0bdf84..a4a857f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala @@ -81,9 +81,11 @@ case class WriteToDataSourceV2Exec(writer: DataSourceV2Writer, query: SparkPlan) (index, message: WriterCommitMessage) => messages(index) = message ) - logInfo(s"Data source writer $writer is committing.") - writer.commit(messages) - logInfo(s"Data source writer $writer committed.") + if (!writer.isInstanceOf[ContinuousWriter]) { +logInfo(s"Data source writer $writer is committing.") +writer.commit(messages) +logInfo(s"Data source writer $writer committed.") + } } catch { case _: InterruptedException if writer.isInstanceOf[ContinuousWriter] => // Interruption is how continuous queries are ended, so accept and ignore the exception. 
http://git-wip-us.apache.org/repos/asf/spark/blob/6f7aaed8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 24a8b00..cf27e1a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -142,7 +142,8 @@ abstract class StreamExecution( override val id: UUID = UUID.fromString(streamMetadata.id) - override val runId: UUID = UUID.randomUUID + override def runId: UUID = currentRunId + protected var currentRunId = UUID.randomUUID /** * Pretty identified string of printing in logs. Format is @@ -418,11 +419,17 @@ abstract class StreamExecution( * Blocks the current thread until processing for data from the given `source` has reached at * least the given `Offset`. This method is intended for use primarily when writing tests. */ - private[sql] def awaitOffset(source: BaseStreamingSource, newOffset: Offset): Unit = { + private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset): Unit = { assertAwaitThread() def notDone = { val localCommittedOffsets = committedOffsets - !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset + if (sources == null) { +// sources might not be initialized yet +false + } else { +val
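For context, a minimal sketch of the end-to-end usage the SPARK-22908 change above enables, assuming Spark 2.3's structured streaming API; the broker address, topic names, and checkpoint path are hypothetical:

```scala
// Read from and write to Kafka under the continuous processing trigger.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-continuous").getOrCreate()

val in = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // hypothetical broker
  .option("subscribe", "input-topic")                 // hypothetical topic
  .load()

in.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/kafka-continuous-ckpt")
  .trigger(Trigger.Continuous("1 second"))            // continuous (epoch-based) execution
  .start()
```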
spark git commit: [SPARK-22994][K8S] Use a single image for all Spark containers.
Repository: spark Updated Branches: refs/heads/branch-2.3 f624850fe -> b94debd2b [SPARK-22994][K8S] Use a single image for all Spark containers. This change allows a user to submit a Spark application on Kubernetes while providing only a single image, instead of one image for each type of container. The image's entry point now takes an extra argument that identifies the process that is being started. The configuration still allows the user to provide different images for each container type if they so desire. On top of that, the entry point was simplified a bit to share more code; mainly, the same env variable is used to propagate the user-defined classpath to the different containers. Aside from being modified to match the new behavior, the 'build-push-docker-images.sh' script was renamed to 'docker-image-tool.sh' to more closely match its purpose; the old name was a little awkward and is now also not entirely correct, since there is a single image. It was also moved to 'bin' since it's not necessarily an admin tool. Docs have been updated to match the new behavior. Tested locally with minikube. Author: Marcelo Vanzin Closes #20192 from vanzin/SPARK-22994. (cherry picked from commit 0b2eefb674151a0af64806728b38d9410da552ec) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b94debd2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b94debd2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b94debd2 Branch: refs/heads/branch-2.3 Commit: b94debd2b01b87ef1d2a34d48877e38ade0969e6 Parents: f624850 Author: Marcelo Vanzin Authored: Thu Jan 11 10:37:35 2018 -0800 Committer: Marcelo Vanzin Committed: Thu Jan 11 10:37:55 2018 -0800 -- bin/docker-image-tool.sh| 145 ++ docs/running-on-kubernetes.md | 58  .../org/apache/spark/deploy/k8s/Config.scala| 17 ++- .../org/apache/spark/deploy/k8s/Constants.scala | 3 +- .../deploy/k8s/InitContainerBootstrap.scala | 1 + .../steps/BasicDriverConfigurationStep.scala| 3 +- .../cluster/k8s/ExecutorPodFactory.scala| 3 +- .../submit/DriverConfigOrchestratorSuite.scala | 12 +- .../BasicDriverConfigurationStepSuite.scala | 4 +- .../InitContainerConfigOrchestratorSuite.scala | 4 +- .../cluster/k8s/ExecutorPodFactorySuite.scala | 4 +- .../src/main/dockerfiles/driver/Dockerfile | 35 - .../src/main/dockerfiles/executor/Dockerfile| 35 - .../main/dockerfiles/init-container/Dockerfile | 24 --- .../src/main/dockerfiles/spark-base/Dockerfile | 50 --- .../main/dockerfiles/spark-base/entrypoint.sh | 37 - .../src/main/dockerfiles/spark/Dockerfile | 52 +++ .../src/main/dockerfiles/spark/entrypoint.sh| 97  sbin/build-push-docker-images.sh| 149 --- 19 files changed, 348 insertions(+), 385 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b94debd2/bin/docker-image-tool.sh -- diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh new file mode 100755 index 000..0714063 --- /dev/null +++ b/bin/docker-image-tool.sh @@ -0,0 +1,145 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License.
You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This script builds and pushes docker images when run from a release of Spark +# with Kubernetes support. + +function error { + echo "$@" 1>&2 + exit 1 +} + +if [ -z "${SPARK_HOME}" ]; then + SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" +fi +. "${SPARK_HOME}/bin/load-spark-env.sh" + +function image_ref { + local image="$1" + local add_repo="${2:-1}" + if [ $add_repo = 1 ] && [ -n "$REPO" ]; then +image="$REPO/$image" + fi + if [ -n "$TAG" ]; then +image="$image:$TAG" + fi + echo "$image" +} + +function build { + local BUILD_ARGS + local IMG_PATH + + if [ ! -f "$SPARK_HOME/RELEASE" ]; then +
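The single-image scheme described above boils down to one required image setting with optional per-container overrides. A hedged sketch of the corresponding submission configuration follows; the key names are believed to match the Config.scala changes in this commit, but treat them as an assumption when cross-checking, and the master URL and image coordinates are made up:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("k8s://https://k8s-apiserver.example.com:6443")  // hypothetical API server
  // One image now serves the driver, executor, and init containers:
  .set("spark.kubernetes.container.image", "example.com/spark/spark:v2.3.0")
  // Per-container overrides are still honored when set explicitly:
  .set("spark.kubernetes.executor.container.image", "example.com/spark/spark-executor:v2.3.0")
```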
spark git commit: [SPARK-22994][K8S] Use a single image for all Spark containers.
Repository: spark Updated Branches: refs/heads/master 6d230dccf -> 0b2eefb67 [SPARK-22994][K8S] Use a single image for all Spark containers. This change allows a user to submit a Spark application on Kubernetes while providing only a single image, instead of one image for each type of container. The image's entry point now takes an extra argument that identifies the process that is being started. The configuration still allows the user to provide different images for each container type if they so desire. On top of that, the entry point was simplified a bit to share more code; mainly, the same env variable is used to propagate the user-defined classpath to the different containers. Aside from being modified to match the new behavior, the 'build-push-docker-images.sh' script was renamed to 'docker-image-tool.sh' to more closely match its purpose; the old name was a little awkward and is now also not entirely correct, since there is a single image. It was also moved to 'bin' since it's not necessarily an admin tool. Docs have been updated to match the new behavior. Tested locally with minikube. Author: Marcelo Vanzin Closes #20192 from vanzin/SPARK-22994. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b2eefb6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b2eefb6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b2eefb6 Branch: refs/heads/master Commit: 0b2eefb674151a0af64806728b38d9410da552ec Parents: 6d230dc Author: Marcelo Vanzin Authored: Thu Jan 11 10:37:35 2018 -0800 Committer: Marcelo Vanzin Committed: Thu Jan 11 10:37:35 2018 -0800 -- bin/docker-image-tool.sh| 145 ++ docs/running-on-kubernetes.md | 58  .../org/apache/spark/deploy/k8s/Config.scala| 17 ++- .../org/apache/spark/deploy/k8s/Constants.scala | 3 +- .../deploy/k8s/InitContainerBootstrap.scala | 1 + .../steps/BasicDriverConfigurationStep.scala| 3 +- .../cluster/k8s/ExecutorPodFactory.scala| 3 +- .../submit/DriverConfigOrchestratorSuite.scala | 12 +- .../BasicDriverConfigurationStepSuite.scala | 4 +- .../InitContainerConfigOrchestratorSuite.scala | 4 +- .../cluster/k8s/ExecutorPodFactorySuite.scala | 4 +- .../src/main/dockerfiles/driver/Dockerfile | 35 - .../src/main/dockerfiles/executor/Dockerfile| 35 - .../main/dockerfiles/init-container/Dockerfile | 24 --- .../src/main/dockerfiles/spark-base/Dockerfile | 50 --- .../main/dockerfiles/spark-base/entrypoint.sh | 37 - .../src/main/dockerfiles/spark/Dockerfile | 52 +++ .../src/main/dockerfiles/spark/entrypoint.sh| 97  sbin/build-push-docker-images.sh| 149 --- 19 files changed, 348 insertions(+), 385 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0b2eefb6/bin/docker-image-tool.sh -- diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh new file mode 100755 index 000..0714063 --- /dev/null +++ b/bin/docker-image-tool.sh @@ -0,0 +1,145 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License.
You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This script builds and pushes docker images when run from a release of Spark +# with Kubernetes support. + +function error { + echo "$@" 1>&2 + exit 1 +} + +if [ -z "${SPARK_HOME}" ]; then + SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" +fi +. "${SPARK_HOME}/bin/load-spark-env.sh" + +function image_ref { + local image="$1" + local add_repo="${2:-1}" + if [ $add_repo = 1 ] && [ -n "$REPO" ]; then +image="$REPO/$image" + fi + if [ -n "$TAG" ]; then +image="$image:$TAG" + fi + echo "$image" +} + +function build { + local BUILD_ARGS + local IMG_PATH + + if [ ! -f "$SPARK_HOME/RELEASE" ]; then +# Set image build arguments accordingly if this is a source repo and not a distribution archive. +
svn commit: r24133 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_11_08_01-6d230dc-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Jan 11 16:20:31 2018 New Revision: 24133 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_11_08_01-6d230dc docs [This commit notification would consist of 1440 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Update PageRank.scala
Repository: spark Updated Branches: refs/heads/master b46e58b74 -> 6d230dccf Update PageRank.scala ## What changes were proposed in this pull request? Hi, according to the code below, "if (id == src) (0.0, Double.NegativeInfinity) else (0.0, 0.0)", I think the comment is wrong. ## How was this patch tested? Please review http://spark.apache.org/contributing.html before opening a pull request. Author: FanDonglai Closes #20220 from ddna1021/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6d230dcc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6d230dcc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6d230dcc Branch: refs/heads/master Commit: 6d230dccf65300651f989392159d84bfaf08f18f Parents: b46e58b Author: FanDonglai Authored: Thu Jan 11 09:06:40 2018 -0600 Committer: Sean Owen Committed: Thu Jan 11 09:06:40 2018 -0600 -- graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6d230dcc/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index fd7b7f7..ebd65e8 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -303,7 +303,7 @@ object PageRank extends Logging { val src: VertexId = srcId.getOrElse(-1L) // Initialize the pagerankGraph with each edge attribute -// having weight 1/outDegree and each vertex with attribute 1.0. +// having weight 1/outDegree and each vertex with attribute 0. val pagerankGraph: Graph[(Double, Double), Double] = graph // Associate the degree with each vertex .outerJoinVertices(graph.outDegrees) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
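The expression quoted in the message is the vertex initialization for personalized PageRank; as a standalone sketch (the source vertex id is hypothetical):

```scala
import org.apache.spark.graphx.VertexId

// Every vertex starts at rank 0.0 -- hence the corrected comment -- and only the
// personalization source carries the Double.NegativeInfinity marker in its delta slot.
val src: VertexId = 1L  // hypothetical personalization source
def initialValue(id: VertexId): (Double, Double) =
  if (id == src) (0.0, Double.NegativeInfinity) else (0.0, 0.0)
```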
svn commit: r24132 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_11_06_01-f624850-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Jan 11 14:15:28 2018 New Revision: 24132 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_11_06_01-f624850 docs [This commit notification would consist of 1440 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-19732][FOLLOW-UP] Document behavior changes made in na.fill and fillna
Repository: spark Updated Branches: refs/heads/branch-2.3 9ca0f6eaf -> f624850fe [SPARK-19732][FOLLOW-UP] Document behavior changes made in na.fill and fillna ## What changes were proposed in this pull request? https://github.com/apache/spark/pull/18164 introduces the behavior changes. We need to document it. ## How was this patch tested? N/A Author: gatorsmileCloses #20234 from gatorsmile/docBehaviorChange. (cherry picked from commit b46e58b74c82dac37b7b92284ea3714919c5a886) Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f624850f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f624850f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f624850f Branch: refs/heads/branch-2.3 Commit: f624850fe8acce52240217f376316734a23be00b Parents: 9ca0f6e Author: gatorsmile Authored: Thu Jan 11 22:33:42 2018 +0900 Committer: hyukjinkwon Committed: Thu Jan 11 22:33:57 2018 +0900 -- docs/sql-programming-guide.md | 10 -- 1 file changed, 4 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f624850f/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 72f79d6..258c769 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1788,12 +1788,10 @@ options. Note that, for DecimalType(38,0)*, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type. - In PySpark, now we need Pandas 0.19.2 or upper if you want to use Pandas related functionalities, such as `toPandas`, `createDataFrame` from Pandas DataFrame, etc. - In PySpark, the behavior of timestamp values for Pandas related functionalities was changed to respect session timezone. If you want to use the old behavior, you need to set a configuration `spark.sql.execution.pandas.respectSessionTimeZone` to `False`. See [SPARK-22395](https://issues.apache.org/jira/browse/SPARK-22395) for details. - - - Since Spark 2.3, when either broadcast hash join or broadcast nested loop join is applicable, we prefer to broadcasting the table that is explicitly specified in a broadcast hint. For details, see the section [Broadcast Hint](#broadcast-hint-for-sql-queries) and [SPARK-22489](https://issues.apache.org/jira/browse/SPARK-22489). - - - Since Spark 2.3, when all inputs are binary, `functions.concat()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.concatBinaryAsString` to `true`. - - - Since Spark 2.3, when all inputs are binary, SQL `elt()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.eltOutputAsString` to `true`. + - In PySpark, `na.fill()` or `fillna` also accepts boolean and replaces nulls with booleans. In prior Spark versions, PySpark just ignores it and returns the original Dataset/DataFrame. + - Since Spark 2.3, when either broadcast hash join or broadcast nested loop join is applicable, we prefer to broadcasting the table that is explicitly specified in a broadcast hint. For details, see the section [Broadcast Hint](#broadcast-hint-for-sql-queries) and [SPARK-22489](https://issues.apache.org/jira/browse/SPARK-22489). 
+ - Since Spark 2.3, when all inputs are binary, `functions.concat()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.concatBinaryAsString` to `true`. + - Since Spark 2.3, when all inputs are binary, SQL `elt()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.eltOutputAsString` to `true`. ## Upgrading From Spark SQL 2.1 to 2.2 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
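A minimal sketch of the documented `na.fill` change, using the boolean overload this commit's docs refer to; the data is made up, and the PySpark `fillna` behavior mirrors it:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("fillna-bool").getOrCreate()

// A one-column boolean DataFrame with a NULL row.
val df = spark.sql("SELECT * FROM VALUES (true), (CAST(NULL AS BOOLEAN)), (false) AS t(flag)")

// Since Spark 2.3 the NULL becomes `true`; earlier PySpark versions silently
// ignored boolean fill values and returned the original DataFrame.
df.na.fill(true).show()
```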
spark git commit: [SPARK-19732][FOLLOW-UP] Document behavior changes made in na.fill and fillna
Repository: spark Updated Branches: refs/heads/master 76892bcf2 -> b46e58b74 [SPARK-19732][FOLLOW-UP] Document behavior changes made in na.fill and fillna ## What changes were proposed in this pull request? https://github.com/apache/spark/pull/18164 introduces the behavior changes. We need to document it. ## How was this patch tested? N/A Author: gatorsmileCloses #20234 from gatorsmile/docBehaviorChange. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b46e58b7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b46e58b7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b46e58b7 Branch: refs/heads/master Commit: b46e58b74c82dac37b7b92284ea3714919c5a886 Parents: 76892bc Author: gatorsmile Authored: Thu Jan 11 22:33:42 2018 +0900 Committer: hyukjinkwon Committed: Thu Jan 11 22:33:42 2018 +0900 -- docs/sql-programming-guide.md | 10 -- 1 file changed, 4 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b46e58b7/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 72f79d6..258c769 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1788,12 +1788,10 @@ options. Note that, for DecimalType(38,0)*, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type. - In PySpark, now we need Pandas 0.19.2 or upper if you want to use Pandas related functionalities, such as `toPandas`, `createDataFrame` from Pandas DataFrame, etc. - In PySpark, the behavior of timestamp values for Pandas related functionalities was changed to respect session timezone. If you want to use the old behavior, you need to set a configuration `spark.sql.execution.pandas.respectSessionTimeZone` to `False`. See [SPARK-22395](https://issues.apache.org/jira/browse/SPARK-22395) for details. - - - Since Spark 2.3, when either broadcast hash join or broadcast nested loop join is applicable, we prefer to broadcasting the table that is explicitly specified in a broadcast hint. For details, see the section [Broadcast Hint](#broadcast-hint-for-sql-queries) and [SPARK-22489](https://issues.apache.org/jira/browse/SPARK-22489). - - - Since Spark 2.3, when all inputs are binary, `functions.concat()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.concatBinaryAsString` to `true`. - - - Since Spark 2.3, when all inputs are binary, SQL `elt()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.eltOutputAsString` to `true`. + - In PySpark, `na.fill()` or `fillna` also accepts boolean and replaces nulls with booleans. In prior Spark versions, PySpark just ignores it and returns the original Dataset/DataFrame. + - Since Spark 2.3, when either broadcast hash join or broadcast nested loop join is applicable, we prefer to broadcasting the table that is explicitly specified in a broadcast hint. For details, see the section [Broadcast Hint](#broadcast-hint-for-sql-queries) and [SPARK-22489](https://issues.apache.org/jira/browse/SPARK-22489). + - Since Spark 2.3, when all inputs are binary, `functions.concat()` returns an output as binary. 
Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.concatBinaryAsString` to `true`. + - Since Spark 2.3, when all inputs are binary, SQL `elt()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.eltOutputAsString` to `true`. ## Upgrading From Spark SQL 2.1 to 2.2 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23000][TEST-HADOOP2.6] Fix Flaky test suite DataSourceWithHiveMetastoreCatalogSuite
Repository: spark Updated Branches: refs/heads/branch-2.3 799598905 -> 9ca0f6eaf [SPARK-23000][TEST-HADOOP2.6] Fix Flaky test suite DataSourceWithHiveMetastoreCatalogSuite ## What changes were proposed in this pull request? The Spark 2.3 branch still failed due to the flaky test suite `DataSourceWithHiveMetastoreCatalogSuite `. https://amplab.cs.berkeley.edu/jenkins/job/spark-branch-2.3-test-sbt-hadoop-2.6/ Although https://github.com/apache/spark/pull/20207 is unable to reproduce it in Spark 2.3, it sounds like the current DB of Spark's Catalog is changed based on the following stacktrace. Thus, we just need to reset it. ``` [info] DataSourceWithHiveMetastoreCatalogSuite: 02:40:39.486 ERROR org.apache.hadoop.hive.ql.parse.CalcitePlanner: org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:14 Table not found 't' at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.getMetaData(SemanticAnalyzer.java:1594) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.getMetaData(SemanticAnalyzer.java:1545) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genResolvedParseTree(SemanticAnalyzer.java:10077) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:10128) at org.apache.hadoop.hive.ql.parse.CalcitePlanner.analyzeInternal(CalcitePlanner.java:209) at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:227) at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:424) at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:308) at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1122) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1170) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1059) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1049) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$runHive$1.apply(HiveClientImpl.scala:694) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$runHive$1.apply(HiveClientImpl.scala:683) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:272) at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:210) at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:209) at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:255) at org.apache.spark.sql.hive.client.HiveClientImpl.runHive(HiveClientImpl.scala:683) at org.apache.spark.sql.hive.client.HiveClientImpl.runSqlHive(HiveClientImpl.scala:673) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$9$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply$mcV$sp(HiveMetastoreCatalogSuite.scala:185) at org.apache.spark.sql.test.SQLTestUtilsBase$class.withTable(SQLTestUtils.scala:273) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.withTable(HiveMetastoreCatalogSuite.scala:139) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$9$$anonfun$apply$1.apply$mcV$sp(HiveMetastoreCatalogSuite.scala:163) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$9$$anonfun$apply$1.apply(HiveMetastoreCatalogSuite.scala:163) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$9$$anonfun$apply$1.apply(HiveMetastoreCatalogSuite.scala:163) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at 
org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186) at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:68) at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289) at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196) at org.scalatest.FunSuite.runTest(FunSuite.scala:1560) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384) at scala.collection.immutable.List.foreach(List.scala:381) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384) at
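The reset the message calls for amounts to restoring the catalog's current database between suites. A hedged sketch using the public catalog API; the hook the suite actually uses may differ:

```scala
import org.apache.spark.sql.SparkSession

def resetCurrentDatabase(spark: SparkSession): Unit = {
  // Restore the default DB so a previous suite's USE <db> cannot leak into this one.
  spark.catalog.setCurrentDatabase("default")
}
```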
spark git commit: [SPARK-23000][TEST-HADOOP2.6] Fix Flaky test suite DataSourceWithHiveMetastoreCatalogSuite
Repository: spark Updated Branches: refs/heads/master 0552c36e0 -> 76892bcf2 [SPARK-23000][TEST-HADOOP2.6] Fix Flaky test suite DataSourceWithHiveMetastoreCatalogSuite ## What changes were proposed in this pull request? The Spark 2.3 branch still failed due to the flaky test suite `DataSourceWithHiveMetastoreCatalogSuite `. https://amplab.cs.berkeley.edu/jenkins/job/spark-branch-2.3-test-sbt-hadoop-2.6/ Although https://github.com/apache/spark/pull/20207 is unable to reproduce it in Spark 2.3, it sounds like the current DB of Spark's Catalog is changed based on the following stacktrace. Thus, we just need to reset it. ``` [info] DataSourceWithHiveMetastoreCatalogSuite: 02:40:39.486 ERROR org.apache.hadoop.hive.ql.parse.CalcitePlanner: org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:14 Table not found 't' at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.getMetaData(SemanticAnalyzer.java:1594) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.getMetaData(SemanticAnalyzer.java:1545) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genResolvedParseTree(SemanticAnalyzer.java:10077) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:10128) at org.apache.hadoop.hive.ql.parse.CalcitePlanner.analyzeInternal(CalcitePlanner.java:209) at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:227) at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:424) at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:308) at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1122) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1170) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1059) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1049) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$runHive$1.apply(HiveClientImpl.scala:694) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$runHive$1.apply(HiveClientImpl.scala:683) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:272) at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:210) at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:209) at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:255) at org.apache.spark.sql.hive.client.HiveClientImpl.runHive(HiveClientImpl.scala:683) at org.apache.spark.sql.hive.client.HiveClientImpl.runSqlHive(HiveClientImpl.scala:673) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$9$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply$mcV$sp(HiveMetastoreCatalogSuite.scala:185) at org.apache.spark.sql.test.SQLTestUtilsBase$class.withTable(SQLTestUtils.scala:273) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.withTable(HiveMetastoreCatalogSuite.scala:139) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$9$$anonfun$apply$1.apply$mcV$sp(HiveMetastoreCatalogSuite.scala:163) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$9$$anonfun$apply$1.apply(HiveMetastoreCatalogSuite.scala:163) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$9$$anonfun$apply$1.apply(HiveMetastoreCatalogSuite.scala:163) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at 
org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186) at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:68) at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289) at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196) at org.scalatest.FunSuite.runTest(FunSuite.scala:1560) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384) at scala.collection.immutable.List.foreach(List.scala:381) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384) at
spark git commit: [SPARK-22967][TESTS] Fix VersionsSuite's unit tests by changing Windows paths into URI paths
Repository: spark Updated Branches: refs/heads/master 1c70da3bf -> 0552c36e0 [SPARK-22967][TESTS] Fix VersionsSuite's unit tests by changing Windows paths into URI paths ## What changes were proposed in this pull request? Two unit tests will fail due to Windows-format paths: 1. test(s"$version: read avro file containing decimal") ``` org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); ``` 2. test(s"$version: SPARK-17920: Insert into/overwrite avro table") ``` Unable to infer the schema. The schema specification is required to create the table `default`.`tab2`.; org.apache.spark.sql.AnalysisException: Unable to infer the schema. The schema specification is required to create the table `default`.`tab2`.; ``` This PR fixes these two unit tests by changing the Windows paths into URI paths. ## How was this patch tested? Existing tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: wuyi5 Closes #20199 from Ngone51/SPARK-22967. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0552c36e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0552c36e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0552c36e Branch: refs/heads/master Commit: 0552c36e02434c60dad82024334d291f6008b822 Parents: 1c70da3 Author: wuyi5 Authored: Thu Jan 11 22:17:15 2018 +0900 Committer: hyukjinkwon Committed: Thu Jan 11 22:17:15 2018 +0900 -- .../org/apache/spark/sql/hive/client/VersionsSuite.scala | 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0552c36e/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index ff90e9d..e64389e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -811,7 +811,7 @@ class VersionsSuite extends SparkFunSuite with Logging { test(s"$version: read avro file containing decimal") { val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal") - val location = new File(url.getFile) + val location = new File(url.getFile).toURI.toString val tableName = "tab1" val avroSchema = @@ -851,6 +851,8 @@ class VersionsSuite extends SparkFunSuite with Logging { } test(s"$version: SPARK-17920: Insert into/overwrite avro table") { + // skipped because it's failed in the condition on Windows + assume(!(Utils.isWindows && version == "0.12")) withTempDir { dir => val avroSchema = """ @@ -875,10 +877,10 @@ class VersionsSuite extends SparkFunSuite with Logging { val writer = new PrintWriter(schemaFile) writer.write(avroSchema) writer.close() -val schemaPath = schemaFile.getCanonicalPath +val schemaPath = schemaFile.toURI.toString val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal") -val srcLocation = new File(url.getFile).getCanonicalPath +val srcLocation = new File(url.getFile).toURI.toString val destTableName = "tab1" val srcTableName = "tab2" - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
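To see why the URI form sidesteps the problem, a small illustration with a hypothetical Windows path; `getCanonicalPath` keeps the drive letter and backslashes that Hadoop's `Path` parsing mishandles, while `toURI.toString` yields an unambiguous `file:` URI:

```scala
import java.io.File

val f = new File("C:\\jenkins\\workspace\\avroDecimal")
// On a Windows machine this prints C:\jenkins\workspace\avroDecimal,
// which is what the failing tests ended up feeding to Hadoop/Hive path parsing.
println(f.getCanonicalPath)
// This prints file:/C:/jenkins/workspace/avroDecimal, a form that both
// Hadoop Path and the Hive metastore accept.
println(f.toURI.toString)
```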
spark git commit: [SPARK-22967][TESTS] Fix VersionsSuite's unit tests by changing Windows paths into URI paths
Repository: spark Updated Branches: refs/heads/branch-2.3 b78130123 -> 799598905 [SPARK-22967][TESTS] Fix VersionsSuite's unit tests by changing Windows paths into URI paths ## What changes were proposed in this pull request? Two unit tests will fail due to Windows-format paths: 1. test(s"$version: read avro file containing decimal") ``` org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); ``` 2. test(s"$version: SPARK-17920: Insert into/overwrite avro table") ``` Unable to infer the schema. The schema specification is required to create the table `default`.`tab2`.; org.apache.spark.sql.AnalysisException: Unable to infer the schema. The schema specification is required to create the table `default`.`tab2`.; ``` This PR fixes these two unit tests by changing the Windows paths into URI paths. ## How was this patch tested? Existing tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: wuyi5 Closes #20199 from Ngone51/SPARK-22967. (cherry picked from commit 0552c36e02434c60dad82024334d291f6008b822) Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79959890 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79959890 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79959890 Branch: refs/heads/branch-2.3 Commit: 79959890570d216c33069c8382b29d53977665b1 Parents: b781301 Author: wuyi5 Authored: Thu Jan 11 22:17:15 2018 +0900 Committer: hyukjinkwon Committed: Thu Jan 11 22:17:28 2018 +0900 -- .../org/apache/spark/sql/hive/client/VersionsSuite.scala | 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/79959890/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index ff90e9d..e64389e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -811,7 +811,7 @@ class VersionsSuite extends SparkFunSuite with Logging { test(s"$version: read avro file containing decimal") { val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal") - val location = new File(url.getFile) + val location = new File(url.getFile).toURI.toString val tableName = "tab1" val avroSchema = @@ -851,6 +851,8 @@ class VersionsSuite extends SparkFunSuite with Logging { } test(s"$version: SPARK-17920: Insert into/overwrite avro table") { + // skipped because it's failed in the condition on Windows + assume(!(Utils.isWindows && version == "0.12")) withTempDir { dir => val avroSchema = """ @@ -875,10 +877,10 @@ class VersionsSuite extends SparkFunSuite with Logging { val writer = new PrintWriter(schemaFile) writer.write(avroSchema) writer.close() -val schemaPath = schemaFile.getCanonicalPath +val schemaPath = schemaFile.toURI.toString val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal") -val srcLocation = new File(url.getFile).getCanonicalPath +val srcLocation = new File(url.getFile).toURI.toString val destTableName = "tab1" val srcTableName = "tab2" - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r24131 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_11_04_01-1c70da3-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Jan 11 12:17:45 2018 New Revision: 24131 Log: Apache Spark 2.3.0-SNAPSHOT-2018_01_11_04_01-1c70da3 docs [This commit notification would consist of 1440 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[1/2] spark git commit: [SPARK-20657][CORE] Speed up rendering of the stages page.
Repository: spark Updated Branches: refs/heads/branch-2.3 d9a973d65 -> b78130123 http://git-wip-us.apache.org/repos/asf/spark/blob/b7813012/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 11a6a34..7c6e06c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -19,6 +19,7 @@ package org.apache.spark.ui.jobs import java.net.URLEncoder import java.util.Date +import java.util.concurrent.TimeUnit import javax.servlet.http.HttpServletRequest import scala.collection.mutable.{HashMap, HashSet} @@ -29,15 +30,14 @@ import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.SparkConf import org.apache.spark.internal.config._ import org.apache.spark.scheduler.TaskLocality -import org.apache.spark.status.AppStatusStore +import org.apache.spark.status._ import org.apache.spark.status.api.v1._ import org.apache.spark.ui._ -import org.apache.spark.util.{Distribution, Utils} +import org.apache.spark.util.Utils /** Page showing statistics and task list for a given stage */ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends WebUIPage("stage") { import ApiHelper._ - import StagePage._ private val TIMELINE_LEGEND = { @@ -67,17 +67,17 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We // if we find that it's okay. private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000) - private def getLocalitySummaryString(stageData: StageData, taskList: Seq[TaskData]): String = { -val localities = taskList.map(_.taskLocality) -val localityCounts = localities.groupBy(identity).mapValues(_.size) + private def getLocalitySummaryString(localitySummary: Map[String, Long]): String = { val names = Map( TaskLocality.PROCESS_LOCAL.toString() -> "Process local", TaskLocality.NODE_LOCAL.toString() -> "Node local", TaskLocality.RACK_LOCAL.toString() -> "Rack local", TaskLocality.ANY.toString() -> "Any") -val localityNamesAndCounts = localityCounts.toSeq.map { case (locality, count) => - s"${names(locality)}: $count" -} +val localityNamesAndCounts = names.flatMap { case (key, name) => + localitySummary.get(key).map { count => +s"$name: $count" + } +}.toSeq localityNamesAndCounts.sorted.mkString("; ") } @@ -108,7 +108,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We val stageHeader = s"Details for Stage $stageId (Attempt $stageAttemptId)" val stageData = parent.store - .asOption(parent.store.stageAttempt(stageId, stageAttemptId, details = true)) + .asOption(parent.store.stageAttempt(stageId, stageAttemptId, details = false)) .getOrElse { val content = @@ -117,8 +117,11 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We return UIUtils.headerSparkPage(stageHeader, content, parent) } -val tasks = stageData.tasks.getOrElse(Map.empty).values.toSeq -if (tasks.isEmpty) { +val localitySummary = store.localitySummary(stageData.stageId, stageData.attemptId) + +val totalTasks = stageData.numActiveTasks + stageData.numCompleteTasks + + stageData.numFailedTasks + stageData.numKilledTasks +if (totalTasks == 0) { val content = Summary Metrics No tasks have started yet @@ -127,18 +130,14 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We return UIUtils.headerSparkPage(stageHeader, content, parent) } 
+val storedTasks = store.taskCount(stageData.stageId, stageData.attemptId) val numCompleted = stageData.numCompleteTasks -val totalTasks = stageData.numActiveTasks + stageData.numCompleteTasks + - stageData.numFailedTasks + stageData.numKilledTasks -val totalTasksNumStr = if (totalTasks == tasks.size) { +val totalTasksNumStr = if (totalTasks == storedTasks) { s"$totalTasks" } else { - s"$totalTasks, showing ${tasks.size}" + s"$totalTasks, showing ${storedTasks}" } -val externalAccumulables = stageData.accumulatorUpdates -val hasAccumulators = externalAccumulables.size > 0 - val summary = @@ -148,7 +147,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We Locality Level Summary: -{getLocalitySummaryString(stageData, tasks)} +{getLocalitySummaryString(localitySummary)} {if (hasInput(stageData)) { @@ -266,7 +265,7 @@ private[ui] class StagePage(parent: StagesTab,
[2/2] spark git commit: [SPARK-20657][CORE] Speed up rendering of the stages page.
[SPARK-20657][CORE] Speed up rendering of the stages page. There are two main changes to speed up rendering of the tasks list when rendering the stage page. The first one makes the code only load the tasks being shown in the current page of the tasks table, and information related to only those tasks. One side-effect of this change is that the graph that shows task-related events now only shows events for the tasks in the current page, instead of the previously hardcoded limit of "events for the first 1000 tasks". That ends up helping with readability, though. To make sorting efficient when using a disk store, the task wrapper was extended to include many new indices, one for each of the sortable columns in the UI, and metrics for which quantiles are calculated. The second changes the way metric quantiles are calculated for stages. Instead of using the "Distribution" class to process data for all task metrics, which requires scanning all tasks of a stage, the code now uses the KVStore "skip()" functionality to only read tasks that contain interesting information for the quantiles that are desired. This is still not cheap; because there are many metrics that the UI and API track, the code needs to scan the index for each metric to gather the information. Savings come mainly from skipping deserialization when using the disk store, but the in-memory code also seems to be faster than before (most probably because of other changes in this patch). To make subsequent calls faster, some quantiles are cached in the status store. This makes UIs much faster after the first time a stage has been loaded. With the above changes, a lot of code in the UI layer could be simplified. Author: Marcelo VanzinCloses #20013 from vanzin/SPARK-20657. (cherry picked from commit 1c70da3bfbb4016e394de2c73eb0db7cdd9a6968) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7813012 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7813012 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7813012 Branch: refs/heads/branch-2.3 Commit: b78130123baba87554503e81b8aee3121666ba91 Parents: d9a973d Author: Marcelo Vanzin Authored: Thu Jan 11 19:41:48 2018 +0800 Committer: Wenchen Fan Committed: Thu Jan 11 19:42:19 2018 +0800 -- .../org/apache/spark/util/kvstore/LevelDB.java | 1 + .../apache/spark/status/AppStatusListener.scala | 57 +- .../apache/spark/status/AppStatusStore.scala| 389 +--- .../apache/spark/status/AppStatusUtils.scala| 68 ++ .../org/apache/spark/status/LiveEntity.scala| 344 --- .../spark/status/api/v1/StagesResource.scala| 3 +- .../org/apache/spark/status/api/v1/api.scala| 3 + .../org/apache/spark/status/storeTypes.scala| 327 ++- .../apache/spark/ui/jobs/ExecutorTable.scala| 4 +- .../org/apache/spark/ui/jobs/JobPage.scala | 2 +- .../org/apache/spark/ui/jobs/StagePage.scala| 919 ++- ...summary_w__custom_quantiles_expectation.json | 3 + ...task_summary_w_shuffle_read_expectation.json | 3 + ...ask_summary_w_shuffle_write_expectation.json | 3 + .../spark/status/AppStatusListenerSuite.scala | 105 ++- .../spark/status/AppStatusStoreSuite.scala | 104 +++ .../org/apache/spark/ui/StagePageSuite.scala| 10 +- scalastyle-config.xml | 2 +- 18 files changed, 1361 insertions(+), 986 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b7813012/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java -- diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java index 4f9e10c..0e491ef 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java @@ -83,6 +83,7 @@ public class LevelDB implements KVStore { if (versionData != null) { long version = serializer.deserializeLong(versionData); if (version != STORE_VERSION) { +close(); throw new UnsupportedStoreVersionException(); } } else { http://git-wip-us.apache.org/repos/asf/spark/blob/b7813012/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala -- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 88b75dd..b4edcf2 100644 ---
[2/2] spark git commit: [SPARK-20657][CORE] Speed up rendering of the stages page.
[SPARK-20657][CORE] Speed up rendering of the stages page. There are two main changes to speed up rendering of the tasks list when rendering the stage page. The first one makes the code only load the tasks being shown in the current page of the tasks table, and information related to only those tasks. One side-effect of this change is that the graph that shows task-related events now only shows events for the tasks in the current page, instead of the previously hardcoded limit of "events for the first 1000 tasks". That ends up helping with readability, though. To make sorting efficient when using a disk store, the task wrapper was extended to include many new indices, one for each of the sortable columns in the UI, and metrics for which quantiles are calculated. The second changes the way metric quantiles are calculated for stages. Instead of using the "Distribution" class to process data for all task metrics, which requires scanning all tasks of a stage, the code now uses the KVStore "skip()" functionality to only read tasks that contain interesting information for the quantiles that are desired. This is still not cheap; because there are many metrics that the UI and API track, the code needs to scan the index for each metric to gather the information. Savings come mainly from skipping deserialization when using the disk store, but the in-memory code also seems to be faster than before (most probably because of other changes in this patch). To make subsequent calls faster, some quantiles are cached in the status store. This makes UIs much faster after the first time a stage has been loaded. With the above changes, a lot of code in the UI layer could be simplified. Author: Marcelo VanzinCloses #20013 from vanzin/SPARK-20657. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c70da3b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c70da3b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c70da3b Branch: refs/heads/master Commit: 1c70da3bfbb4016e394de2c73eb0db7cdd9a6968 Parents: 87c98de Author: Marcelo Vanzin Authored: Thu Jan 11 19:41:48 2018 +0800 Committer: Wenchen Fan Committed: Thu Jan 11 19:41:48 2018 +0800 -- .../org/apache/spark/util/kvstore/LevelDB.java | 1 + .../apache/spark/status/AppStatusListener.scala | 57 +- .../apache/spark/status/AppStatusStore.scala| 389 +--- .../apache/spark/status/AppStatusUtils.scala| 68 ++ .../org/apache/spark/status/LiveEntity.scala| 344 --- .../spark/status/api/v1/StagesResource.scala| 3 +- .../org/apache/spark/status/api/v1/api.scala| 3 + .../org/apache/spark/status/storeTypes.scala| 327 ++- .../apache/spark/ui/jobs/ExecutorTable.scala| 4 +- .../org/apache/spark/ui/jobs/JobPage.scala | 2 +- .../org/apache/spark/ui/jobs/StagePage.scala| 919 ++- ...summary_w__custom_quantiles_expectation.json | 3 + ...task_summary_w_shuffle_read_expectation.json | 3 + ...ask_summary_w_shuffle_write_expectation.json | 3 + .../spark/status/AppStatusListenerSuite.scala | 105 ++- .../spark/status/AppStatusStoreSuite.scala | 104 +++ .../org/apache/spark/ui/StagePageSuite.scala| 10 +- scalastyle-config.xml | 2 +- 18 files changed, 1361 insertions(+), 986 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1c70da3b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java -- diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java index 4f9e10c..0e491ef 100644 --- 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java @@ -83,6 +83,7 @@ public class LevelDB implements KVStore { if (versionData != null) { long version = serializer.deserializeLong(versionData); if (version != STORE_VERSION) { +close(); throw new UnsupportedStoreVersionException(); } } else { http://git-wip-us.apache.org/repos/asf/spark/blob/1c70da3b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala -- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 88b75dd..b4edcf2 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -377,6 +377,10 @@ private[spark]
[1/2] spark git commit: [SPARK-20657][CORE] Speed up rendering of the stages page.
Repository: spark Updated Branches: refs/heads/master 87c98de8b -> 1c70da3bf http://git-wip-us.apache.org/repos/asf/spark/blob/1c70da3b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 11a6a34..7c6e06c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -19,6 +19,7 @@ package org.apache.spark.ui.jobs import java.net.URLEncoder import java.util.Date +import java.util.concurrent.TimeUnit import javax.servlet.http.HttpServletRequest import scala.collection.mutable.{HashMap, HashSet} @@ -29,15 +30,14 @@ import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.SparkConf import org.apache.spark.internal.config._ import org.apache.spark.scheduler.TaskLocality -import org.apache.spark.status.AppStatusStore +import org.apache.spark.status._ import org.apache.spark.status.api.v1._ import org.apache.spark.ui._ -import org.apache.spark.util.{Distribution, Utils} +import org.apache.spark.util.Utils /** Page showing statistics and task list for a given stage */ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends WebUIPage("stage") { import ApiHelper._ - import StagePage._ private val TIMELINE_LEGEND = { @@ -67,17 +67,17 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We // if we find that it's okay. private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000) - private def getLocalitySummaryString(stageData: StageData, taskList: Seq[TaskData]): String = { -val localities = taskList.map(_.taskLocality) -val localityCounts = localities.groupBy(identity).mapValues(_.size) + private def getLocalitySummaryString(localitySummary: Map[String, Long]): String = { val names = Map( TaskLocality.PROCESS_LOCAL.toString() -> "Process local", TaskLocality.NODE_LOCAL.toString() -> "Node local", TaskLocality.RACK_LOCAL.toString() -> "Rack local", TaskLocality.ANY.toString() -> "Any") -val localityNamesAndCounts = localityCounts.toSeq.map { case (locality, count) => - s"${names(locality)}: $count" -} +val localityNamesAndCounts = names.flatMap { case (key, name) => + localitySummary.get(key).map { count => +s"$name: $count" + } +}.toSeq localityNamesAndCounts.sorted.mkString("; ") } @@ -108,7 +108,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We val stageHeader = s"Details for Stage $stageId (Attempt $stageAttemptId)" val stageData = parent.store - .asOption(parent.store.stageAttempt(stageId, stageAttemptId, details = true)) + .asOption(parent.store.stageAttempt(stageId, stageAttemptId, details = false)) .getOrElse { val content = @@ -117,8 +117,11 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We return UIUtils.headerSparkPage(stageHeader, content, parent) } -val tasks = stageData.tasks.getOrElse(Map.empty).values.toSeq -if (tasks.isEmpty) { +val localitySummary = store.localitySummary(stageData.stageId, stageData.attemptId) + +val totalTasks = stageData.numActiveTasks + stageData.numCompleteTasks + + stageData.numFailedTasks + stageData.numKilledTasks +if (totalTasks == 0) { val content = Summary Metrics No tasks have started yet @@ -127,18 +130,14 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We return UIUtils.headerSparkPage(stageHeader, content, parent) } +val 
storedTasks = store.taskCount(stageData.stageId, stageData.attemptId) val numCompleted = stageData.numCompleteTasks -val totalTasks = stageData.numActiveTasks + stageData.numCompleteTasks + - stageData.numFailedTasks + stageData.numKilledTasks -val totalTasksNumStr = if (totalTasks == tasks.size) { +val totalTasksNumStr = if (totalTasks == storedTasks) { s"$totalTasks" } else { - s"$totalTasks, showing ${tasks.size}" + s"$totalTasks, showing ${storedTasks}" } -val externalAccumulables = stageData.accumulatorUpdates -val hasAccumulators = externalAccumulables.size > 0 - val summary = @@ -148,7 +147,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We Locality Level Summary: -{getLocalitySummaryString(stageData, tasks)} +{getLocalitySummaryString(localitySummary)} {if (hasInput(stageData)) { @@ -266,7 +265,7 @@ private[ui] class StagePage(parent: StagesTab,
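The quantile mechanism described in the [2/2] message above can be made concrete with a small sketch; the rank arithmetic is illustrative of the access pattern, not the exact AppStatusStore code, and the view call in the comment refers to the task wrapper type extended in storeTypes.scala:

```scala
// Map each requested quantile to a rank in a per-metric KVStore index and read
// only that element, instead of scanning every task in the stage.
val taskCount = 10000L  // hypothetical number of completed tasks
val quantiles = Seq(0.0, 0.25, 0.5, 0.75, 1.0)
val ranks = quantiles.map(q => math.min((q * taskCount).toLong, taskCount - 1))
// For each indexed metric, something like
//   store.view(classOf[TaskDataWrapper]).index(metricName).skip(rank).max(1)
// then fetches the single task holding that quantile's value.
println(ranks)  // List(0, 2500, 5000, 7500, 9999)
```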
spark git commit: [SPARK-23001][SQL] Fix NullPointerException when DESC a database with NULL description
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 0d943d96b -> acab4e7a0


[SPARK-23001][SQL] Fix NullPointerException when DESC a database with NULL description

## What changes were proposed in this pull request?
When a user's DB description is NULL, they might hit `NullPointerException`. This PR is to fix the issue.

## How was this patch tested?
Added test cases

Author: gatorsmile

Closes #20215 from gatorsmile/SPARK-23001.

(cherry picked from commit 87c98de8b23f0e978958fc83677fdc4c339b7e6a)
Signed-off-by: Wenchen Fan

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acab4e7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acab4e7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acab4e7a

Branch: refs/heads/branch-2.2
Commit: acab4e7a0909c4426becacf148a043d3e47a63ec
Parents: 0d943d9
Author: gatorsmile
Authored: Thu Jan 11 18:17:34 2018 +0800
Committer: Wenchen Fan
Committed: Thu Jan 11 18:18:21 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/client/HiveClientImpl.scala  | 2 +-
 .../apache/spark/sql/hive/HiveExternalCatalogSuite.scala   | 6 ++++++
 .../org/apache/spark/sql/hive/client/VersionsSuite.scala   | 9 +++++++++
 3 files changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/acab4e7a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 541797d..ceeb9c1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -348,7 +348,7 @@ private[hive] class HiveClientImpl(
     Option(client.getDatabase(dbName)).map { d =>
       CatalogDatabase(
         name = d.getName,
-        description = d.getDescription,
+        description = Option(d.getDescription).getOrElse(""),
         locationUri = CatalogUtils.stringToURI(d.getLocationUri),
         properties = Option(d.getParameters).map(_.asScala.toMap).orNull)
     }.getOrElse(throw new NoSuchDatabaseException(dbName))

http://git-wip-us.apache.org/repos/asf/spark/blob/acab4e7a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 2e35fde..0a522b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -107,4 +107,10 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
       .filter(_.contains("Num Buckets")).head
     assert(bucketString.contains("10"))
   }
+
+  test("SPARK-23001: NullPointerException when running desc database") {
+    val catalog = newBasicCatalog()
+    catalog.createDatabase(newDb("dbWithNullDesc").copy(description = null), ignoreIfExists = false)
+    assert(catalog.getDatabase("dbWithNullDesc").description == "")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/acab4e7a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 8376fc7..7dd4fef 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -164,6 +164,15 @@ class VersionsSuite extends SparkFunSuite with Logging {
       client.createDatabase(tempDB, ignoreIfExists = true)
     }
 
+    test(s"$version: createDatabase with null description") {
+      withTempDir { tmpDir =>
+        val dbWithNullDesc =
+          CatalogDatabase("dbWithNullDesc", description = null, tmpDir.toURI, Map())
+        client.createDatabase(dbWithNullDesc, ignoreIfExists = true)
+        assert(client.getDatabase("dbWithNullDesc").description == "")
+      }
+    }
+
     test(s"$version: setCurrentDatabase") {
       client.setCurrentDatabase("default")
     }
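The one-line production fix above is the usual Scala idiom for absorbing nulls at a Java interop boundary: wrap the possibly-null getter in Option(...) and supply a default, since Option(null) is None. A self-contained sketch of that idiom (HiveDatabase here is a hypothetical stand-in for Hive's metastore Database class, not Spark's actual type):

object NullDescriptionDemo {
  // Stand-in for a Java metastore object whose getter can return null.
  final class HiveDatabase(desc: String) {
    def getDescription: String = desc
  }

  // Option(x) is None when x is null, so getOrElse("") yields a safe default
  // instead of letting the null leak into later string handling.
  def safeDescription(d: HiveDatabase): String =
    Option(d.getDescription).getOrElse("")

  def main(args: Array[String]): Unit = {
    println(safeDescription(new HiveDatabase(null)))     // prints an empty line, no NPE
    println(safeDescription(new HiveDatabase("my db")))  // prints: my db
  }
}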
spark git commit: [SPARK-23001][SQL] Fix NullPointerException when DESC a database with NULL description
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 317b0aaed -> d9a973d65


[SPARK-23001][SQL] Fix NullPointerException when DESC a database with NULL description

## What changes were proposed in this pull request?
When a user's DB description is NULL, they might hit `NullPointerException`. This PR is to fix the issue.

## How was this patch tested?
Added test cases

Author: gatorsmile

Closes #20215 from gatorsmile/SPARK-23001.

(cherry picked from commit 87c98de8b23f0e978958fc83677fdc4c339b7e6a)
Signed-off-by: Wenchen Fan

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9a973d6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9a973d6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9a973d6

Branch: refs/heads/branch-2.3
Commit: d9a973d65c52169e3c3b2223d4a55b07ee82b88e
Parents: 317b0aa
Author: gatorsmile
Authored: Thu Jan 11 18:17:34 2018 +0800
Committer: Wenchen Fan
Committed: Thu Jan 11 18:17:56 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/client/HiveClientImpl.scala  | 2 +-
 .../apache/spark/sql/hive/HiveExternalCatalogSuite.scala   | 6 ++++++
 .../org/apache/spark/sql/hive/client/VersionsSuite.scala   | 9 +++++++++
 3 files changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d9a973d6/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 102f40b..4b923f5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -330,7 +330,7 @@ private[hive] class HiveClientImpl(
     Option(client.getDatabase(dbName)).map { d =>
       CatalogDatabase(
         name = d.getName,
-        description = d.getDescription,
+        description = Option(d.getDescription).getOrElse(""),
         locationUri = CatalogUtils.stringToURI(d.getLocationUri),
         properties = Option(d.getParameters).map(_.asScala.toMap).orNull)
     }.getOrElse(throw new NoSuchDatabaseException(dbName))

http://git-wip-us.apache.org/repos/asf/spark/blob/d9a973d6/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 2e35fde..0a522b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -107,4 +107,10 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
       .filter(_.contains("Num Buckets")).head
     assert(bucketString.contains("10"))
   }
+
+  test("SPARK-23001: NullPointerException when running desc database") {
+    val catalog = newBasicCatalog()
+    catalog.createDatabase(newDb("dbWithNullDesc").copy(description = null), ignoreIfExists = false)
+    assert(catalog.getDatabase("dbWithNullDesc").description == "")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d9a973d6/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 94473a0..ff90e9d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -163,6 +163,15 @@ class VersionsSuite extends SparkFunSuite with Logging {
       client.createDatabase(tempDB, ignoreIfExists = true)
     }
 
+    test(s"$version: createDatabase with null description") {
+      withTempDir { tmpDir =>
+        val dbWithNullDesc =
+          CatalogDatabase("dbWithNullDesc", description = null, tmpDir.toURI, Map())
+        client.createDatabase(dbWithNullDesc, ignoreIfExists = true)
+        assert(client.getDatabase("dbWithNullDesc").description == "")
+      }
+    }
+
     test(s"$version: setCurrentDatabase") {
       client.setCurrentDatabase("default")
     }
spark git commit: [SPARK-23001][SQL] Fix NullPointerException when DESC a database with NULL description
Repository: spark
Updated Branches:
  refs/heads/master a6647ffbf -> 87c98de8b


[SPARK-23001][SQL] Fix NullPointerException when DESC a database with NULL description

## What changes were proposed in this pull request?
When a user's DB description is NULL, they might hit `NullPointerException`. This PR is to fix the issue.

## How was this patch tested?
Added test cases

Author: gatorsmile

Closes #20215 from gatorsmile/SPARK-23001.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/87c98de8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/87c98de8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/87c98de8

Branch: refs/heads/master
Commit: 87c98de8b23f0e978958fc83677fdc4c339b7e6a
Parents: a6647ff
Author: gatorsmile
Authored: Thu Jan 11 18:17:34 2018 +0800
Committer: Wenchen Fan
Committed: Thu Jan 11 18:17:34 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/client/HiveClientImpl.scala  | 2 +-
 .../apache/spark/sql/hive/HiveExternalCatalogSuite.scala   | 6 ++++++
 .../org/apache/spark/sql/hive/client/VersionsSuite.scala   | 9 +++++++++
 3 files changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/87c98de8/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 102f40b..4b923f5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -330,7 +330,7 @@ private[hive] class HiveClientImpl(
     Option(client.getDatabase(dbName)).map { d =>
       CatalogDatabase(
         name = d.getName,
-        description = d.getDescription,
+        description = Option(d.getDescription).getOrElse(""),
         locationUri = CatalogUtils.stringToURI(d.getLocationUri),
         properties = Option(d.getParameters).map(_.asScala.toMap).orNull)
     }.getOrElse(throw new NoSuchDatabaseException(dbName))

http://git-wip-us.apache.org/repos/asf/spark/blob/87c98de8/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 2e35fde..0a522b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -107,4 +107,10 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
       .filter(_.contains("Num Buckets")).head
     assert(bucketString.contains("10"))
   }
+
+  test("SPARK-23001: NullPointerException when running desc database") {
+    val catalog = newBasicCatalog()
+    catalog.createDatabase(newDb("dbWithNullDesc").copy(description = null), ignoreIfExists = false)
+    assert(catalog.getDatabase("dbWithNullDesc").description == "")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/87c98de8/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 94473a0..ff90e9d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -163,6 +163,15 @@ class VersionsSuite extends SparkFunSuite with Logging {
       client.createDatabase(tempDB, ignoreIfExists = true)
     }
 
+    test(s"$version: createDatabase with null description") {
+      withTempDir { tmpDir =>
+        val dbWithNullDesc =
+          CatalogDatabase("dbWithNullDesc", description = null, tmpDir.toURI, Map())
+        client.createDatabase(dbWithNullDesc, ignoreIfExists = true)
+        assert(client.getDatabase("dbWithNullDesc").description == "")
+      }
+    }
+
     test(s"$version: setCurrentDatabase") {
       client.setCurrentDatabase("default")
    }
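Worth noting from the same CatalogDatabase construction: the neighboring properties field takes the opposite stance, converting a possibly-null Java map and mapping absence back to null via .orNull rather than to an empty default. A small sketch of that contrasting pattern (the helper name toScalaOrNull is illustrative, not from the patch; java.util.HashMap stands in for Hive's parameter map):

import scala.collection.JavaConverters._

object OrNullDemo {
  // Converts a Java map to an immutable Scala map, but deliberately
  // preserves null as null instead of substituting an empty map.
  def toScalaOrNull(params: java.util.Map[String, String]): Map[String, String] =
    Option(params).map(_.asScala.toMap).orNull

  def main(args: Array[String]): Unit = {
    val jmap = new java.util.HashMap[String, String]()
    jmap.put("owner", "hive")
    println(toScalaOrNull(jmap))  // prints: Map(owner -> hive)
    println(toScalaOrNull(null))  // prints: null
  }
}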