spark git commit: [SPARK-12090] [PYSPARK] consider shuffle in coalesce()
Repository: spark Updated Branches: refs/heads/branch-1.6 3c4938e26 -> c47a7373a [SPARK-12090] [PYSPARK] consider shuffle in coalesce() Author: Davies Liu Closes #10090 from davies/fix_coalesce. (cherry picked from commit 4375eb3f48fc7ae90caf6c21a0d3ab0b66bf4efa) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c47a7373 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c47a7373 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c47a7373 Branch: refs/heads/branch-1.6 Commit: c47a7373a88f49c77b8d65e887cac2ef1ae22eae Parents: 3c4938e Author: Davies Liu Authored: Tue Dec 1 22:41:48 2015 -0800 Committer: Davies Liu Committed: Tue Dec 1 22:42:03 2015 -0800 -- python/pyspark/rdd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c47a7373/python/pyspark/rdd.py -- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 4b4d596..00bb9a6 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2015,7 +2015,7 @@ class RDD(object): >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() [[1, 2, 3, 4, 5]] """ -jrdd = self._jrdd.coalesce(numPartitions) +jrdd = self._jrdd.coalesce(numPartitions, shuffle) return RDD(jrdd, self.ctx, self._jrdd_deserializer) def zip(self, other): - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12090] [PYSPARK] consider shuffle in coalesce()
Repository: spark Updated Branches: refs/heads/branch-1.5 0d57a4ae1 -> ed7264ba2 [SPARK-12090] [PYSPARK] consider shuffle in coalesce() Author: Davies Liu Closes #10090 from davies/fix_coalesce. (cherry picked from commit 4375eb3f48fc7ae90caf6c21a0d3ab0b66bf4efa) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed7264ba Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed7264ba Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed7264ba Branch: refs/heads/branch-1.5 Commit: ed7264ba20d178be45dad3f50e9c73ce2f9148dc Parents: 0d57a4a Author: Davies Liu Authored: Tue Dec 1 22:41:48 2015 -0800 Committer: Davies Liu Committed: Tue Dec 1 22:42:19 2015 -0800 -- python/pyspark/rdd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ed7264ba/python/pyspark/rdd.py -- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index ab5aab1..7e871d3 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2024,7 +2024,7 @@ class RDD(object): >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() [[1, 2, 3, 4, 5]] """ -jrdd = self._jrdd.coalesce(numPartitions) +jrdd = self._jrdd.coalesce(numPartitions, shuffle) return RDD(jrdd, self.ctx, self._jrdd_deserializer) def zip(self, other): - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12090] [PYSPARK] consider shuffle in coalesce()
Repository: spark Updated Branches: refs/heads/master 0f37d1d7e -> 4375eb3f4 [SPARK-12090] [PYSPARK] consider shuffle in coalesce() Author: Davies Liu Closes #10090 from davies/fix_coalesce. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4375eb3f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4375eb3f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4375eb3f Branch: refs/heads/master Commit: 4375eb3f48fc7ae90caf6c21a0d3ab0b66bf4efa Parents: 0f37d1d Author: Davies Liu Authored: Tue Dec 1 22:41:48 2015 -0800 Committer: Davies Liu Committed: Tue Dec 1 22:41:48 2015 -0800 -- python/pyspark/rdd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4375eb3f/python/pyspark/rdd.py -- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 4b4d596..00bb9a6 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2015,7 +2015,7 @@ class RDD(object): >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() [[1, 2, 3, 4, 5]] """ -jrdd = self._jrdd.coalesce(numPartitions) +jrdd = self._jrdd.coalesce(numPartitions, shuffle) return RDD(jrdd, self.ctx, self._jrdd_deserializer) def zip(self, other): - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
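Before this fix, the PySpark wrapper dropped the `shuffle` argument when delegating to the JVM RDD, so `coalesce(n, shuffle=True)` silently behaved like `shuffle=False`. As a minimal illustration of why forwarding the flag matters, here is a small Scala sketch against the JVM-side API the wrapper delegates to (the local SparkContext setup and sample data are assumptions, not part of the patch):

```
import org.apache.spark.{SparkConf, SparkContext}

object CoalesceShuffleSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("coalesce-shuffle-sketch"))
    val rdd = sc.parallelize(1 to 100, 4)

    // Without a shuffle, coalesce can only merge partitions, never create more of them.
    println(rdd.coalesce(8).partitions.length)                  // 4
    // With shuffle = true the data is repartitioned, so the partition count can grow.
    println(rdd.coalesce(8, shuffle = true).partitions.length)  // 8

    sc.stop()
  }
}
```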
spark git commit: [SPARK-11949][SQL] Check bitmasks to set nullable property
Repository: spark Updated Branches: refs/heads/master 8a75a3049 -> 0f37d1d7e [SPARK-11949][SQL] Check bitmasks to set nullable property Following up #10038. We can use bitmasks to determine which grouping expressions need to be set as nullable. cc yhuai Author: Liang-Chi Hsieh Closes #10067 from viirya/fix-cube-following. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f37d1d7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f37d1d7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f37d1d7 Branch: refs/heads/master Commit: 0f37d1d7ed7f6e34f98f2a3c274918de29e7a1d7 Parents: 8a75a30 Author: Liang-Chi Hsieh Authored: Tue Dec 1 21:51:33 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 21:51:33 2015 -0800 -- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 13 + 1 file changed, 9 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0f37d1d7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 765327c..d3163dc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -224,10 +224,15 @@ class Analyzer( case other => Alias(other, other.toString)() } -// TODO: We need to use bitmasks to determine which grouping expressions need to be -// set as nullable. For example, if we have GROUPING SETS ((a,b), a), we do not need -// to change the nullability of a. -val attributeMap = groupByAliases.map(a => (a -> a.toAttribute.withNullability(true))).toMap +val nonNullBitmask = x.bitmasks.reduce(_ & _) + +val attributeMap = groupByAliases.zipWithIndex.map { case (a, idx) => + if ((nonNullBitmask & 1 << idx) == 0) { +(a -> a.toAttribute.withNullability(true)) + } else { +(a -> a.toAttribute) + } +}.toMap val aggregations: Seq[NamedExpression] = x.aggregations.map { // If an expression is an aggregate (contains a AggregateExpression) then we dont change - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11949][SQL] Check bitmasks to set nullable property
Repository: spark Updated Branches: refs/heads/branch-1.6 1f42295b5 -> 3c4938e26 [SPARK-11949][SQL] Check bitmasks to set nullable property Following up #10038. We can use bitmasks to determine which grouping expressions need to be set as nullable. cc yhuai Author: Liang-Chi Hsieh Closes #10067 from viirya/fix-cube-following. (cherry picked from commit 0f37d1d7ed7f6e34f98f2a3c274918de29e7a1d7) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c4938e2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c4938e2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c4938e2 Branch: refs/heads/branch-1.6 Commit: 3c4938e26185dc0637f3af624830dbff11589997 Parents: 1f42295 Author: Liang-Chi Hsieh Authored: Tue Dec 1 21:51:33 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 21:51:47 2015 -0800 -- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 13 + 1 file changed, 9 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3c4938e2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 765327c..d3163dc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -224,10 +224,15 @@ class Analyzer( case other => Alias(other, other.toString)() } -// TODO: We need to use bitmasks to determine which grouping expressions need to be -// set as nullable. For example, if we have GROUPING SETS ((a,b), a), we do not need -// to change the nullability of a. -val attributeMap = groupByAliases.map(a => (a -> a.toAttribute.withNullability(true))).toMap +val nonNullBitmask = x.bitmasks.reduce(_ & _) + +val attributeMap = groupByAliases.zipWithIndex.map { case (a, idx) => + if ((nonNullBitmask & 1 << idx) == 0) { +(a -> a.toAttribute.withNullability(true)) + } else { +(a -> a.toAttribute) + } +}.toMap val aggregations: Seq[NamedExpression] = x.aggregations.map { // If an expression is an aggregate (contains a AggregateExpression) then we dont change - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
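The patch derives nullability from the grouping-set bitmasks: a grouping column keeps its original nullability only if its bit is set in every bitmask, i.e. it appears in every grouping set and is never replaced by NULL. Below is a self-contained Scala sketch of that bit arithmetic for GROUPING SETS ((a, b), (a)); the concrete bitmask values and the bit convention (set bit = expression present in the set) are assumptions read off the patch's reduce-AND and `== 0` check, not taken from the Analyzer itself:

```
object GroupingNullabilitySketch {
  def main(args: Array[String]): Unit = {
    // Grouping expressions in analysis order: index 0 -> a, index 1 -> b.
    val groupByAliases = Seq("a", "b")

    // Hypothetical bitmasks for GROUPING SETS ((a, b), (a)):
    // (a, b) sets bits 0 and 1 (0b11 = 3), (a) sets only bit 0 (0b01 = 1).
    val bitmasks = Seq(3, 1)

    // A column can stay non-nullable only if it is present in every grouping set.
    val nonNullBitmask = bitmasks.reduce(_ & _) // 0b01

    groupByAliases.zipWithIndex.foreach { case (name, idx) =>
      val forcedNullable = (nonNullBitmask & (1 << idx)) == 0
      println(s"$name -> forced nullable: $forcedNullable")
    }
    // a -> forced nullable: false  (kept as-is, matching the example in the old TODO)
    // b -> forced nullable: true   (NULLed out in the (a) grouping set)
  }
}
```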
spark git commit: [SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles
Repository: spark Updated Branches: refs/heads/branch-1.4 f5af299ab -> b6ba2dab2 [SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently in multiple places: * The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched * The JobConf is serialized as part of the DStream checkpoints. These concurrent accesses (updating in one thread, while the another thread is serializing it) can lead to concurrentModidicationException in the underlying Java hashmap using in the internal Hadoop Configuration object. The solution is to create a new JobConf in every batch, that is updated by `RDD.saveAsHadoopFile()`, while the checkpointing serializes the original JobConf. Tests to be added in #9988 will fail reliably without this patch. Keeping this patch really small to make sure that it can be added to previous branches. Author: Tathagata Das Closes #10088 from tdas/SPARK-12087. (cherry picked from commit 8a75a3049539eeef04c0db51736e97070c162b46) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6ba2dab Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6ba2dab Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6ba2dab Branch: refs/heads/branch-1.4 Commit: b6ba2dab26092f56271114aa62f25b2fc9d6adad Parents: f5af299 Author: Tathagata Das Authored: Tue Dec 1 21:04:52 2015 -0800 Committer: Shixiong Zhu Committed: Tue Dec 1 21:05:37 2015 -0800 -- .../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b6ba2dab/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index 358e4c6..4e392f5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -691,7 +691,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)]) val serializableConf = new SerializableWritable(conf) val saveFunc = (rdd: RDD[(K, V)], time: Time) => { val file = rddToFileName(prefix, suffix, time) - rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value) + rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, +new JobConf(serializableConf.value)) } self.foreachRDD(saveFunc) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles
Repository: spark Updated Branches: refs/heads/master 96691feae -> 8a75a3049 [SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently in multiple places: * The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched * The JobConf is serialized as part of the DStream checkpoints. These concurrent accesses (updating in one thread, while the another thread is serializing it) can lead to concurrentModidicationException in the underlying Java hashmap using in the internal Hadoop Configuration object. The solution is to create a new JobConf in every batch, that is updated by `RDD.saveAsHadoopFile()`, while the checkpointing serializes the original JobConf. Tests to be added in #9988 will fail reliably without this patch. Keeping this patch really small to make sure that it can be added to previous branches. Author: Tathagata Das Closes #10088 from tdas/SPARK-12087. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a75a304 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a75a304 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a75a304 Branch: refs/heads/master Commit: 8a75a3049539eeef04c0db51736e97070c162b46 Parents: 96691fe Author: Tathagata Das Authored: Tue Dec 1 21:04:52 2015 -0800 Committer: Shixiong Zhu Committed: Tue Dec 1 21:04:52 2015 -0800 -- .../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8a75a304/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index fb691ee..2762309 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -730,7 +730,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)]) val serializableConf = new SerializableJobConf(conf) val saveFunc = (rdd: RDD[(K, V)], time: Time) => { val file = rddToFileName(prefix, suffix, time) - rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value) + rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, +new JobConf(serializableConf.value)) } self.foreachRDD(saveFunc) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles
Repository: spark Updated Branches: refs/heads/branch-1.5 4f07a590c -> 0d57a4ae1 [SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently in multiple places: * The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched * The JobConf is serialized as part of the DStream checkpoints. These concurrent accesses (updating in one thread, while the another thread is serializing it) can lead to concurrentModidicationException in the underlying Java hashmap using in the internal Hadoop Configuration object. The solution is to create a new JobConf in every batch, that is updated by `RDD.saveAsHadoopFile()`, while the checkpointing serializes the original JobConf. Tests to be added in #9988 will fail reliably without this patch. Keeping this patch really small to make sure that it can be added to previous branches. Author: Tathagata Das Closes #10088 from tdas/SPARK-12087. (cherry picked from commit 8a75a3049539eeef04c0db51736e97070c162b46) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d57a4ae Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d57a4ae Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d57a4ae Branch: refs/heads/branch-1.5 Commit: 0d57a4ae10f4ec40386194bc3c8e27f32da09d4d Parents: 4f07a59 Author: Tathagata Das Authored: Tue Dec 1 21:04:52 2015 -0800 Committer: Shixiong Zhu Committed: Tue Dec 1 21:05:18 2015 -0800 -- .../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0d57a4ae/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index 71bec96..aa36997 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -692,7 +692,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)]) val serializableConf = new SerializableJobConf(conf) val saveFunc = (rdd: RDD[(K, V)], time: Time) => { val file = rddToFileName(prefix, suffix, time) - rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value) + rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, +new JobConf(serializableConf.value)) } self.foreachRDD(saveFunc) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles
Repository: spark Updated Branches: refs/heads/branch-1.6 a5743affc -> 1f42295b5 [SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently in multiple places: * The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched * The JobConf is serialized as part of the DStream checkpoints. These concurrent accesses (updating in one thread, while the another thread is serializing it) can lead to concurrentModidicationException in the underlying Java hashmap using in the internal Hadoop Configuration object. The solution is to create a new JobConf in every batch, that is updated by `RDD.saveAsHadoopFile()`, while the checkpointing serializes the original JobConf. Tests to be added in #9988 will fail reliably without this patch. Keeping this patch really small to make sure that it can be added to previous branches. Author: Tathagata Das Closes #10088 from tdas/SPARK-12087. (cherry picked from commit 8a75a3049539eeef04c0db51736e97070c162b46) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f42295b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f42295b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f42295b Branch: refs/heads/branch-1.6 Commit: 1f42295b5df69a6039ed2ba8ea67a8e57d77644d Parents: a5743af Author: Tathagata Das Authored: Tue Dec 1 21:04:52 2015 -0800 Committer: Shixiong Zhu Committed: Tue Dec 1 21:05:02 2015 -0800 -- .../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1f42295b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index fb691ee..2762309 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -730,7 +730,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)]) val serializableConf = new SerializableJobConf(conf) val saveFunc = (rdd: RDD[(K, V)], time: Time) => { val file = rddToFileName(prefix, suffix, time) - rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value) + rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, +new JobConf(serializableConf.value)) } self.foreachRDD(saveFunc) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
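The one-line fix is a defensive copy: each batch builds its own `JobConf` from the serializable wrapper, so the instance that gets serialized into DStream checkpoints is never mutated concurrently by `RDD.saveAsHadoopFile()`. A stand-alone sketch of that copy-per-use pattern follows; the configuration key and the `saveBatch` helper are hypothetical, and only `new JobConf(conf)` as the copying step comes from the patch:

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

object JobConfPerBatchSketch {
  // Shared configuration captured once; in Spark Streaming this same object is also
  // serialized as part of the DStream checkpoint data.
  val sharedConf = new JobConf(new Configuration())

  // Hypothetical per-batch save function: every invocation works on its own copy,
  // so whatever the save path mutates never races with checkpoint serialization.
  def saveBatch(batchTime: Long): Unit = {
    val perBatchConf = new JobConf(sharedConf) // defensive copy, as in the patch
    perBatchConf.set("example.batch.time", batchTime.toString) // illustrative key
    // ... hand perBatchConf to rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, perBatchConf)
  }

  def main(args: Array[String]): Unit = saveBatch(System.currentTimeMillis())
}
```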
spark git commit: [SPARK-11352][SQL][BRANCH-1.5] Escape */ in the generated comments.
Repository: spark Updated Branches: refs/heads/branch-1.5 7460e4309 -> 4f07a590c [SPARK-11352][SQL][BRANCH-1.5] Escape */ in the generated comments. https://issues.apache.org/jira/browse/SPARK-11352 This one backports https://github.com/apache/spark/pull/10072 to branch 1.5. Author: Yin Huai Closes #10084 from yhuai/SPARK-11352-branch-1.5. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f07a590 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f07a590 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f07a590 Branch: refs/heads/branch-1.5 Commit: 4f07a590c5d7b9e187f446c077357b00df93ee27 Parents: 7460e43 Author: Yin Huai Authored: Tue Dec 1 20:33:50 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 20:33:50 2015 -0800 -- .../apache/spark/sql/catalyst/expressions/Expression.scala | 8 +++- .../sql/catalyst/expressions/codegen/CodegenFallback.scala | 2 +- .../sql/catalyst/expressions/CodeGenerationSuite.scala | 9 + 3 files changed, 17 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4f07a590/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 0b98f55..4ef3b93 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -97,7 +97,7 @@ abstract class Expression extends TreeNode[Expression] { val ve = GeneratedExpressionCode("", isNull, primitive) ve.code = genCode(ctx, ve) // Add `this` in the comment. -ve.copy(s"/* $this */\n" + ve.code) +ve.copy(s"/* ${this.toCommentSafeString} */\n" + ve.code) } /** @@ -175,6 +175,12 @@ abstract class Expression extends TreeNode[Expression] { } override def toString: String = prettyName + children.mkString("(", ",", ")") + + /** + * Returns the string representation of this expression that is safe to be put in + * code comments of generated code. 
+ */ + protected def toCommentSafeString: String = this.toString.replace("*/", "\\*\\/") } http://git-wip-us.apache.org/repos/asf/spark/blob/4f07a590/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala index 3492d2c..5061451 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala @@ -33,7 +33,7 @@ trait CodegenFallback extends Expression { ctx.references += this val objectTerm = ctx.freshName("obj") s""" - /* expression: ${this} */ + /* expression: ${this.toCommentSafeString} */ Object $objectTerm = expressions[${ctx.references.size - 1}].eval(i); boolean ${ev.isNull} = $objectTerm == null; ${ctx.javaType(this.dataType)} ${ev.primitive} = ${ctx.defaultValue(this.dataType)}; http://git-wip-us.apache.org/repos/asf/spark/blob/4f07a590/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index e323467..fcb0c84 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -134,4 +134,13 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { unsafeRow.getStruct(3, 1).getStruct(0, 2).setInt(1, 4) assert(internalRow === internalRow2) } + + test("*/ in the data") { +// When */ appears in a comment block (i.e. in /**/), code gen will break. +// So, in Expression and CodegenFallback, we escape */ to \*\/. +checkEvaluation( + EqualTo(BoundReference(0, StringType, false), Literal.create("*/", StringType)), + true, + InternalRow(UTF8String.fromString("*/"))) + } }
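The core of the fix is `toCommentSafeString`, which escapes the comment terminator before an expression's string form is pasted into a `/* ... */` block of generated Java source. A small Scala sketch of the same string transformation; the sample expression string is made up, and only the replace step mirrors the patch:

```
object CommentEscapeSketch {
  def main(args: Array[String]): Unit = {
    // A made-up expression string that happens to contain the comment terminator.
    val exprString = "(input[0, StringType] = */)"

    // Same escaping as Expression.toCommentSafeString: "*/" becomes "\*\/".
    val safe = exprString.replace("*/", "\\*\\/")

    // Pasting the raw string would close the block comment early and make the
    // generated Java source fail to compile; the escaped form keeps it intact.
    println(s"/* expression: $safe */")
  }
}
```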
spark git commit: [SPARK-12077][SQL] change the default plan for single distinct
Repository: spark Updated Branches: refs/heads/branch-1.6 84c44b500 -> a5743affc [SPARK-12077][SQL] change the default plan for single distinct Use try to match the behavior for single distinct aggregation with Spark 1.5, but that's not scalable, we should be robust by default, have a flag to address performance regression for low cardinality aggregation. cc yhuai nongli Author: Davies Liu Closes #10075 from davies/agg_15. (cherry picked from commit 96691feae0229fd693c29475620be2c4059dd080) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5743aff Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5743aff Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5743aff Branch: refs/heads/branch-1.6 Commit: a5743affcf73f7bf71517171583cbddc44cc9368 Parents: 84c44b5 Author: Davies Liu Authored: Tue Dec 1 20:17:12 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 20:17:44 2015 -0800 -- sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 2 +- .../test/scala/org/apache/spark/sql/execution/PlannerSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a5743aff/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 5ef3a48..58adf64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -451,7 +451,7 @@ private[spark] object SQLConf { val SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING = booleanConf("spark.sql.specializeSingleDistinctAggPlanning", - defaultValue = Some(true), + defaultValue = Some(false), isPublic = false, doc = "When true, if a query only has a single distinct column and it has " + "grouping expressions, we will use our planner rule to handle this distinct " + http://git-wip-us.apache.org/repos/asf/spark/blob/a5743aff/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index dfec139..a462625 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -44,10 +44,10 @@ class PlannerSuite extends SharedSQLContext { fail(s"Could query play aggregation query $query. Is it an aggregation query?")) val aggregations = planned.collect { case n if n.nodeName contains "Aggregate" => n } -// For the new aggregation code path, there will be three aggregate operator for +// For the new aggregation code path, there will be four aggregate operator for // distinct aggregations. assert( - aggregations.size == 2 || aggregations.size == 3, + aggregations.size == 2 || aggregations.size == 4, s"The plan of query $query does not have partial aggregations.") } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12077][SQL] change the default plan for single distinct
Repository: spark Updated Branches: refs/heads/master d96f8c997 -> 96691feae [SPARK-12077][SQL] change the default plan for single distinct Use try to match the behavior for single distinct aggregation with Spark 1.5, but that's not scalable, we should be robust by default, have a flag to address performance regression for low cardinality aggregation. cc yhuai nongli Author: Davies Liu Closes #10075 from davies/agg_15. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96691fea Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96691fea Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96691fea Branch: refs/heads/master Commit: 96691feae0229fd693c29475620be2c4059dd080 Parents: d96f8c9 Author: Davies Liu Authored: Tue Dec 1 20:17:12 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 20:17:12 2015 -0800 -- sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 2 +- .../test/scala/org/apache/spark/sql/execution/PlannerSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/96691fea/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 5ef3a48..58adf64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -451,7 +451,7 @@ private[spark] object SQLConf { val SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING = booleanConf("spark.sql.specializeSingleDistinctAggPlanning", - defaultValue = Some(true), + defaultValue = Some(false), isPublic = false, doc = "When true, if a query only has a single distinct column and it has " + "grouping expressions, we will use our planner rule to handle this distinct " + http://git-wip-us.apache.org/repos/asf/spark/blob/96691fea/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index dfec139..a462625 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -44,10 +44,10 @@ class PlannerSuite extends SharedSQLContext { fail(s"Could query play aggregation query $query. Is it an aggregation query?")) val aggregations = planned.collect { case n if n.nodeName contains "Aggregate" => n } -// For the new aggregation code path, there will be three aggregate operator for +// For the new aggregation code path, there will be four aggregate operator for // distinct aggregations. assert( - aggregations.size == 2 || aggregations.size == 3, + aggregations.size == 2 || aggregations.size == 4, s"The plan of query $query does not have partial aggregations.") } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
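The flag itself remains available, so workloads with a low-cardinality distinct column can opt back into the 1.5-style specialized plan. A minimal sketch of doing that from a 1.6-era SQLContext; the local session, table and column names are assumptions, and only the configuration key comes from the patch:

```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SingleDistinctPlanFlagSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("single-distinct-sketch"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Internal flag (isPublic = false); after this patch it defaults to false,
    // i.e. the robust general rewrite is used unless explicitly re-enabled.
    sqlContext.setConf("spark.sql.specializeSingleDistinctAggPlanning", "true")

    val employees = sc.parallelize(Seq(("eng", 1), ("eng", 2), ("hr", 3))).toDF("dept", "id")
    employees.registerTempTable("employees")
    sqlContext.sql("SELECT dept, COUNT(DISTINCT id) FROM employees GROUP BY dept").show()

    sc.stop()
  }
}
```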
spark git commit: [SPARK-12081] Make unified memory manager work with small heaps
Repository: spark Updated Branches: refs/heads/master 1ce4adf55 -> d96f8c997 [SPARK-12081] Make unified memory manager work with small heaps The existing `spark.memory.fraction` (default 0.75) gives the system 25% of the space to work with. For small heaps, this is not enough: e.g. default 1GB leaves only 250MB system memory. This is especially a problem in local mode, where the driver and executor are crammed in the same JVM. Members of the community have reported driver OOM's in such cases. **New proposal.** We now reserve 300MB before taking the 75%. For 1GB JVMs, this leaves `(1024 - 300) * 0.75 = 543MB` for execution and storage. This is proposal (1) listed in the [JIRA](https://issues.apache.org/jira/browse/SPARK-12081). Author: Andrew Or Closes #10081 from andrewor14/unified-memory-small-heaps. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d96f8c99 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d96f8c99 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d96f8c99 Branch: refs/heads/master Commit: d96f8c997b9bb5c3d61f513d2c71d67ccf8e85d6 Parents: 1ce4adf Author: Andrew Or Authored: Tue Dec 1 19:51:12 2015 -0800 Committer: Andrew Or Committed: Tue Dec 1 19:51:12 2015 -0800 -- .../spark/memory/UnifiedMemoryManager.scala | 22 .../memory/UnifiedMemoryManagerSuite.scala | 20 ++ docs/configuration.md | 4 ++-- docs/tuning.md | 2 +- 4 files changed, 41 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d96f8c99/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 8be5b05..48b4e23 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -26,7 +26,7 @@ import org.apache.spark.storage.{BlockStatus, BlockId} * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that * either side can borrow memory from the other. * - * The region shared between execution and storage is a fraction of the total heap space + * The region shared between execution and storage is a fraction of (the total heap space - 300MB) * configurable through `spark.memory.fraction` (default 0.75). The position of the boundary * within this space is further determined by `spark.memory.storageFraction` (default 0.5). * This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default. @@ -48,7 +48,7 @@ import org.apache.spark.storage.{BlockStatus, BlockId} */ private[spark] class UnifiedMemoryManager private[memory] ( conf: SparkConf, -maxMemory: Long, +val maxMemory: Long, private val storageRegionSize: Long, numCores: Int) extends MemoryManager( @@ -130,6 +130,12 @@ private[spark] class UnifiedMemoryManager private[memory] ( object UnifiedMemoryManager { + // Set aside a fixed amount of memory for non-storage, non-execution purposes. + // This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve + // sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then + // the memory used for execution and storage will be (1024 - 300) * 0.75 = 543MB by default. 
+ private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024 + def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = { val maxMemory = getMaxMemory(conf) new UnifiedMemoryManager( @@ -144,8 +150,16 @@ object UnifiedMemoryManager { * Return the total amount of memory shared between execution and storage, in bytes. */ private def getMaxMemory(conf: SparkConf): Long = { -val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory) +val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory) +val reservedMemory = conf.getLong("spark.testing.reservedMemory", + if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES) +val minSystemMemory = reservedMemory * 1.5 +if (systemMemory < minSystemMemory) { + throw new IllegalArgumentException(s"System memory $systemMemory must " + +s"be at least $minSystemMemory. Please use a larger heap size.") +} +val usableMemory = systemMemory - reservedMemory val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75) -(systemMaxMemory * memoryFraction).toLong +(usableMemory * memoryFrac
spark git commit: [SPARK-12081] Make unified memory manager work with small heaps
Repository: spark Updated Branches: refs/heads/branch-1.6 72da2a21f -> 84c44b500 [SPARK-12081] Make unified memory manager work with small heaps The existing `spark.memory.fraction` (default 0.75) gives the system 25% of the space to work with. For small heaps, this is not enough: e.g. default 1GB leaves only 250MB system memory. This is especially a problem in local mode, where the driver and executor are crammed in the same JVM. Members of the community have reported driver OOM's in such cases. **New proposal.** We now reserve 300MB before taking the 75%. For 1GB JVMs, this leaves `(1024 - 300) * 0.75 = 543MB` for execution and storage. This is proposal (1) listed in the [JIRA](https://issues.apache.org/jira/browse/SPARK-12081). Author: Andrew Or Closes #10081 from andrewor14/unified-memory-small-heaps. (cherry picked from commit d96f8c997b9bb5c3d61f513d2c71d67ccf8e85d6) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84c44b50 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84c44b50 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84c44b50 Branch: refs/heads/branch-1.6 Commit: 84c44b500b5c90dffbe1a6b0aa86f01699b09b96 Parents: 72da2a2 Author: Andrew Or Authored: Tue Dec 1 19:51:12 2015 -0800 Committer: Andrew Or Committed: Tue Dec 1 19:51:29 2015 -0800 -- .../spark/memory/UnifiedMemoryManager.scala | 22 .../memory/UnifiedMemoryManagerSuite.scala | 20 ++ docs/configuration.md | 4 ++-- docs/tuning.md | 2 +- 4 files changed, 41 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/84c44b50/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 8be5b05..48b4e23 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -26,7 +26,7 @@ import org.apache.spark.storage.{BlockStatus, BlockId} * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that * either side can borrow memory from the other. * - * The region shared between execution and storage is a fraction of the total heap space + * The region shared between execution and storage is a fraction of (the total heap space - 300MB) * configurable through `spark.memory.fraction` (default 0.75). The position of the boundary * within this space is further determined by `spark.memory.storageFraction` (default 0.5). * This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default. @@ -48,7 +48,7 @@ import org.apache.spark.storage.{BlockStatus, BlockId} */ private[spark] class UnifiedMemoryManager private[memory] ( conf: SparkConf, -maxMemory: Long, +val maxMemory: Long, private val storageRegionSize: Long, numCores: Int) extends MemoryManager( @@ -130,6 +130,12 @@ private[spark] class UnifiedMemoryManager private[memory] ( object UnifiedMemoryManager { + // Set aside a fixed amount of memory for non-storage, non-execution purposes. + // This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve + // sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then + // the memory used for execution and storage will be (1024 - 300) * 0.75 = 543MB by default. 
+ private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024 + def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = { val maxMemory = getMaxMemory(conf) new UnifiedMemoryManager( @@ -144,8 +150,16 @@ object UnifiedMemoryManager { * Return the total amount of memory shared between execution and storage, in bytes. */ private def getMaxMemory(conf: SparkConf): Long = { -val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory) +val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory) +val reservedMemory = conf.getLong("spark.testing.reservedMemory", + if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES) +val minSystemMemory = reservedMemory * 1.5 +if (systemMemory < minSystemMemory) { + throw new IllegalArgumentException(s"System memory $systemMemory must " + +s"be at least $minSystemMemory. Please use a larger heap size.") +} +val usableMemory = systemMemory - reservedMemory val memoryFraction = conf.getDouble("spar
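The resulting budget is easy to work out by hand: subtract the fixed 300MB reservation, then apply `spark.memory.fraction`. Below is a small Scala sketch that mirrors the arithmetic of the new `getMaxMemory` (the constants are copied from the diff; the helper name and sample heap size are assumptions):

```
object UnifiedMemoryBudgetSketch {
  // Values copied from the patch: 300MB reserved, 0.75 default memory fraction.
  private val ReservedSystemMemoryBytes = 300L * 1024 * 1024
  private val MemoryFraction = 0.75

  // Mirrors UnifiedMemoryManager.getMaxMemory: memory shared by execution and storage.
  def maxSharedMemory(systemMemoryBytes: Long): Long = {
    val minSystemMemory = (ReservedSystemMemoryBytes * 1.5).toLong
    require(systemMemoryBytes >= minSystemMemory,
      s"System memory $systemMemoryBytes must be at least $minSystemMemory; use a larger heap")
    ((systemMemoryBytes - ReservedSystemMemoryBytes) * MemoryFraction).toLong
  }

  def main(args: Array[String]): Unit = {
    val oneGbHeap = 1024L * 1024 * 1024
    // (1024MB - 300MB) * 0.75 = 543MB for execution + storage on a default 1GB JVM.
    println(maxSharedMemory(oneGbHeap) / (1024 * 1024)) // 543
  }
}
```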
spark git commit: [SPARK-8414] Ensure context cleaner periodic cleanups
Repository: spark Updated Branches: refs/heads/master e96a70d5a -> 1ce4adf55 [SPARK-8414] Ensure context cleaner periodic cleanups Garbage collection triggers cleanups. If the driver JVM is huge and there is little memory pressure, we may never clean up shuffle files on executors. This is a problem for long-running applications (e.g. streaming). Author: Andrew Or Closes #10070 from andrewor14/periodic-gc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ce4adf5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ce4adf5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ce4adf5 Branch: refs/heads/master Commit: 1ce4adf55b535518c2e63917a827fac1f2df4e8e Parents: e96a70d Author: Andrew Or Authored: Tue Dec 1 19:36:34 2015 -0800 Committer: Josh Rosen Committed: Tue Dec 1 19:36:34 2015 -0800 -- .../scala/org/apache/spark/ContextCleaner.scala | 21 +++- 1 file changed, 20 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1ce4adf5/core/src/main/scala/org/apache/spark/ContextCleaner.scala -- diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index d23c153..bc73253 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -18,12 +18,13 @@ package org.apache.spark import java.lang.ref.{ReferenceQueue, WeakReference} +import java.util.concurrent.{TimeUnit, ScheduledExecutorService} import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData} -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} /** * Classes that represent cleaning tasks. @@ -66,6 +67,20 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { private val cleaningThread = new Thread() { override def run() { keepCleaning() }} + private val periodicGCService: ScheduledExecutorService = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc") + + /** + * How often to trigger a garbage collection in this JVM. + * + * This context cleaner triggers cleanups only when weak references are garbage collected. + * In long-running applications with large driver JVMs, where there is little memory pressure + * on the driver, this may happen very occasionally or not at all. Not cleaning at all may + * lead to executors running out of disk space after a while. + */ + private val periodicGCInterval = +sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min") + /** * Whether the cleaning thread will block on cleanup tasks (other than shuffle, which * is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter). @@ -104,6 +119,9 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { cleaningThread.setDaemon(true) cleaningThread.setName("Spark Context Cleaner") cleaningThread.start() +periodicGCService.scheduleAtFixedRate(new Runnable { + override def run(): Unit = System.gc() +}, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS) } /** @@ -119,6 +137,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { cleaningThread.interrupt() } cleaningThread.join() +periodicGCService.shutdown() } /** Register a RDD for cleanup when it is garbage collected. 
*/ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-8414] Ensure context cleaner periodic cleanups
Repository: spark Updated Branches: refs/heads/branch-1.6 1b3db967e -> 72da2a21f [SPARK-8414] Ensure context cleaner periodic cleanups Garbage collection triggers cleanups. If the driver JVM is huge and there is little memory pressure, we may never clean up shuffle files on executors. This is a problem for long-running applications (e.g. streaming). Author: Andrew Or Closes #10070 from andrewor14/periodic-gc. (cherry picked from commit 1ce4adf55b535518c2e63917a827fac1f2df4e8e) Signed-off-by: Josh Rosen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72da2a21 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72da2a21 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72da2a21 Branch: refs/heads/branch-1.6 Commit: 72da2a21f0940b97757ace5975535e559d627688 Parents: 1b3db96 Author: Andrew Or Authored: Tue Dec 1 19:36:34 2015 -0800 Committer: Josh Rosen Committed: Tue Dec 1 19:36:47 2015 -0800 -- .../scala/org/apache/spark/ContextCleaner.scala | 21 +++- 1 file changed, 20 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/72da2a21/core/src/main/scala/org/apache/spark/ContextCleaner.scala -- diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index d23c153..bc73253 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -18,12 +18,13 @@ package org.apache.spark import java.lang.ref.{ReferenceQueue, WeakReference} +import java.util.concurrent.{TimeUnit, ScheduledExecutorService} import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData} -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} /** * Classes that represent cleaning tasks. @@ -66,6 +67,20 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { private val cleaningThread = new Thread() { override def run() { keepCleaning() }} + private val periodicGCService: ScheduledExecutorService = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc") + + /** + * How often to trigger a garbage collection in this JVM. + * + * This context cleaner triggers cleanups only when weak references are garbage collected. + * In long-running applications with large driver JVMs, where there is little memory pressure + * on the driver, this may happen very occasionally or not at all. Not cleaning at all may + * lead to executors running out of disk space after a while. + */ + private val periodicGCInterval = +sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min") + /** * Whether the cleaning thread will block on cleanup tasks (other than shuffle, which * is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter). 
@@ -104,6 +119,9 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { cleaningThread.setDaemon(true) cleaningThread.setName("Spark Context Cleaner") cleaningThread.start() +periodicGCService.scheduleAtFixedRate(new Runnable { + override def run(): Unit = System.gc() +}, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS) } /** @@ -119,6 +137,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { cleaningThread.interrupt() } cleaningThread.join() +periodicGCService.shutdown() } /** Register a RDD for cleanup when it is garbage collected. */ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
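Two practical takeaways from this change: the trigger frequency is configurable through `spark.cleaner.periodicGC.interval` (default 30min), and the mechanism is simply a daemon scheduler that calls `System.gc()` so weak references get enqueued and shuffle files can be cleaned even when the driver heap is idle. A stand-alone sketch of that scheduling pattern follows (the plain `Executors` factory is an assumption; ContextCleaner itself uses Spark's ThreadUtils to get a named daemon thread):

```
import java.util.concurrent.{Executors, TimeUnit}

object PeriodicGcSketch {
  def main(args: Array[String]): Unit = {
    // Configurable in Spark via: --conf spark.cleaner.periodicGC.interval=10min
    val intervalSeconds = 30L * 60 // default interval: 30 minutes

    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      // A periodic GC lets weak references be collected, which is what feeds the
      // cleaner's queue; without it an idle driver heap may never trigger cleanups.
      override def run(): Unit = System.gc()
    }, intervalSeconds, intervalSeconds, TimeUnit.SECONDS)

    // ... application runs; stop the scheduler when the SparkContext shuts down.
    scheduler.shutdown()
  }
}
```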
spark git commit: [SPARK-11596][SQL] In TreeNode's argString, if a TreeNode is not a child of the current TreeNode, we should only return the simpleString.
Repository: spark Updated Branches: refs/heads/branch-1.6 14eadf921 -> 1b3db967e [SPARK-11596][SQL] In TreeNode's argString, if a TreeNode is not a child of the current TreeNode, we should only return the simpleString. In TreeNode's argString, if a TreeNode is not a child of the current TreeNode, we will only return the simpleString. I tested the [following case provided by Cristian](https://issues.apache.org/jira/browse/SPARK-11596?focusedCommentId=15019241&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15019241). ``` val c = (1 to 20).foldLeft[Option[DataFrame]] (None) { (curr, idx) => println(s"PROCESSING >>> $idx") val df = sqlContext.sparkContext.parallelize((0 to 10).zipWithIndex).toDF("A", "B") val union = curr.map(_.unionAll(df)).getOrElse(df) union.cache() Some(union) } c.get.explain(true) ``` Without the change, `c.get.explain(true)` took 100s. With the change, `c.get.explain(true)` took 26ms. https://issues.apache.org/jira/browse/SPARK-11596 Author: Yin Huai Closes #10079 from yhuai/SPARK-11596. (cherry picked from commit e96a70d5ab2e2b43a2df17a550fa9ed2ee0001c4) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b3db967 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b3db967 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b3db967 Branch: refs/heads/branch-1.6 Commit: 1b3db967e05a628897b7162aa605b2e4650a0d58 Parents: 14eadf9 Author: Yin Huai Authored: Tue Dec 1 17:18:45 2015 -0800 Committer: Michael Armbrust Committed: Tue Dec 1 17:19:09 2015 -0800 -- .../main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1b3db967/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index f1cea07..ad2bd78 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -380,7 +380,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { /** Returns a string representing the arguments to this node, minus any children */ def argString: String = productIterator.flatMap { case tn: TreeNode[_] if containsChild(tn) => Nil -case tn: TreeNode[_] if tn.toString contains "\n" => s"(${tn.simpleString})" :: Nil +case tn: TreeNode[_] => s"(${tn.simpleString})" :: Nil case seq: Seq[BaseType] if seq.toSet.subsetOf(children.toSet) => Nil case seq: Seq[_] => seq.mkString("[", ",", "]") :: Nil case set: Set[_] => set.mkString("{", ",", "}") :: Nil - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11596][SQL] In TreeNode's argString, if a TreeNode is not a child of the current TreeNode, we should only return the simpleString.
Repository: spark Updated Branches: refs/heads/master 5872a9d89 -> e96a70d5a [SPARK-11596][SQL] In TreeNode's argString, if a TreeNode is not a child of the current TreeNode, we should only return the simpleString. In TreeNode's argString, if a TreeNode is not a child of the current TreeNode, we will only return the simpleString. I tested the [following case provided by Cristian](https://issues.apache.org/jira/browse/SPARK-11596?focusedCommentId=15019241&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15019241). ``` val c = (1 to 20).foldLeft[Option[DataFrame]] (None) { (curr, idx) => println(s"PROCESSING >>> $idx") val df = sqlContext.sparkContext.parallelize((0 to 10).zipWithIndex).toDF("A", "B") val union = curr.map(_.unionAll(df)).getOrElse(df) union.cache() Some(union) } c.get.explain(true) ``` Without the change, `c.get.explain(true)` took 100s. With the change, `c.get.explain(true)` took 26ms. https://issues.apache.org/jira/browse/SPARK-11596 Author: Yin Huai Closes #10079 from yhuai/SPARK-11596. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e96a70d5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e96a70d5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e96a70d5 Branch: refs/heads/master Commit: e96a70d5ab2e2b43a2df17a550fa9ed2ee0001c4 Parents: 5872a9d Author: Yin Huai Authored: Tue Dec 1 17:18:45 2015 -0800 Committer: Michael Armbrust Committed: Tue Dec 1 17:18:45 2015 -0800 -- .../main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e96a70d5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index f1cea07..ad2bd78 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -380,7 +380,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { /** Returns a string representing the arguments to this node, minus any children */ def argString: String = productIterator.flatMap { case tn: TreeNode[_] if containsChild(tn) => Nil -case tn: TreeNode[_] if tn.toString contains "\n" => s"(${tn.simpleString})" :: Nil +case tn: TreeNode[_] => s"(${tn.simpleString})" :: Nil case seq: Seq[BaseType] if seq.toSet.subsetOf(children.toSet) => Nil case seq: Seq[_] => seq.mkString("[", ",", "]") :: Nil case set: Set[_] => set.mkString("{", ",", "}") :: Nil - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11352][SQL] Escape */ in the generated comments.
Repository: spark Updated Branches: refs/heads/master 5a8b5fdd6 -> 5872a9d89 [SPARK-11352][SQL] Escape */ in the generated comments. https://issues.apache.org/jira/browse/SPARK-11352 Author: Yin Huai Closes #10072 from yhuai/SPARK-11352. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5872a9d8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5872a9d8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5872a9d8 Branch: refs/heads/master Commit: 5872a9d89fe2720c2bcb1fc7494136947a72581c Parents: 5a8b5fd Author: Yin Huai Authored: Tue Dec 1 16:24:04 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 16:24:04 2015 -0800 -- .../spark/sql/catalyst/expressions/Expression.scala | 10 -- .../catalyst/expressions/codegen/CodegenFallback.scala| 2 +- .../sql/catalyst/expressions/CodeGenerationSuite.scala| 9 + 3 files changed, 18 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5872a9d8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index b55d365..4ee6542 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -95,7 +95,7 @@ abstract class Expression extends TreeNode[Expression] { ctx.subExprEliminationExprs.get(this).map { subExprState => // This expression is repeated meaning the code to evaluated has already been added // as a function and called in advance. Just use it. - val code = s"/* $this */" + val code = s"/* ${this.toCommentSafeString} */" GeneratedExpressionCode(code, subExprState.isNull, subExprState.value) }.getOrElse { val isNull = ctx.freshName("isNull") @@ -103,7 +103,7 @@ abstract class Expression extends TreeNode[Expression] { val ve = GeneratedExpressionCode("", isNull, primitive) ve.code = genCode(ctx, ve) // Add `this` in the comment. - ve.copy(s"/* $this */\n" + ve.code.trim) + ve.copy(s"/* ${this.toCommentSafeString} */\n" + ve.code.trim) } } @@ -214,6 +214,12 @@ abstract class Expression extends TreeNode[Expression] { } override def toString: String = prettyName + flatArguments.mkString("(", ",", ")") + + /** + * Returns the string representation of this expression that is safe to be put in + * code comments of generated code. 
+ */ + protected def toCommentSafeString: String = this.toString.replace("*/", "\\*\\/") } http://git-wip-us.apache.org/repos/asf/spark/blob/5872a9d8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala index a31574c..26fb143 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala @@ -33,7 +33,7 @@ trait CodegenFallback extends Expression { ctx.references += this val objectTerm = ctx.freshName("obj") s""" - /* expression: ${this} */ + /* expression: ${this.toCommentSafeString} */ java.lang.Object $objectTerm = expressions[${ctx.references.size - 1}].eval(${ctx.INPUT_ROW}); boolean ${ev.isNull} = $objectTerm == null; ${ctx.javaType(this.dataType)} ${ev.value} = ${ctx.defaultValue(this.dataType)}; http://git-wip-us.apache.org/repos/asf/spark/blob/5872a9d8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index 002ed16..fe75424 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -98,4 +98,13 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { unsafeRow.getStruct(3, 1).getStru
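The failure mode the escaping guards against can be sketched with plain strings (toy example; the expression text below is made up, not real generated Catalyst output): any expression whose string form contains */ would terminate the surrounding block comment early, and the emitted Java would no longer compile.

```
val exprText = """concat(col, "*/")"""                       // hypothetical expression rendering
val broken   = s"/* $exprText */"                            // the inner */ closes the comment early
val escaped  = s"/* ${exprText.replace("*/", "\\*\\/")} */"  // same replacement as toCommentSafeString
```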
spark git commit: [SPARK-11352][SQL] Escape */ in the generated comments.
Repository: spark Updated Branches: refs/heads/branch-1.6 1135430a0 -> 14eadf921 [SPARK-11352][SQL] Escape */ in the generated comments. https://issues.apache.org/jira/browse/SPARK-11352 Author: Yin Huai Closes #10072 from yhuai/SPARK-11352. (cherry picked from commit 5872a9d89fe2720c2bcb1fc7494136947a72581c) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14eadf92 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14eadf92 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14eadf92 Branch: refs/heads/branch-1.6 Commit: 14eadf921132219f5597d689ac1ffd6e938a939a Parents: 1135430 Author: Yin Huai Authored: Tue Dec 1 16:24:04 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 16:24:14 2015 -0800 -- .../spark/sql/catalyst/expressions/Expression.scala | 10 -- .../catalyst/expressions/codegen/CodegenFallback.scala| 2 +- .../sql/catalyst/expressions/CodeGenerationSuite.scala| 9 + 3 files changed, 18 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/14eadf92/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index b55d365..4ee6542 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -95,7 +95,7 @@ abstract class Expression extends TreeNode[Expression] { ctx.subExprEliminationExprs.get(this).map { subExprState => // This expression is repeated meaning the code to evaluated has already been added // as a function and called in advance. Just use it. - val code = s"/* $this */" + val code = s"/* ${this.toCommentSafeString} */" GeneratedExpressionCode(code, subExprState.isNull, subExprState.value) }.getOrElse { val isNull = ctx.freshName("isNull") @@ -103,7 +103,7 @@ abstract class Expression extends TreeNode[Expression] { val ve = GeneratedExpressionCode("", isNull, primitive) ve.code = genCode(ctx, ve) // Add `this` in the comment. - ve.copy(s"/* $this */\n" + ve.code.trim) + ve.copy(s"/* ${this.toCommentSafeString} */\n" + ve.code.trim) } } @@ -214,6 +214,12 @@ abstract class Expression extends TreeNode[Expression] { } override def toString: String = prettyName + flatArguments.mkString("(", ",", ")") + + /** + * Returns the string representation of this expression that is safe to be put in + * code comments of generated code. 
+ */ + protected def toCommentSafeString: String = this.toString.replace("*/", "\\*\\/") } http://git-wip-us.apache.org/repos/asf/spark/blob/14eadf92/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala index a31574c..26fb143 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala @@ -33,7 +33,7 @@ trait CodegenFallback extends Expression { ctx.references += this val objectTerm = ctx.freshName("obj") s""" - /* expression: ${this} */ + /* expression: ${this.toCommentSafeString} */ java.lang.Object $objectTerm = expressions[${ctx.references.size - 1}].eval(${ctx.INPUT_ROW}); boolean ${ev.isNull} = $objectTerm == null; ${ctx.javaType(this.dataType)} ${ev.value} = ${ctx.defaultValue(this.dataType)}; http://git-wip-us.apache.org/repos/asf/spark/blob/14eadf92/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index 002ed16..fe75424 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -98,4 +98,13 @@ class CodeGe
spark git commit: [SPARK-12030] Fix Platform.copyMemory to handle overlapping regions.
Repository: spark Updated Branches: refs/heads/branch-1.5 fc3fb8463 -> 7460e4309 [SPARK-12030] Fix Platform.copyMemory to handle overlapping regions. This bug was exposed as memory corruption in Timsort which uses copyMemory to copy large regions that can overlap. The prior implementation did not handle this case half the time and always copied forward, resulting in the data being corrupt. Author: Nong Li Closes #10068 from nongli/spark-12030. (cherry picked from commit 2cef1cdfbb5393270ae83179b6a4e50c3cbf9e93) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7460e430 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7460e430 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7460e430 Branch: refs/heads/branch-1.5 Commit: 7460e430929473152230d70964a04a7ff834066c Parents: fc3fb84 Author: Nong Li Authored: Tue Dec 1 12:59:53 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 16:11:25 2015 -0800 -- .../java/org/apache/spark/unsafe/Platform.java | 27 +++-- .../apache/spark/unsafe/PlatformUtilSuite.java | 61 2 files changed, 82 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7460e430/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java -- diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java index 1c16da9..0d6b215 100644 --- a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java +++ b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java @@ -107,12 +107,27 @@ public final class Platform { public static void copyMemory( Object src, long srcOffset, Object dst, long dstOffset, long length) { -while (length > 0) { - long size = Math.min(length, UNSAFE_COPY_THRESHOLD); - _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size); - length -= size; - srcOffset += size; - dstOffset += size; +// Check if dstOffset is before or after srcOffset to determine if we should copy +// forward or backwards. This is necessary in case src and dst overlap. +if (dstOffset < srcOffset) { + while (length > 0) { +long size = Math.min(length, UNSAFE_COPY_THRESHOLD); +_UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size); +length -= size; +srcOffset += size; +dstOffset += size; + } +} else { + srcOffset += length; + dstOffset += length; + while (length > 0) { +long size = Math.min(length, UNSAFE_COPY_THRESHOLD); +srcOffset -= size; +dstOffset -= size; +_UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size); +length -= size; + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/7460e430/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java -- diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java new file mode 100644 index 000..693ec6e --- /dev/null +++ b/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe; + +import org.junit.Assert; +import org.junit.Test; + +public class PlatformUtilSuite { + + @Test + public void overlappingCopyMemory() { +byte[] data = new byte[3 * 1024 * 1024]; +int size = 2 * 1024 * 1024; +for (int i = 0; i < data.length; ++i) { + data[i] = (byte)i; +} + +Platform.copyMemory(data, Platform.BYTE_ARRAY_OFFSET, data, Platform.BYTE_ARRAY_OFFSET, size); +for (int i = 0; i < data.length; ++i) { + Assert.assertEquals((byte)i, data[i]); +} + +Platform.copyMemory( +data, +Platform.BYTE_ARRAY_OFFSET + 1, +data, +Platform.BYTE_ARRAY_OFF
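The rule the patch applies can be sketched on a plain byte array (illustrative Scala; the real implementation goes through sun.misc.Unsafe and copies in bounded chunks): when the destination starts at or after the source within the same buffer, copy backwards so each byte is read before the copy reaches and overwrites it; otherwise copy forwards as before.

```
def copyWithinBuffer(buf: Array[Byte], srcOff: Int, dstOff: Int, len: Int): Unit = {
  if (dstOff < srcOff) {
    // forward copy: writes land at lower offsets than the reads, so nothing is clobbered early
    var i = 0
    while (i < len) { buf(dstOff + i) = buf(srcOff + i); i += 1 }
  } else {
    // backward copy: each overlapping byte is read before the copy overwrites it
    var i = len - 1
    while (i >= 0) { buf(dstOff + i) = buf(srcOff + i); i -= 1 }
  }
}
```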
spark git commit: [SPARK-11788][SQL] surround timestamp/date value with quotes in JDBC data source
Repository: spark Updated Branches: refs/heads/branch-1.5 f28399e1a -> fc3fb8463 [SPARK-11788][SQL] surround timestamp/date value with quotes in JDBC data source When query the Timestamp or Date column like the following val filtered = jdbcdf.where($"TIMESTAMP_COLUMN" >= beg && $"TIMESTAMP_COLUMN" < end) The generated SQL query is "TIMESTAMP_COLUMN >= 2015-01-01 00:00:00.0" It should have quote around the Timestamp/Date value such as "TIMESTAMP_COLUMN >= '2015-01-01 00:00:00.0'" Author: Huaxin Gao Closes #9872 from huaxingao/spark-11788. (cherry picked from commit 5a8b5fdd6ffa58f015cdadf3f2c6df78e0a388ad) Signed-off-by: Yin Huai Conflicts: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc3fb846 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc3fb846 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc3fb846 Branch: refs/heads/branch-1.5 Commit: fc3fb84636ba5f84220e1f46036841f1cb61f1d0 Parents: f28399e Author: Huaxin Gao Authored: Tue Dec 1 15:32:57 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 16:03:33 2015 -0800 -- .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 4 +++- .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 11 +++ 2 files changed, 14 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fc3fb846/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 018a009..a956ae6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.jdbc -import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, SQLException} +import java.sql.{Connection, Date, DriverManager, ResultSet, ResultSetMetaData, SQLException, Timestamp} import java.util.Properties import org.apache.commons.lang3.StringUtils @@ -263,6 +263,8 @@ private[sql] class JDBCRDD( */ private def compileValue(value: Any): Any = value match { case stringValue: String => s"'${escapeSql(stringValue)}'" +case timestampValue: Timestamp => "'" + timestampValue + "'" +case dateValue: Date => "'" + dateValue + "'" case _ => value } http://git-wip-us.apache.org/repos/asf/spark/blob/fc3fb846/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 0edac08..6fc0c6b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -443,4 +443,15 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext assert(agg.getCatalystType(0, "", 1, null) === Some(LongType)) assert(agg.getCatalystType(1, "", 1, null) === Some(StringType)) } + + test("Test DataFrame.where for Date and Timestamp") { +// Regression test for bug SPARK-11788 +val timestamp = java.sql.Timestamp.valueOf("2001-02-20 11:22:33.543543"); +val date = java.sql.Date.valueOf("1995-01-01") +val jdbcDf = sqlContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties) +val 
rows = jdbcDf.where($"B" > date && $"C" > timestamp).collect() +assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01")) +assert(rows(0).getAs[java.sql.Timestamp](2) + === java.sql.Timestamp.valueOf("2002-02-20 11:22:33.543543")) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
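In user terms, the fix means a pushed-down filter on a timestamp column now compiles to TS >= '2015-01-01 00:00:00.0' instead of the unquoted form most databases reject. A rough usage sketch (the jdbcUrl, table and column names below are hypothetical, and sqlContext with its implicits is assumed to be in scope, as in a spark-shell):

```
import java.util.Properties

val begin  = java.sql.Timestamp.valueOf("2015-01-01 00:00:00.0")
val events = sqlContext.read.jdbc(jdbcUrl, "EVENTS", new Properties)
// compileValue now wraps the Timestamp value in quotes when this predicate is pushed down
val recent = events.where($"TS" >= begin)
recent.collect()
```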
spark git commit: [SPARK-11328][SQL] Improve error message when hitting this issue
Repository: spark Updated Branches: refs/heads/branch-1.5 80dac0b07 -> f28399e1a [SPARK-11328][SQL] Improve error message when hitting this issue The issue is that the output commiter is not idempotent and retry attempts will fail because the output file already exists. It is not safe to clean up the file as this output committer is by design not retryable. Currently, the job fails with a confusing file exists error. This patch is a stop gap to tell the user to look at the top of the error log for the proper message. This is difficult to test locally as Spark is hardcoded not to retry. Manually verified by upping the retry attempts. Author: Nong Li Author: Nong Li Closes #10080 from nongli/spark-11328. (cherry picked from commit 47a0abc343550c855e679de12983f43e6fcc0171) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f28399e1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f28399e1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f28399e1 Branch: refs/heads/branch-1.5 Commit: f28399e1ab6fd3a829d05ffbc637aa2c86cffdf2 Parents: 80dac0b Author: Nong Li Authored: Tue Dec 1 15:30:21 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 15:59:09 2015 -0800 -- .../execution/datasources/WriterContainer.scala | 22 ++-- .../parquet/DirectParquetOutputCommitter.scala | 3 ++- 2 files changed, 22 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f28399e1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index c1599f1..6c39dba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -122,6 +122,24 @@ private[sql] abstract class BaseWriterContainer( } } + protected def newOutputWriter(path: String): OutputWriter = { +try { + outputWriterFactory.newInstance(path, dataSchema, taskAttemptContext) +} catch { + case e: org.apache.hadoop.fs.FileAlreadyExistsException => +if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) { + // Spark-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry + // attempts, the task will fail because the output file is created from a prior attempt. + // This often means the most visible error to the user is misleading. Augment the error + // to tell the user to look for the actual error. + throw new SparkException("The output file already exists but this could be due to a " + +"failure from an earlier attempt. 
Look through the earlier logs or stage page for " + +"the first error.\n File exists error: " + e) +} +throw e +} + } + private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = { val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context) @@ -230,7 +248,7 @@ private[sql] class DefaultWriterContainer( executorSideSetup(taskContext) val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext) configuration.set("spark.sql.sources.output.path", outputPath) -val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext) +val writer = newOutputWriter(getWorkPath) writer.initConverter(dataSchema) var writerClosed = false @@ -400,7 +418,7 @@ private[sql] class DynamicPartitionWriterContainer( val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext) configuration.set( "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString) - val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext) + val newWriter = super.newOutputWriter(path.toString) newWriter.initConverter(dataSchema) newWriter } http://git-wip-us.apache.org/repos/asf/spark/blob/f28399e1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala index 2c6b914..cf61a2e 1006
spark git commit: [SPARK-11788][SQL] surround timestamp/date value with quotes in JDBC data source
Repository: spark Updated Branches: refs/heads/branch-1.6 f1122dd2b -> 1135430a0 [SPARK-11788][SQL] surround timestamp/date value with quotes in JDBC data source When query the Timestamp or Date column like the following val filtered = jdbcdf.where($"TIMESTAMP_COLUMN" >= beg && $"TIMESTAMP_COLUMN" < end) The generated SQL query is "TIMESTAMP_COLUMN >= 2015-01-01 00:00:00.0" It should have quote around the Timestamp/Date value such as "TIMESTAMP_COLUMN >= '2015-01-01 00:00:00.0'" Author: Huaxin Gao Closes #9872 from huaxingao/spark-11788. (cherry picked from commit 5a8b5fdd6ffa58f015cdadf3f2c6df78e0a388ad) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1135430a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1135430a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1135430a Branch: refs/heads/branch-1.6 Commit: 1135430a00dbe6516097dd3bc868ae865e8e644d Parents: f1122dd Author: Huaxin Gao Authored: Tue Dec 1 15:32:57 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 15:33:18 2015 -0800 -- .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 4 +++- .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 11 +++ 2 files changed, 14 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1135430a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index f9b7259..32f0889 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.jdbc -import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, SQLException} +import java.sql.{Connection, Date, DriverManager, ResultSet, ResultSetMetaData, SQLException, Timestamp} import java.util.Properties import org.apache.commons.lang3.StringUtils @@ -265,6 +265,8 @@ private[sql] class JDBCRDD( */ private def compileValue(value: Any): Any = value match { case stringValue: String => s"'${escapeSql(stringValue)}'" +case timestampValue: Timestamp => "'" + timestampValue + "'" +case dateValue: Date => "'" + dateValue + "'" case _ => value } http://git-wip-us.apache.org/repos/asf/spark/blob/1135430a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index d530b1a..8c24aa3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -484,4 +484,15 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext assert(h2.getTableExistsQuery(table) == defaultQuery) assert(derby.getTableExistsQuery(table) == defaultQuery) } + + test("Test DataFrame.where for Date and Timestamp") { +// Regression test for bug SPARK-11788 +val timestamp = java.sql.Timestamp.valueOf("2001-02-20 11:22:33.543543"); +val date = java.sql.Date.valueOf("1995-01-01") +val jdbcDf = sqlContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties) +val rows = jdbcDf.where($"B" > date && $"C" > timestamp).collect() 
+assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01")) +assert(rows(0).getAs[java.sql.Timestamp](2) + === java.sql.Timestamp.valueOf("2002-02-20 11:22:33.543543")) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11788][SQL] surround timestamp/date value with quotes in JDBC data source
Repository: spark Updated Branches: refs/heads/master 47a0abc34 -> 5a8b5fdd6 [SPARK-11788][SQL] surround timestamp/date value with quotes in JDBC data source When query the Timestamp or Date column like the following val filtered = jdbcdf.where($"TIMESTAMP_COLUMN" >= beg && $"TIMESTAMP_COLUMN" < end) The generated SQL query is "TIMESTAMP_COLUMN >= 2015-01-01 00:00:00.0" It should have quote around the Timestamp/Date value such as "TIMESTAMP_COLUMN >= '2015-01-01 00:00:00.0'" Author: Huaxin Gao Closes #9872 from huaxingao/spark-11788. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a8b5fdd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a8b5fdd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a8b5fdd Branch: refs/heads/master Commit: 5a8b5fdd6ffa58f015cdadf3f2c6df78e0a388ad Parents: 47a0abc Author: Huaxin Gao Authored: Tue Dec 1 15:32:57 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 15:32:57 2015 -0800 -- .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 4 +++- .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 11 +++ 2 files changed, 14 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5a8b5fdd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 57a8a04..392d3ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.jdbc -import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, SQLException} +import java.sql.{Connection, Date, DriverManager, ResultSet, ResultSetMetaData, SQLException, Timestamp} import java.util.Properties import scala.util.control.NonFatal @@ -267,6 +267,8 @@ private[sql] class JDBCRDD( */ private def compileValue(value: Any): Any = value match { case stringValue: String => s"'${escapeSql(stringValue)}'" +case timestampValue: Timestamp => "'" + timestampValue + "'" +case dateValue: Date => "'" + dateValue + "'" case _ => value } http://git-wip-us.apache.org/repos/asf/spark/blob/5a8b5fdd/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index d530b1a..8c24aa3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -484,4 +484,15 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext assert(h2.getTableExistsQuery(table) == defaultQuery) assert(derby.getTableExistsQuery(table) == defaultQuery) } + + test("Test DataFrame.where for Date and Timestamp") { +// Regression test for bug SPARK-11788 +val timestamp = java.sql.Timestamp.valueOf("2001-02-20 11:22:33.543543"); +val date = java.sql.Date.valueOf("1995-01-01") +val jdbcDf = sqlContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties) +val rows = jdbcDf.where($"B" > date && $"C" > timestamp).collect() +assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01")) +assert(rows(0).getAs[java.sql.Timestamp](2) + === 
java.sql.Timestamp.valueOf("2002-02-20 11:22:33.543543")) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11328][SQL] Improve error message when hitting this issue
Repository: spark Updated Branches: refs/heads/master ef6790fdc -> 47a0abc34 [SPARK-11328][SQL] Improve error message when hitting this issue The issue is that the output commiter is not idempotent and retry attempts will fail because the output file already exists. It is not safe to clean up the file as this output committer is by design not retryable. Currently, the job fails with a confusing file exists error. This patch is a stop gap to tell the user to look at the top of the error log for the proper message. This is difficult to test locally as Spark is hardcoded not to retry. Manually verified by upping the retry attempts. Author: Nong Li Author: Nong Li Closes #10080 from nongli/spark-11328. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47a0abc3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47a0abc3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47a0abc3 Branch: refs/heads/master Commit: 47a0abc343550c855e679de12983f43e6fcc0171 Parents: ef6790f Author: Nong Li Authored: Tue Dec 1 15:30:21 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 15:30:21 2015 -0800 -- .../execution/datasources/WriterContainer.scala | 22 ++-- .../parquet/DirectParquetOutputCommitter.scala | 3 ++- 2 files changed, 22 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/47a0abc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index 1b59b19..ad55367 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -124,6 +124,24 @@ private[sql] abstract class BaseWriterContainer( } } + protected def newOutputWriter(path: String): OutputWriter = { +try { + outputWriterFactory.newInstance(path, dataSchema, taskAttemptContext) +} catch { + case e: org.apache.hadoop.fs.FileAlreadyExistsException => +if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) { + // Spark-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry + // attempts, the task will fail because the output file is created from a prior attempt. + // This often means the most visible error to the user is misleading. Augment the error + // to tell the user to look for the actual error. + throw new SparkException("The output file already exists but this could be due to a " + +"failure from an earlier attempt. 
Look through the earlier logs or stage page for " + +"the first error.\n File exists error: " + e) +} +throw e +} + } + private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = { val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context) @@ -234,7 +252,7 @@ private[sql] class DefaultWriterContainer( executorSideSetup(taskContext) val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext) configuration.set("spark.sql.sources.output.path", outputPath) -val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext) +val writer = newOutputWriter(getWorkPath) writer.initConverter(dataSchema) var writerClosed = false @@ -403,7 +421,7 @@ private[sql] class DynamicPartitionWriterContainer( val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext) configuration.set( "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString) - val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext) + val newWriter = super.newOutputWriter(path.toString) newWriter.initConverter(dataSchema) newWriter } http://git-wip-us.apache.org/repos/asf/spark/blob/47a0abc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala index 300e867..1a4e99f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutpu
spark git commit: [SPARK-12075][SQL] Speed up HiveComparisionTest by avoiding / speeding up TestHive.reset()
Repository: spark Updated Branches: refs/heads/branch-1.6 012de2ce5 -> d77bf0bd9 [SPARK-12075][SQL] Speed up HiveComparisionTest by avoiding / speeding up TestHive.reset() When profiling HiveCompatibilitySuite, I noticed that most of the time seems to be spent in expensive `TestHive.reset()` calls. This patch speeds up suites based on HiveComparisionTest, such as HiveCompatibilitySuite, with the following changes: - Avoid `TestHive.reset()` whenever possible: - Use a simple set of heuristics to guess whether we need to call `reset()` in between tests. - As a safety-net, automatically re-run failed tests by calling `reset()` before the re-attempt. - Speed up the expensive parts of `TestHive.reset()`: loading the `src` and `srcpart` tables took roughly 600ms per test, so we now avoid this by using a simple heuristic which only loads those tables by tests that reference them. This is based on simple string matching over the test queries which errs on the side of loading in more situations than might be strictly necessary. After these changes, HiveCompatibilitySuite seems to run in about 10 minutes. This PR is a revival of #6663, an earlier experimental PR from June, where I played around with several possible speedups for this suite. Author: Josh Rosen Closes #10055 from JoshRosen/speculative-testhive-reset. (cherry picked from commit ef6790fdc3b70b9d6184ec2b3d926e4b0e4b15f6) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d77bf0bd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d77bf0bd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d77bf0bd Branch: refs/heads/branch-1.6 Commit: d77bf0bd922835b6a63bb1eeedf91e2a92d92ca9 Parents: 012de2c Author: Josh Rosen Authored: Wed Dec 2 07:29:45 2015 +0800 Committer: Reynold Xin Committed: Wed Dec 2 07:30:07 2015 +0800 -- .../apache/spark/sql/hive/test/TestHive.scala | 7 -- .../sql/hive/execution/HiveComparisonTest.scala | 67 ++-- .../sql/hive/execution/HiveQueryFileTest.scala | 2 +- 3 files changed, 62 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d77bf0bd/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 6883d30..2e2d201 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -443,13 +443,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { defaultOverrides() runSqlHive("USE default") - - // Just loading src makes a lot of tests pass. This is because some tests do something like - // drop an index on src at the beginning. Since we just pass DDL to hive this bypasses our - // Analyzer and thus the test table auto-loading mechanism. - // Remove after we handle more DDL operations natively. 
- loadTestTable("src") - loadTestTable("srcpart") } catch { case e: Exception => logError("FATAL ERROR: Failed to reset TestDB state.", e) http://git-wip-us.apache.org/repos/asf/spark/blob/d77bf0bd/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index aa95ba9..4455430 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -209,7 +209,11 @@ abstract class HiveComparisonTest } val installHooksCommand = "(?i)SET.*hooks".r - def createQueryTest(testCaseName: String, sql: String, reset: Boolean = true) { + def createQueryTest( + testCaseName: String, + sql: String, + reset: Boolean = true, + tryWithoutResettingFirst: Boolean = false) { // testCaseName must not contain ':', which is not allowed to appear in a filename of Windows assert(!testCaseName.contains(":")) @@ -240,9 +244,6 @@ abstract class HiveComparisonTest test(testCaseName) { logDebug(s"=== HIVE TEST: $testCaseName ===") - // Clear old output for this testcase. - outputDirectories.map(new File(_, testCaseName)).filter(_.exists()).foreach(_.delete()) - val sqlWithoutComment = sql.split("\n").filterNot(l => l.matches("--.*(?<=[^]);
spark git commit: [SPARK-12075][SQL] Speed up HiveComparisionTest by avoiding / speeding up TestHive.reset()
Repository: spark Updated Branches: refs/heads/master f292018f8 -> ef6790fdc [SPARK-12075][SQL] Speed up HiveComparisionTest by avoiding / speeding up TestHive.reset() When profiling HiveCompatibilitySuite, I noticed that most of the time seems to be spent in expensive `TestHive.reset()` calls. This patch speeds up suites based on HiveComparisionTest, such as HiveCompatibilitySuite, with the following changes: - Avoid `TestHive.reset()` whenever possible: - Use a simple set of heuristics to guess whether we need to call `reset()` in between tests. - As a safety-net, automatically re-run failed tests by calling `reset()` before the re-attempt. - Speed up the expensive parts of `TestHive.reset()`: loading the `src` and `srcpart` tables took roughly 600ms per test, so we now avoid this by using a simple heuristic which only loads those tables by tests that reference them. This is based on simple string matching over the test queries which errs on the side of loading in more situations than might be strictly necessary. After these changes, HiveCompatibilitySuite seems to run in about 10 minutes. This PR is a revival of #6663, an earlier experimental PR from June, where I played around with several possible speedups for this suite. Author: Josh Rosen Closes #10055 from JoshRosen/speculative-testhive-reset. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef6790fd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef6790fd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef6790fd Branch: refs/heads/master Commit: ef6790fdc3b70b9d6184ec2b3d926e4b0e4b15f6 Parents: f292018 Author: Josh Rosen Authored: Wed Dec 2 07:29:45 2015 +0800 Committer: Reynold Xin Committed: Wed Dec 2 07:29:45 2015 +0800 -- .../apache/spark/sql/hive/test/TestHive.scala | 7 -- .../sql/hive/execution/HiveComparisonTest.scala | 67 ++-- .../sql/hive/execution/HiveQueryFileTest.scala | 2 +- 3 files changed, 62 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ef6790fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 6883d30..2e2d201 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -443,13 +443,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { defaultOverrides() runSqlHive("USE default") - - // Just loading src makes a lot of tests pass. This is because some tests do something like - // drop an index on src at the beginning. Since we just pass DDL to hive this bypasses our - // Analyzer and thus the test table auto-loading mechanism. - // Remove after we handle more DDL operations natively. 
- loadTestTable("src") - loadTestTable("srcpart") } catch { case e: Exception => logError("FATAL ERROR: Failed to reset TestDB state.", e) http://git-wip-us.apache.org/repos/asf/spark/blob/ef6790fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index aa95ba9..4455430 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -209,7 +209,11 @@ abstract class HiveComparisonTest } val installHooksCommand = "(?i)SET.*hooks".r - def createQueryTest(testCaseName: String, sql: String, reset: Boolean = true) { + def createQueryTest( + testCaseName: String, + sql: String, + reset: Boolean = true, + tryWithoutResettingFirst: Boolean = false) { // testCaseName must not contain ':', which is not allowed to appear in a filename of Windows assert(!testCaseName.contains(":")) @@ -240,9 +244,6 @@ abstract class HiveComparisonTest test(testCaseName) { logDebug(s"=== HIVE TEST: $testCaseName ===") - // Clear old output for this testcase. - outputDirectories.map(new File(_, testCaseName)).filter(_.exists()).foreach(_.delete()) - val sqlWithoutComment = sql.split("\n").filterNot(l => l.matches("--.*(?<=[^]);")).mkString("\n") val allQueries = @@ -269,11 +270,32 @@ abstract class HiveComparisonTest
spark git commit: [SPARK-11328][SQL] Improve error message when hitting this issue
Repository: spark Updated Branches: refs/heads/branch-1.6 d77bf0bd9 -> f1122dd2b [SPARK-11328][SQL] Improve error message when hitting this issue The issue is that the output commiter is not idempotent and retry attempts will fail because the output file already exists. It is not safe to clean up the file as this output committer is by design not retryable. Currently, the job fails with a confusing file exists error. This patch is a stop gap to tell the user to look at the top of the error log for the proper message. This is difficult to test locally as Spark is hardcoded not to retry. Manually verified by upping the retry attempts. Author: Nong Li Author: Nong Li Closes #10080 from nongli/spark-11328. (cherry picked from commit 47a0abc343550c855e679de12983f43e6fcc0171) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1122dd2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1122dd2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1122dd2 Branch: refs/heads/branch-1.6 Commit: f1122dd2bdc4c522a902b37bd34b46f785c21ecf Parents: d77bf0b Author: Nong Li Authored: Tue Dec 1 15:30:21 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 15:30:30 2015 -0800 -- .../execution/datasources/WriterContainer.scala | 22 ++-- .../parquet/DirectParquetOutputCommitter.scala | 3 ++- 2 files changed, 22 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f1122dd2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index 1b59b19..ad55367 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -124,6 +124,24 @@ private[sql] abstract class BaseWriterContainer( } } + protected def newOutputWriter(path: String): OutputWriter = { +try { + outputWriterFactory.newInstance(path, dataSchema, taskAttemptContext) +} catch { + case e: org.apache.hadoop.fs.FileAlreadyExistsException => +if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) { + // Spark-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry + // attempts, the task will fail because the output file is created from a prior attempt. + // This often means the most visible error to the user is misleading. Augment the error + // to tell the user to look for the actual error. + throw new SparkException("The output file already exists but this could be due to a " + +"failure from an earlier attempt. 
Look through the earlier logs or stage page for " + +"the first error.\n File exists error: " + e) +} +throw e +} + } + private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = { val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context) @@ -234,7 +252,7 @@ private[sql] class DefaultWriterContainer( executorSideSetup(taskContext) val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext) configuration.set("spark.sql.sources.output.path", outputPath) -val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext) +val writer = newOutputWriter(getWorkPath) writer.initConverter(dataSchema) var writerClosed = false @@ -403,7 +421,7 @@ private[sql] class DynamicPartitionWriterContainer( val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext) configuration.set( "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString) - val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext) + val newWriter = super.newOutputWriter(path.toString) newWriter.initConverter(dataSchema) newWriter } http://git-wip-us.apache.org/repos/asf/spark/blob/f1122dd2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala index 300e867..1a4e99f 1006
spark git commit: [SPARK-12002][STREAMING][PYSPARK] Fix python direct stream checkpoint recovery issue
Repository: spark Updated Branches: refs/heads/branch-1.6 5647774b0 -> 012de2ce5 [SPARK-12002][STREAMING][PYSPARK] Fix python direct stream checkpoint recovery issue Fixed a minor race condition in #10017 Closes #10017 Author: jerryshao Author: Shixiong Zhu Closes #10074 from zsxwing/review-pr10017. (cherry picked from commit f292018f8e57779debc04998456ec875f628133b) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/012de2ce Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/012de2ce Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/012de2ce Branch: refs/heads/branch-1.6 Commit: 012de2ce5de01bc57197fa26334fc175c8f20233 Parents: 5647774 Author: jerryshao Authored: Tue Dec 1 15:26:10 2015 -0800 Committer: Shixiong Zhu Committed: Tue Dec 1 15:26:20 2015 -0800 -- python/pyspark/streaming/tests.py | 49 ++ python/pyspark/streaming/util.py | 13 - 2 files changed, 56 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/012de2ce/python/pyspark/streaming/tests.py -- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index d380d69..a2bfd79 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -1150,6 +1150,55 @@ class KafkaStreamTests(PySparkStreamingTestCase): self.assertNotEqual(topic_and_partition_a, topic_and_partition_d) @unittest.skipIf(sys.version >= "3", "long type not support") +def test_kafka_direct_stream_transform_with_checkpoint(self): +"""Test the Python direct Kafka stream transform with checkpoint correctly recovered.""" +topic = self._randomTopic() +sendData = {"a": 1, "b": 2, "c": 3} +kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(), + "auto.offset.reset": "smallest"} + +self._kafkaTestUtils.createTopic(topic) +self._kafkaTestUtils.sendMessages(topic, sendData) + +offsetRanges = [] + +def transformWithOffsetRanges(rdd): +for o in rdd.offsetRanges(): +offsetRanges.append(o) +return rdd + +self.ssc.stop(False) +self.ssc = None +tmpdir = "checkpoint-test-%d" % random.randint(0, 1) + +def setup(): +ssc = StreamingContext(self.sc, 0.5) +ssc.checkpoint(tmpdir) +stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams) +stream.transform(transformWithOffsetRanges).count().pprint() +return ssc + +try: +ssc1 = StreamingContext.getOrCreate(tmpdir, setup) +ssc1.start() +self.wait_for(offsetRanges, 1) +self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))]) + +# To make sure some checkpoint is written +time.sleep(3) +ssc1.stop(False) +ssc1 = None + +# Restart again to make sure the checkpoint is recovered correctly +ssc2 = StreamingContext.getOrCreate(tmpdir, setup) +ssc2.start() +ssc2.awaitTermination(3) +ssc2.stop(stopSparkContext=False, stopGraceFully=True) +ssc2 = None +finally: +shutil.rmtree(tmpdir) + +@unittest.skipIf(sys.version >= "3", "long type not support") def test_kafka_rdd_message_handler(self): """Test Python direct Kafka RDD MessageHandler.""" topic = self._randomTopic() http://git-wip-us.apache.org/repos/asf/spark/blob/012de2ce/python/pyspark/streaming/util.py -- diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py index c7f02bc..abbbf6e 100644 --- a/python/pyspark/streaming/util.py +++ b/python/pyspark/streaming/util.py @@ -37,11 +37,11 @@ class TransformFunction(object): self.ctx = ctx self.func = func self.deserializers = deserializers -self._rdd_wrapper = lambda jrdd, ctx, ser: RDD(jrdd, ctx, 
ser) +self.rdd_wrap_func = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser) self.failure = None def rdd_wrapper(self, func): -self._rdd_wrapper = func +self.rdd_wrap_func = func return self def call(self, milliseconds, jrdds): @@ -59,7 +59,7 @@ class TransformFunction(object): if len(sers) < len(jrdds): sers += (sers[0],) * (len(jrdds) - len(sers)) -rdds = [self._rdd_wrapper(jrdd, self.ctx, ser) if jrdd else None +rdds = [self.rdd_wrap_func(jrdd, self.ctx, ser) if jrdd else None
spark git commit: [SPARK-12002][STREAMING][PYSPARK] Fix python direct stream checkpoint recovery issue
Repository: spark Updated Branches: refs/heads/master e76431f88 -> f292018f8 [SPARK-12002][STREAMING][PYSPARK] Fix python direct stream checkpoint recovery issue Fixed a minor race condition in #10017 Closes #10017 Author: jerryshao Author: Shixiong Zhu Closes #10074 from zsxwing/review-pr10017. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f292018f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f292018f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f292018f Branch: refs/heads/master Commit: f292018f8e57779debc04998456ec875f628133b Parents: e76431f Author: jerryshao Authored: Tue Dec 1 15:26:10 2015 -0800 Committer: Shixiong Zhu Committed: Tue Dec 1 15:26:10 2015 -0800 -- python/pyspark/streaming/tests.py | 49 ++ python/pyspark/streaming/util.py | 13 - 2 files changed, 56 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f292018f/python/pyspark/streaming/tests.py -- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index a647e6b..d50c6b8 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -1150,6 +1150,55 @@ class KafkaStreamTests(PySparkStreamingTestCase): self.assertNotEqual(topic_and_partition_a, topic_and_partition_d) @unittest.skipIf(sys.version >= "3", "long type not support") +def test_kafka_direct_stream_transform_with_checkpoint(self): +"""Test the Python direct Kafka stream transform with checkpoint correctly recovered.""" +topic = self._randomTopic() +sendData = {"a": 1, "b": 2, "c": 3} +kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(), + "auto.offset.reset": "smallest"} + +self._kafkaTestUtils.createTopic(topic) +self._kafkaTestUtils.sendMessages(topic, sendData) + +offsetRanges = [] + +def transformWithOffsetRanges(rdd): +for o in rdd.offsetRanges(): +offsetRanges.append(o) +return rdd + +self.ssc.stop(False) +self.ssc = None +tmpdir = "checkpoint-test-%d" % random.randint(0, 1) + +def setup(): +ssc = StreamingContext(self.sc, 0.5) +ssc.checkpoint(tmpdir) +stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams) +stream.transform(transformWithOffsetRanges).count().pprint() +return ssc + +try: +ssc1 = StreamingContext.getOrCreate(tmpdir, setup) +ssc1.start() +self.wait_for(offsetRanges, 1) +self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))]) + +# To make sure some checkpoint is written +time.sleep(3) +ssc1.stop(False) +ssc1 = None + +# Restart again to make sure the checkpoint is recovered correctly +ssc2 = StreamingContext.getOrCreate(tmpdir, setup) +ssc2.start() +ssc2.awaitTermination(3) +ssc2.stop(stopSparkContext=False, stopGraceFully=True) +ssc2 = None +finally: +shutil.rmtree(tmpdir) + +@unittest.skipIf(sys.version >= "3", "long type not support") def test_kafka_rdd_message_handler(self): """Test Python direct Kafka RDD MessageHandler.""" topic = self._randomTopic() http://git-wip-us.apache.org/repos/asf/spark/blob/f292018f/python/pyspark/streaming/util.py -- diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py index c7f02bc..abbbf6e 100644 --- a/python/pyspark/streaming/util.py +++ b/python/pyspark/streaming/util.py @@ -37,11 +37,11 @@ class TransformFunction(object): self.ctx = ctx self.func = func self.deserializers = deserializers -self._rdd_wrapper = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser) +self.rdd_wrap_func = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser) self.failure = None def 
rdd_wrapper(self, func): -self._rdd_wrapper = func +self.rdd_wrap_func = func return self def call(self, milliseconds, jrdds): @@ -59,7 +59,7 @@ class TransformFunction(object): if len(sers) < len(jrdds): sers += (sers[0],) * (len(jrdds) - len(sers)) -rdds = [self._rdd_wrapper(jrdd, self.ctx, ser) if jrdd else None +rdds = [self.rdd_wrap_func(jrdd, self.ctx, ser) if jrdd else None for jrdd, ser in zip(jrdds, sers)] t = datetime.fromtimestamp(milliseconds / 1000.0)
spark git commit: [SPARK-11961][DOC] Add docs of ChiSqSelector
Repository: spark Updated Branches: refs/heads/master 328b757d5 -> e76431f88 [SPARK-11961][DOC] Add docs of ChiSqSelector https://issues.apache.org/jira/browse/SPARK-11961 Author: Xusen Yin Closes #9965 from yinxusen/SPARK-11961. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e76431f8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e76431f8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e76431f8 Branch: refs/heads/master Commit: e76431f886ae8061545b3216e8e2fb38c4db1f43 Parents: 328b757 Author: Xusen Yin Authored: Tue Dec 1 15:21:53 2015 -0800 Committer: Joseph K. Bradley Committed: Tue Dec 1 15:21:53 2015 -0800 -- docs/ml-features.md | 50 ++ .../examples/ml/JavaChiSqSelectorExample.java | 71 .../examples/ml/ChiSqSelectorExample.scala | 57 3 files changed, 178 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e76431f8/docs/ml-features.md -- diff --git a/docs/ml-features.md b/docs/ml-features.md index cd1838d..5f88877 100644 --- a/docs/ml-features.md +++ b/docs/ml-features.md @@ -1949,3 +1949,53 @@ output.select("features", "label").show() {% endhighlight %} + +## ChiSqSelector + +`ChiSqSelector` stands for Chi-Squared feature selection. It operates on labeled data with +categorical features. ChiSqSelector orders features based on a +[Chi-Squared test of independence](https://en.wikipedia.org/wiki/Chi-squared_test) +from the class, and then filters (selects) the top features which the class label depends on the +most. This is akin to yielding the features with the most predictive power. + +**Examples** + +Assume that we have a DataFrame with the columns `id`, `features`, and `clicked`, which is used as +our target to be predicted: + +~~~ +id | features | clicked +---|---|- + 7 | [0.0, 0.0, 18.0, 1.0] | 1.0 + 8 | [0.0, 1.0, 12.0, 0.0] | 0.0 + 9 | [1.0, 0.0, 15.0, 0.1] | 0.0 +~~~ + +If we use `ChiSqSelector` with a `numTopFeatures = 1`, then according to our label `clicked` the +last column in our `features` chosen as the most useful feature: + +~~~ +id | features | clicked | selectedFeatures +---|---|-|-- + 7 | [0.0, 0.0, 18.0, 1.0] | 1.0 | [1.0] + 8 | [0.0, 1.0, 12.0, 0.0] | 0.0 | [0.0] + 9 | [1.0, 0.0, 15.0, 0.1] | 0.0 | [0.1] +~~~ + + + + +Refer to the [ChiSqSelector Scala docs](api/scala/index.html#org.apache.spark.ml.feature.ChiSqSelector) +for more details on the API. + +{% include_example scala/org/apache/spark/examples/ml/ChiSqSelectorExample.scala %} + + + + +Refer to the [ChiSqSelector Java docs](api/java/org/apache/spark/ml/feature/ChiSqSelector.html) +for more details on the API. + +{% include_example java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java %} + + http://git-wip-us.apache.org/repos/asf/spark/blob/e76431f8/examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java new file mode 100644 index 000..ede05d6 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.ml; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; + +// $example on$ +import java.util.Arrays; + +import org.apache.spark.ml.feature.ChiSqSelector; +import org.apache.spark.mllib.linalg.VectorUDT; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.sql.DataFrame; +import
spark git commit: [SPARK-11961][DOC] Add docs of ChiSqSelector
Repository: spark Updated Branches: refs/heads/branch-1.6 21909b8ac -> 5647774b0 [SPARK-11961][DOC] Add docs of ChiSqSelector https://issues.apache.org/jira/browse/SPARK-11961 Author: Xusen Yin Closes #9965 from yinxusen/SPARK-11961. (cherry picked from commit e76431f886ae8061545b3216e8e2fb38c4db1f43) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5647774b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5647774b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5647774b Branch: refs/heads/branch-1.6 Commit: 5647774b07593514f4ed4c29a038cfb1b69c9ba1 Parents: 21909b8 Author: Xusen Yin Authored: Tue Dec 1 15:21:53 2015 -0800 Committer: Joseph K. Bradley Committed: Tue Dec 1 15:22:04 2015 -0800 -- docs/ml-features.md | 50 ++ .../examples/ml/JavaChiSqSelectorExample.java | 71 .../examples/ml/ChiSqSelectorExample.scala | 57 3 files changed, 178 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5647774b/docs/ml-features.md -- diff --git a/docs/ml-features.md b/docs/ml-features.md index cd1838d..5f88877 100644 --- a/docs/ml-features.md +++ b/docs/ml-features.md @@ -1949,3 +1949,53 @@ output.select("features", "label").show() {% endhighlight %} + +## ChiSqSelector + +`ChiSqSelector` stands for Chi-Squared feature selection. It operates on labeled data with +categorical features. ChiSqSelector orders features based on a +[Chi-Squared test of independence](https://en.wikipedia.org/wiki/Chi-squared_test) +from the class, and then filters (selects) the top features which the class label depends on the +most. This is akin to yielding the features with the most predictive power. + +**Examples** + +Assume that we have a DataFrame with the columns `id`, `features`, and `clicked`, which is used as +our target to be predicted: + +~~~ +id | features | clicked +---|---|- + 7 | [0.0, 0.0, 18.0, 1.0] | 1.0 + 8 | [0.0, 1.0, 12.0, 0.0] | 0.0 + 9 | [1.0, 0.0, 15.0, 0.1] | 0.0 +~~~ + +If we use `ChiSqSelector` with a `numTopFeatures = 1`, then according to our label `clicked` the +last column in our `features` chosen as the most useful feature: + +~~~ +id | features | clicked | selectedFeatures +---|---|-|-- + 7 | [0.0, 0.0, 18.0, 1.0] | 1.0 | [1.0] + 8 | [0.0, 1.0, 12.0, 0.0] | 0.0 | [0.0] + 9 | [1.0, 0.0, 15.0, 0.1] | 0.0 | [0.1] +~~~ + + + + +Refer to the [ChiSqSelector Scala docs](api/scala/index.html#org.apache.spark.ml.feature.ChiSqSelector) +for more details on the API. + +{% include_example scala/org/apache/spark/examples/ml/ChiSqSelectorExample.scala %} + + + + +Refer to the [ChiSqSelector Java docs](api/java/org/apache/spark/ml/feature/ChiSqSelector.html) +for more details on the API. + +{% include_example java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java %} + + http://git-wip-us.apache.org/repos/asf/spark/blob/5647774b/examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java new file mode 100644 index 000..ede05d6 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.ml; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; + +// $example on$ +import java.util.Arrays; + +import org.apache.spark.ml.feature.ChiSqSelector; +import org.apache.spark.mllib.l
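For reference, the usage the new ChiSqSelector docs section describes comes down to the following minimal Scala sketch, along the lines of the ChiSqSelectorExample.scala added by this commit. The `sqlContext` value is assumed to exist (for example from a spark-shell session), and the sample rows are taken from the table in the docs above.

~~~
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.mllib.linalg.Vectors

// Sample data matching the table in the new docs section.
val df = sqlContext.createDataFrame(Seq(
  (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)).toDF("id", "features", "clicked")

// Keep only the single feature the label depends on most.
val selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures")

selector.fit(df).transform(df).show()
~~~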
spark git commit: Revert "[SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize"
Repository: spark Updated Branches: refs/heads/branch-1.6 81db8d086 -> 21909b8ac Revert "[SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize" This reverts commit 9b99b2b46c452ba396e922db5fc7eec02c45b158. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21909b8a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21909b8a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21909b8a Branch: refs/heads/branch-1.6 Commit: 21909b8ac0068658cc833f324c0f1f418c200d61 Parents: 81db8d0 Author: Shixiong Zhu Authored: Tue Dec 1 15:16:07 2015 -0800 Committer: Shixiong Zhu Committed: Tue Dec 1 15:16:07 2015 -0800 -- .../spark/serializer/JavaSerializer.scala | 7 +++-- .../spark/util/ByteBufferOutputStream.scala | 31 2 files changed, 4 insertions(+), 34 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/21909b8a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala -- diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index ea718a0..b463a71 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -24,7 +24,8 @@ import scala.reflect.ClassTag import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils} +import org.apache.spark.util.ByteBufferInputStream +import org.apache.spark.util.Utils private[spark] class JavaSerializationStream( out: OutputStream, counterReset: Int, extraDebugInfo: Boolean) @@ -95,11 +96,11 @@ private[spark] class JavaSerializerInstance( extends SerializerInstance { override def serialize[T: ClassTag](t: T): ByteBuffer = { -val bos = new ByteBufferOutputStream() +val bos = new ByteArrayOutputStream() val out = serializeStream(bos) out.writeObject(t) out.close() -bos.toByteBuffer +ByteBuffer.wrap(bos.toByteArray) } override def deserialize[T: ClassTag](bytes: ByteBuffer): T = { http://git-wip-us.apache.org/repos/asf/spark/blob/21909b8a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala deleted file mode 100644 index 92e4522..000 --- a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.util - -import java.io.ByteArrayOutputStream -import java.nio.ByteBuffer - -/** - * Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer - */ -private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream { - - def toByteBuffer: ByteBuffer = { -return ByteBuffer.wrap(buf, 0, count) - } -} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Revert "[SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize"
Repository: spark Updated Branches: refs/heads/master 60b541ee1 -> 328b757d5 Revert "[SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize" This reverts commit 1401166576c7018c5f9c31e0a6703d5fb16ea339. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/328b757d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/328b757d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/328b757d Branch: refs/heads/master Commit: 328b757d5d4486ea3c2e246780792d7a57ee85e5 Parents: 60b541e Author: Shixiong Zhu Authored: Tue Dec 1 15:13:10 2015 -0800 Committer: Shixiong Zhu Committed: Tue Dec 1 15:13:10 2015 -0800 -- .../spark/serializer/JavaSerializer.scala | 7 +++-- .../spark/util/ByteBufferOutputStream.scala | 31 2 files changed, 4 insertions(+), 34 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/328b757d/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala -- diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index ea718a0..b463a71 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -24,7 +24,8 @@ import scala.reflect.ClassTag import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils} +import org.apache.spark.util.ByteBufferInputStream +import org.apache.spark.util.Utils private[spark] class JavaSerializationStream( out: OutputStream, counterReset: Int, extraDebugInfo: Boolean) @@ -95,11 +96,11 @@ private[spark] class JavaSerializerInstance( extends SerializerInstance { override def serialize[T: ClassTag](t: T): ByteBuffer = { -val bos = new ByteBufferOutputStream() +val bos = new ByteArrayOutputStream() val out = serializeStream(bos) out.writeObject(t) out.close() -bos.toByteBuffer +ByteBuffer.wrap(bos.toByteArray) } override def deserialize[T: ClassTag](bytes: ByteBuffer): T = { http://git-wip-us.apache.org/repos/asf/spark/blob/328b757d/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala deleted file mode 100644 index 92e4522..000 --- a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.util - -import java.io.ByteArrayOutputStream -import java.nio.ByteBuffer - -/** - * Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer - */ -private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream { - - def toByteBuffer: ByteBuffer = { -return ByteBuffer.wrap(buf, 0, count) - } -} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
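The reverted optimisation had replaced the copy in JavaSerializerInstance.serialize (ByteArrayOutputStream.toByteArray followed by ByteBuffer.wrap) with a subclass that wraps the stream's internal buffer directly. A minimal, purely illustrative Scala sketch of the two conversion strategies the diff switches between; `WrappingOutputStream` is a hypothetical name standing in for the deleted ByteBufferOutputStream:

~~~
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

val bos = new ByteArrayOutputStream()
bos.write(Array[Byte](1, 2, 3))

// Behaviour restored by the revert: toByteArray allocates a new array and copies into it.
val copied: ByteBuffer = ByteBuffer.wrap(bos.toByteArray)

// The reverted zero-copy variant: a subclass can wrap the protected
// `buf`/`count` fields of ByteArrayOutputStream without copying, so the
// returned buffer shares the stream's underlying array.
class WrappingOutputStream extends ByteArrayOutputStream {
  def toByteBuffer: ByteBuffer = ByteBuffer.wrap(buf, 0, count)
}
~~~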
spark git commit: [SPARK-12004] Preserve the RDD partitioner through RDD checkpointing
Repository: spark Updated Branches: refs/heads/master 2cef1cdfb -> 60b541ee1 [SPARK-12004] Preserve the RDD partitioner through RDD checkpointing The solution is the save the RDD partitioner in a separate file in the RDD checkpoint directory. That is, `/_partitioner`. In most cases, whether the RDD partitioner was recovered or not, does not affect the correctness, only reduces performance. So this solution makes a best-effort attempt to save and recover the partitioner. If either fails, the checkpointing is not affected. This makes this patch safe and backward compatible. Author: Tathagata Das Closes #9983 from tdas/SPARK-12004. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60b541ee Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60b541ee Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60b541ee Branch: refs/heads/master Commit: 60b541ee1b97c9e5e84aa2af2ce856f316ad22b3 Parents: 2cef1cd Author: Tathagata Das Authored: Tue Dec 1 14:08:36 2015 -0800 Committer: Andrew Or Committed: Tue Dec 1 14:08:36 2015 -0800 -- .../spark/rdd/ReliableCheckpointRDD.scala | 122 ++- .../spark/rdd/ReliableRDDCheckpointData.scala | 21 +--- .../org/apache/spark/CheckpointSuite.scala | 61 +- 3 files changed, 173 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/60b541ee/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index a69be6a..fa71b8c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -20,12 +20,12 @@ package org.apache.spark.rdd import java.io.IOException import scala.reflect.ClassTag +import scala.util.control.NonFatal import org.apache.hadoop.fs.Path import org.apache.spark._ import org.apache.spark.broadcast.Broadcast -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.{SerializableConfiguration, Utils} /** @@ -33,8 +33,9 @@ import org.apache.spark.util.{SerializableConfiguration, Utils} */ private[spark] class ReliableCheckpointRDD[T: ClassTag]( sc: SparkContext, -val checkpointPath: String) - extends CheckpointRDD[T](sc) { +val checkpointPath: String, +_partitioner: Option[Partitioner] = None + ) extends CheckpointRDD[T](sc) { @transient private val hadoopConf = sc.hadoopConfiguration @transient private val cpath = new Path(checkpointPath) @@ -47,7 +48,13 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag]( /** * Return the path of the checkpoint directory this RDD reads data from. */ - override def getCheckpointFile: Option[String] = Some(checkpointPath) + override val getCheckpointFile: Option[String] = Some(checkpointPath) + + override val partitioner: Option[Partitioner] = { +_partitioner.orElse { + ReliableCheckpointRDD.readCheckpointedPartitionerFile(context, checkpointPath) +} + } /** * Return partitions described by the files in the checkpoint directory. @@ -100,10 +107,52 @@ private[spark] object ReliableCheckpointRDD extends Logging { "part-%05d".format(partitionIndex) } + private def checkpointPartitionerFileName(): String = { +"_partitioner" + } + + /** + * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD. 
+ */ + def writeRDDToCheckpointDirectory[T: ClassTag]( + originalRDD: RDD[T], + checkpointDir: String, + blockSize: Int = -1): ReliableCheckpointRDD[T] = { + +val sc = originalRDD.sparkContext + +// Create the output path for the checkpoint +val checkpointDirPath = new Path(checkpointDir) +val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration) +if (!fs.mkdirs(checkpointDirPath)) { + throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath") +} + +// Save to file, and reload it as an RDD +val broadcastedConf = sc.broadcast( + new SerializableConfiguration(sc.hadoopConfiguration)) +// TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582) +sc.runJob(originalRDD, + writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _) + +if (originalRDD.partitioner.nonEmpty) { + writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath) +} + +val newRDD = new ReliableCheckpointRDD[T]( + sc, checkpointDirPath.toString, originalRDD.partitioner) +if (newRDD.partitions.length != o
spark git commit: [SPARK-12004] Preserve the RDD partitioner through RDD checkpointing
Repository: spark Updated Branches: refs/heads/branch-1.6 1cf9d3858 -> 81db8d086 [SPARK-12004] Preserve the RDD partitioner through RDD checkpointing The solution is the save the RDD partitioner in a separate file in the RDD checkpoint directory. That is, `/_partitioner`. In most cases, whether the RDD partitioner was recovered or not, does not affect the correctness, only reduces performance. So this solution makes a best-effort attempt to save and recover the partitioner. If either fails, the checkpointing is not affected. This makes this patch safe and backward compatible. Author: Tathagata Das Closes #9983 from tdas/SPARK-12004. (cherry picked from commit 60b541ee1b97c9e5e84aa2af2ce856f316ad22b3) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81db8d08 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81db8d08 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81db8d08 Branch: refs/heads/branch-1.6 Commit: 81db8d086bbfe72caa0c45a395ebcaed80b5c237 Parents: 1cf9d38 Author: Tathagata Das Authored: Tue Dec 1 14:08:36 2015 -0800 Committer: Andrew Or Committed: Tue Dec 1 14:08:45 2015 -0800 -- .../spark/rdd/ReliableCheckpointRDD.scala | 122 ++- .../spark/rdd/ReliableRDDCheckpointData.scala | 21 +--- .../org/apache/spark/CheckpointSuite.scala | 61 +- 3 files changed, 173 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/81db8d08/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index a69be6a..fa71b8c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -20,12 +20,12 @@ package org.apache.spark.rdd import java.io.IOException import scala.reflect.ClassTag +import scala.util.control.NonFatal import org.apache.hadoop.fs.Path import org.apache.spark._ import org.apache.spark.broadcast.Broadcast -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.{SerializableConfiguration, Utils} /** @@ -33,8 +33,9 @@ import org.apache.spark.util.{SerializableConfiguration, Utils} */ private[spark] class ReliableCheckpointRDD[T: ClassTag]( sc: SparkContext, -val checkpointPath: String) - extends CheckpointRDD[T](sc) { +val checkpointPath: String, +_partitioner: Option[Partitioner] = None + ) extends CheckpointRDD[T](sc) { @transient private val hadoopConf = sc.hadoopConfiguration @transient private val cpath = new Path(checkpointPath) @@ -47,7 +48,13 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag]( /** * Return the path of the checkpoint directory this RDD reads data from. */ - override def getCheckpointFile: Option[String] = Some(checkpointPath) + override val getCheckpointFile: Option[String] = Some(checkpointPath) + + override val partitioner: Option[Partitioner] = { +_partitioner.orElse { + ReliableCheckpointRDD.readCheckpointedPartitionerFile(context, checkpointPath) +} + } /** * Return partitions described by the files in the checkpoint directory. @@ -100,10 +107,52 @@ private[spark] object ReliableCheckpointRDD extends Logging { "part-%05d".format(partitionIndex) } + private def checkpointPartitionerFileName(): String = { +"_partitioner" + } + + /** + * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD. 
+ */ + def writeRDDToCheckpointDirectory[T: ClassTag]( + originalRDD: RDD[T], + checkpointDir: String, + blockSize: Int = -1): ReliableCheckpointRDD[T] = { + +val sc = originalRDD.sparkContext + +// Create the output path for the checkpoint +val checkpointDirPath = new Path(checkpointDir) +val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration) +if (!fs.mkdirs(checkpointDirPath)) { + throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath") +} + +// Save to file, and reload it as an RDD +val broadcastedConf = sc.broadcast( + new SerializableConfiguration(sc.hadoopConfiguration)) +// TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582) +sc.runJob(originalRDD, + writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _) + +if (originalRDD.partitioner.nonEmpty) { + writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath) +} + +val newRDD = new ReliableCheckpointRDD[T]
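In user code the change is transparent. A minimal Scala sketch of the scenario it improves, assuming an existing SparkContext `sc`; the checkpoint path is illustrative:

~~~
import org.apache.spark.HashPartitioner

sc.setCheckpointDir("/tmp/spark-checkpoints")  // illustrative path

val pairs = sc.parallelize(1 to 100)
  .map(i => (i % 10, i))
  .partitionBy(new HashPartitioner(4))

pairs.checkpoint()
pairs.count()  // runs a job and writes the checkpoint files

// With this patch the HashPartitioner is also written, best effort, to a
// _partitioner file in the checkpoint directory, so an RDD recovered from
// those files keeps its partitioning and a later co-partitioned join or
// reduceByKey over the same keys can avoid an extra shuffle.
~~~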
spark git commit: [SPARK-12030] Fix Platform.copyMemory to handle overlapping regions.
Repository: spark Updated Branches: refs/heads/master 34e7093c1 -> 2cef1cdfb [SPARK-12030] Fix Platform.copyMemory to handle overlapping regions. This bug was exposed as memory corruption in Timsort which uses copyMemory to copy large regions that can overlap. The prior implementation did not handle this case half the time and always copied forward, resulting in the data being corrupt. Author: Nong Li Closes #10068 from nongli/spark-12030. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2cef1cdf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2cef1cdf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2cef1cdf Branch: refs/heads/master Commit: 2cef1cdfbb5393270ae83179b6a4e50c3cbf9e93 Parents: 34e7093 Author: Nong Li Authored: Tue Dec 1 12:59:53 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 12:59:53 2015 -0800 -- .../java/org/apache/spark/unsafe/Platform.java | 27 +++-- .../apache/spark/unsafe/PlatformUtilSuite.java | 61 2 files changed, 82 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2cef1cdf/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java -- diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java index 1c16da9..0d6b215 100644 --- a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java +++ b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java @@ -107,12 +107,27 @@ public final class Platform { public static void copyMemory( Object src, long srcOffset, Object dst, long dstOffset, long length) { -while (length > 0) { - long size = Math.min(length, UNSAFE_COPY_THRESHOLD); - _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size); - length -= size; - srcOffset += size; - dstOffset += size; +// Check if dstOffset is before or after srcOffset to determine if we should copy +// forward or backwards. This is necessary in case src and dst overlap. +if (dstOffset < srcOffset) { + while (length > 0) { +long size = Math.min(length, UNSAFE_COPY_THRESHOLD); +_UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size); +length -= size; +srcOffset += size; +dstOffset += size; + } +} else { + srcOffset += length; + dstOffset += length; + while (length > 0) { +long size = Math.min(length, UNSAFE_COPY_THRESHOLD); +srcOffset -= size; +dstOffset -= size; +_UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size); +length -= size; + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/2cef1cdf/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java -- diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java new file mode 100644 index 000..693ec6e --- /dev/null +++ b/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe; + +import org.junit.Assert; +import org.junit.Test; + +public class PlatformUtilSuite { + + @Test + public void overlappingCopyMemory() { +byte[] data = new byte[3 * 1024 * 1024]; +int size = 2 * 1024 * 1024; +for (int i = 0; i < data.length; ++i) { + data[i] = (byte)i; +} + +Platform.copyMemory(data, Platform.BYTE_ARRAY_OFFSET, data, Platform.BYTE_ARRAY_OFFSET, size); +for (int i = 0; i < data.length; ++i) { + Assert.assertEquals((byte)i, data[i]); +} + +Platform.copyMemory( +data, +Platform.BYTE_ARRAY_OFFSET + 1, +data, +Platform.BYTE_ARRAY_OFFSET, +size); +for (int i = 0; i < size; ++i) { + Assert.assertEquals((byte)(i + 1), da
spark git commit: [SPARK-12030] Fix Platform.copyMemory to handle overlapping regions.
Repository: spark Updated Branches: refs/heads/branch-1.6 ab2a124c8 -> 1cf9d3858 [SPARK-12030] Fix Platform.copyMemory to handle overlapping regions. This bug was exposed as memory corruption in Timsort which uses copyMemory to copy large regions that can overlap. The prior implementation did not handle this case half the time and always copied forward, resulting in the data being corrupt. Author: Nong Li Closes #10068 from nongli/spark-12030. (cherry picked from commit 2cef1cdfbb5393270ae83179b6a4e50c3cbf9e93) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1cf9d385 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1cf9d385 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1cf9d385 Branch: refs/heads/branch-1.6 Commit: 1cf9d3858c8a3a5796b64a9fbea22509f02d778a Parents: ab2a124 Author: Nong Li Authored: Tue Dec 1 12:59:53 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 13:00:05 2015 -0800 -- .../java/org/apache/spark/unsafe/Platform.java | 27 +++-- .../apache/spark/unsafe/PlatformUtilSuite.java | 61 2 files changed, 82 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1cf9d385/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java -- diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java index 1c16da9..0d6b215 100644 --- a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java +++ b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java @@ -107,12 +107,27 @@ public final class Platform { public static void copyMemory( Object src, long srcOffset, Object dst, long dstOffset, long length) { -while (length > 0) { - long size = Math.min(length, UNSAFE_COPY_THRESHOLD); - _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size); - length -= size; - srcOffset += size; - dstOffset += size; +// Check if dstOffset is before or after srcOffset to determine if we should copy +// forward or backwards. This is necessary in case src and dst overlap. +if (dstOffset < srcOffset) { + while (length > 0) { +long size = Math.min(length, UNSAFE_COPY_THRESHOLD); +_UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size); +length -= size; +srcOffset += size; +dstOffset += size; + } +} else { + srcOffset += length; + dstOffset += length; + while (length > 0) { +long size = Math.min(length, UNSAFE_COPY_THRESHOLD); +srcOffset -= size; +dstOffset -= size; +_UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size); +length -= size; + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/1cf9d385/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java -- diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java new file mode 100644 index 000..693ec6e --- /dev/null +++ b/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe; + +import org.junit.Assert; +import org.junit.Test; + +public class PlatformUtilSuite { + + @Test + public void overlappingCopyMemory() { +byte[] data = new byte[3 * 1024 * 1024]; +int size = 2 * 1024 * 1024; +for (int i = 0; i < data.length; ++i) { + data[i] = (byte)i; +} + +Platform.copyMemory(data, Platform.BYTE_ARRAY_OFFSET, data, Platform.BYTE_ARRAY_OFFSET, size); +for (int i = 0; i < data.length; ++i) { + Assert.assertEquals((byte)i, data[i]); +} + +Platform.copyMemory( +data, +Platform.BYTE_ARRAY_OFFSET + 1, +data, +Platform.BYTE_ARRAY_OFF
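The core of the fix is choosing the copy direction from the relative offsets. The same idea as a self-contained Scala sketch over a byte array; `copyOverlapSafe` and `CHUNK` are hypothetical names, with System.arraycopy standing in for the per-chunk Unsafe copy and CHUNK for UNSAFE_COPY_THRESHOLD:

~~~
val CHUNK = 1024L * 1024L  // stands in for UNSAFE_COPY_THRESHOLD

def copyOverlapSafe(src: Array[Byte], srcOff: Long,
                    dst: Array[Byte], dstOff: Long, len: Long): Unit = {
  var srcOffset = srcOff
  var dstOffset = dstOff
  var length = len
  if (dstOffset < srcOffset) {
    // Destination starts before the source: copy chunks forward.
    while (length > 0) {
      val size = math.min(length, CHUNK)
      System.arraycopy(src, srcOffset.toInt, dst, dstOffset.toInt, size.toInt)
      length -= size; srcOffset += size; dstOffset += size
    }
  } else {
    // Destination starts at or after the source: copy chunks backward so an
    // earlier chunk is never overwritten before it has been read.
    srcOffset += length; dstOffset += length
    while (length > 0) {
      val size = math.min(length, CHUNK)
      srcOffset -= size; dstOffset -= size
      System.arraycopy(src, srcOffset.toInt, dst, dstOffset.toInt, size.toInt)
      length -= size
    }
  }
}

val data = Array.tabulate[Byte](10)(_.toByte)
copyOverlapSafe(data, 1, data, 0, 9)  // overlapping shift-left, forward path
~~~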
spark git commit: [SPARK-12065] Upgrade Tachyon from 0.8.1 to 0.8.2
Repository: spark Updated Branches: refs/heads/branch-1.6 99dc1335e -> ab2a124c8 [SPARK-12065] Upgrade Tachyon from 0.8.1 to 0.8.2 This commit upgrades the Tachyon dependency from 0.8.1 to 0.8.2. Author: Josh Rosen Closes #10054 from JoshRosen/upgrade-to-tachyon-0.8.2. (cherry picked from commit 34e7093c1131162b3aa05b65a19a633a0b5b633e) Signed-off-by: Josh Rosen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab2a124c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab2a124c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab2a124c Branch: refs/heads/branch-1.6 Commit: ab2a124c8eca6823ee016c9ecfbdbf4918fbcdd6 Parents: 99dc133 Author: Josh Rosen Authored: Tue Dec 1 11:49:20 2015 -0800 Committer: Josh Rosen Committed: Tue Dec 1 11:50:00 2015 -0800 -- core/pom.xml | 2 +- make-distribution.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ab2a124c/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 37e3f16..61744bb 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -270,7 +270,7 @@ org.tachyonproject tachyon-client - 0.8.1 + 0.8.2 org.apache.hadoop http://git-wip-us.apache.org/repos/asf/spark/blob/ab2a124c/make-distribution.sh -- diff --git a/make-distribution.sh b/make-distribution.sh index d7d27e2..c949e94 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -33,7 +33,7 @@ SPARK_HOME="$(cd "`dirname "$0"`"; pwd)" DISTDIR="$SPARK_HOME/dist" SPARK_TACHYON=false -TACHYON_VERSION="0.8.1" +TACHYON_VERSION="0.8.2" TACHYON_TGZ="tachyon-${TACHYON_VERSION}-bin.tar.gz" TACHYON_URL="http://tachyon-project.org/downloads/files/${TACHYON_VERSION}/${TACHYON_TGZ}"; - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12065] Upgrade Tachyon from 0.8.1 to 0.8.2
Repository: spark Updated Branches: refs/heads/master 6a8cf80cc -> 34e7093c1 [SPARK-12065] Upgrade Tachyon from 0.8.1 to 0.8.2 This commit upgrades the Tachyon dependency from 0.8.1 to 0.8.2. Author: Josh Rosen Closes #10054 from JoshRosen/upgrade-to-tachyon-0.8.2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34e7093c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34e7093c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34e7093c Branch: refs/heads/master Commit: 34e7093c1131162b3aa05b65a19a633a0b5b633e Parents: 6a8cf80 Author: Josh Rosen Authored: Tue Dec 1 11:49:20 2015 -0800 Committer: Josh Rosen Committed: Tue Dec 1 11:49:20 2015 -0800 -- core/pom.xml | 2 +- make-distribution.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/34e7093c/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 37e3f16..61744bb 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -270,7 +270,7 @@ org.tachyonproject tachyon-client - 0.8.1 + 0.8.2 org.apache.hadoop http://git-wip-us.apache.org/repos/asf/spark/blob/34e7093c/make-distribution.sh -- diff --git a/make-distribution.sh b/make-distribution.sh index 7b417fe..e64ceb8 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -33,7 +33,7 @@ SPARK_HOME="$(cd "`dirname "$0"`"; pwd)" DISTDIR="$SPARK_HOME/dist" SPARK_TACHYON=false -TACHYON_VERSION="0.8.1" +TACHYON_VERSION="0.8.2" TACHYON_TGZ="tachyon-${TACHYON_VERSION}-bin.tar.gz" TACHYON_URL="http://tachyon-project.org/downloads/files/${TACHYON_VERSION}/${TACHYON_TGZ}"; - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11821] Propagate Kerberos keytab for all environments
Repository: spark Updated Branches: refs/heads/branch-1.6 843a31afb -> 99dc1335e [SPARK-11821] Propagate Kerberos keytab for all environments andrewor14 the same PR as in branch 1.5 harishreedharan Author: woj-i Closes #9859 from woj-i/master. (cherry picked from commit 6a8cf80cc8ef435ec46138fa57325bda5d68f3ce) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99dc1335 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99dc1335 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99dc1335 Branch: refs/heads/branch-1.6 Commit: 99dc1335e2f635a067f9fa1e83a35bf9593bfc24 Parents: 843a31a Author: woj-i Authored: Tue Dec 1 11:05:45 2015 -0800 Committer: Marcelo Vanzin Committed: Tue Dec 1 11:06:08 2015 -0800 -- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 4 docs/running-on-yarn.md | 4 ++-- docs/sql-programming-guide.md | 7 --- 3 files changed, 10 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/99dc1335/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 2e912b5..52d3ab3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -545,6 +545,10 @@ object SparkSubmit { if (args.isPython) { sysProps.put("spark.yarn.isPython", "true") } +} + +// assure a keytab is available from any place in a JVM +if (clusterManager == YARN || clusterManager == LOCAL) { if (args.principal != null) { require(args.keytab != null, "Keytab must be specified when principal is specified") if (!new File(args.keytab).exists()) { http://git-wip-us.apache.org/repos/asf/spark/blob/99dc1335/docs/running-on-yarn.md -- diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 925a1e0..06413f8 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -358,14 +358,14 @@ If you need a reference to the proper location to put log files in the YARN so t The full path to the file that contains the keytab for the principal specified above. This keytab will be copied to the node running the YARN Application Master via the Secure Distributed Cache, - for renewing the login tickets and the delegation tokens periodically. + for renewing the login tickets and the delegation tokens periodically. (Works also with the "local" master) spark.yarn.principal (none) - Principal to be used to login to KDC, while running on secure HDFS. + Principal to be used to login to KDC, while running on secure HDFS. (Works also with the "local" master) http://git-wip-us.apache.org/repos/asf/spark/blob/99dc1335/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index d7b205c..7b1d97b 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1614,7 +1614,8 @@ This command builds a new assembly jar that includes Hive. Note that this Hive a on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive. -Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. Please note when running +Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` (for security configuration), + `hdfs-site.xml` (for HDFS configuration) file in `conf/`. 
Please note when running the query on a YARN cluster (`cluster` mode), the `datanucleus` jars under the `lib_managed/jars` directory and `hive-site.xml` under `conf/` directory need to be available on the driver and all executors launched by the YARN cluster. The convenient way to do this is adding them through the `--jars` option and `--file` option of the @@ -2028,7 +2029,7 @@ Beeline will ask you for a username and password. In non-secure mode, simply ent your machine and a blank password. For secure mode, please follow the instructions given in the [beeline documentation](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients). -Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. +Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` and `hdfs-site.xml` files in `conf/`. You may also use t
spark git commit: [SPARK-11821] Propagate Kerberos keytab for all environments
Repository: spark Updated Branches: refs/heads/master 0a7bca2da -> 6a8cf80cc [SPARK-11821] Propagate Kerberos keytab for all environments andrewor14 the same PR as in branch 1.5 harishreedharan Author: woj-i Closes #9859 from woj-i/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a8cf80c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a8cf80c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a8cf80c Branch: refs/heads/master Commit: 6a8cf80cc8ef435ec46138fa57325bda5d68f3ce Parents: 0a7bca2 Author: woj-i Authored: Tue Dec 1 11:05:45 2015 -0800 Committer: Marcelo Vanzin Committed: Tue Dec 1 11:05:45 2015 -0800 -- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 4 docs/running-on-yarn.md | 4 ++-- docs/sql-programming-guide.md | 7 --- 3 files changed, 10 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6a8cf80c/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 2e912b5..52d3ab3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -545,6 +545,10 @@ object SparkSubmit { if (args.isPython) { sysProps.put("spark.yarn.isPython", "true") } +} + +// assure a keytab is available from any place in a JVM +if (clusterManager == YARN || clusterManager == LOCAL) { if (args.principal != null) { require(args.keytab != null, "Keytab must be specified when principal is specified") if (!new File(args.keytab).exists()) { http://git-wip-us.apache.org/repos/asf/spark/blob/6a8cf80c/docs/running-on-yarn.md -- diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 925a1e0..06413f8 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -358,14 +358,14 @@ If you need a reference to the proper location to put log files in the YARN so t The full path to the file that contains the keytab for the principal specified above. This keytab will be copied to the node running the YARN Application Master via the Secure Distributed Cache, - for renewing the login tickets and the delegation tokens periodically. + for renewing the login tickets and the delegation tokens periodically. (Works also with the "local" master) spark.yarn.principal (none) - Principal to be used to login to KDC, while running on secure HDFS. + Principal to be used to login to KDC, while running on secure HDFS. (Works also with the "local" master) http://git-wip-us.apache.org/repos/asf/spark/blob/6a8cf80c/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index d7b205c..7b1d97b 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1614,7 +1614,8 @@ This command builds a new assembly jar that includes Hive. Note that this Hive a on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive. -Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. Please note when running +Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` (for security configuration), + `hdfs-site.xml` (for HDFS configuration) file in `conf/`. 
Please note when running the query on a YARN cluster (`cluster` mode), the `datanucleus` jars under the `lib_managed/jars` directory and `hive-site.xml` under `conf/` directory need to be available on the driver and all executors launched by the YARN cluster. The convenient way to do this is adding them through the `--jars` option and `--file` option of the @@ -2028,7 +2029,7 @@ Beeline will ask you for a username and password. In non-secure mode, simply ent your machine and a blank password. For secure mode, please follow the instructions given in the [beeline documentation](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients). -Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. +Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` and `hdfs-site.xml` files in `conf/`. You may also use the beeline script that comes with Hive. @@ -2053,7 +2054,7 @@ To start the Spark SQL CLI, run the following
spark git commit: [SPARK-12046][DOC] Fixes various ScalaDoc/JavaDoc issues
Repository: spark Updated Branches: refs/heads/branch-1.6 40769b48c -> 843a31afb [SPARK-12046][DOC] Fixes various ScalaDoc/JavaDoc issues This PR backports PR #10039 to master Author: Cheng Lian Closes #10063 from liancheng/spark-12046.doc-fix.master. (cherry picked from commit 69dbe6b40df35d488d4ee343098ac70d00bbdafb) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/843a31af Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/843a31af Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/843a31af Branch: refs/heads/branch-1.6 Commit: 843a31afbdeea66449750f0ba8f676ef31d00726 Parents: 40769b4 Author: Cheng Lian Authored: Tue Dec 1 10:21:31 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 10:50:28 2015 -0800 -- .../spark/api/java/function/Function4.java | 2 +- .../spark/api/java/function/VoidFunction.java | 2 +- .../spark/api/java/function/VoidFunction2.java | 2 +- .../org/apache/spark/api/java/JavaPairRDD.scala | 16 +++ .../scala/org/apache/spark/memory/package.scala | 14 +++--- .../org/apache/spark/rdd/CoGroupedRDD.scala | 2 +- .../org/apache/spark/rdd/PairRDDFunctions.scala | 6 +-- .../org/apache/spark/rdd/ShuffledRDD.scala | 2 +- .../scala/org/apache/spark/scheduler/Task.scala | 5 ++- .../serializer/SerializationDebugger.scala | 13 +++--- .../scala/org/apache/spark/util/Vector.scala| 1 + .../spark/util/collection/ExternalSorter.scala | 30 ++--- .../WritablePartitionedPairCollection.scala | 7 +-- .../streaming/kinesis/KinesisReceiver.scala | 23 +- .../spark/streaming/kinesis/KinesisUtils.scala | 13 +++--- .../mllib/optimization/GradientDescent.scala| 12 +++--- project/SparkBuild.scala| 2 + .../scala/org/apache/spark/sql/Column.scala | 11 ++--- .../spark/streaming/StreamingContext.scala | 11 ++--- .../streaming/dstream/FileInputDStream.scala| 19 + .../streaming/receiver/BlockGenerator.scala | 22 +- .../scheduler/ReceiverSchedulingPolicy.scala| 45 ++-- .../streaming/util/FileBasedWriteAheadLog.scala | 7 +-- .../spark/streaming/util/RecurringTimer.scala | 8 ++-- .../org/apache/spark/deploy/yarn/Client.scala | 10 ++--- 25 files changed, 152 insertions(+), 133 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/843a31af/core/src/main/java/org/apache/spark/api/java/function/Function4.java -- diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function4.java b/core/src/main/java/org/apache/spark/api/java/function/Function4.java index fd727d6..9c35a22 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/Function4.java +++ b/core/src/main/java/org/apache/spark/api/java/function/Function4.java @@ -23,5 +23,5 @@ import java.io.Serializable; * A four-argument function that takes arguments of type T1, T2, T3 and T4 and returns an R. */ public interface Function4 extends Serializable { - public R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception; + R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/843a31af/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java -- diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java index 2a10435..f30d42e 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java @@ -23,5 +23,5 @@ import java.io.Serializable; * A function with no return value. 
*/ public interface VoidFunction extends Serializable { - public void call(T t) throws Exception; + void call(T t) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/843a31af/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java -- diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java index 6c576ab..da9ae1c 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java +++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java @@ -23,5 +23,5 @@ import java.io.Serializable; * A two-argument function that takes arguments of type T1 and T2 with no return value. */ public interface VoidFunction2 extends Serializable { - public void call(T1 v1, T2 v2) throws Exception; + void call(T1 v1, T
spark git commit: [SPARK-11905][SQL] Support Persist/Cache and Unpersist in Dataset APIs
Repository: spark Updated Branches: refs/heads/branch-1.6 88bbce008 -> 40769b48c [SPARK-11905][SQL] Support Persist/Cache and Unpersist in Dataset APIs Persist and Unpersist exist in both RDD and Dataframe APIs. I think they are still very critical in Dataset APIs. Not sure if my understanding is correct? If so, could you help me check if the implementation is acceptable? Please provide your opinions. marmbrus rxin cloud-fan Thank you very much! Author: gatorsmile Author: xiaoli Author: Xiao Li Closes #9889 from gatorsmile/persistDS. (cherry picked from commit 0a7bca2da04aefff16f2513ec27a92e69ceb77f6) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40769b48 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40769b48 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40769b48 Branch: refs/heads/branch-1.6 Commit: 40769b48cd001b7ff8c301628dc1442e3dd946cd Parents: 88bbce0 Author: gatorsmile Authored: Tue Dec 1 10:38:59 2015 -0800 Committer: Michael Armbrust Committed: Tue Dec 1 10:39:14 2015 -0800 -- .../scala/org/apache/spark/sql/DataFrame.scala | 9 +++ .../scala/org/apache/spark/sql/Dataset.scala| 50 +++- .../scala/org/apache/spark/sql/SQLContext.scala | 9 +++ .../spark/sql/execution/CacheManager.scala | 27 +++ .../apache/spark/sql/DatasetCacheSuite.scala| 80 .../scala/org/apache/spark/sql/QueryTest.scala | 5 +- 6 files changed, 162 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/40769b48/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 6197f10..eb87003 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1584,6 +1584,7 @@ class DataFrame private[sql]( def distinct(): DataFrame = dropDuplicates() /** + * Persist this [[DataFrame]] with the default storage level (`MEMORY_AND_DISK`). * @group basic * @since 1.3.0 */ @@ -1593,12 +1594,17 @@ class DataFrame private[sql]( } /** + * Persist this [[DataFrame]] with the default storage level (`MEMORY_AND_DISK`). * @group basic * @since 1.3.0 */ def cache(): this.type = persist() /** + * Persist this [[DataFrame]] with the given storage level. + * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`, + * `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`, + * `MEMORY_AND_DISK_2`, etc. * @group basic * @since 1.3.0 */ @@ -1608,6 +1614,8 @@ class DataFrame private[sql]( } /** + * Mark the [[DataFrame]] as non-persistent, and remove all blocks for it from memory and disk. + * @param blocking Whether to block until all blocks are deleted. * @group basic * @since 1.3.0 */ @@ -1617,6 +1625,7 @@ class DataFrame private[sql]( } /** + * Mark the [[DataFrame]] as non-persistent, and remove all blocks for it from memory and disk. 
* @group basic * @since 1.3.0 */ http://git-wip-us.apache.org/repos/asf/spark/blob/40769b48/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index c357f88..d6bb1d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{Queryable, QueryExecution} import org.apache.spark.sql.types.StructType +import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils /** @@ -565,7 +566,7 @@ class Dataset[T] private[sql]( * combined. * * Note that, this function is not a typical set union operation, in that it does not eliminate - * duplicate items. As such, it is analagous to `UNION ALL` in SQL. + * duplicate items. As such, it is analogous to `UNION ALL` in SQL. * @since 1.6.0 */ def union(other: Dataset[T]): Dataset[T] = withPlan[T](other)(Union) @@ -618,7 +619,6 @@ class Dataset[T] private[sql]( case _ => Alias(CreateStruct(rightOutput), "_2")() } - implicit val tuple2Encoder: Encoder[(T, U)] = ExpressionEncoder.tuple(this.unresolvedTEncoder, other.unresolvedTEncoder) withPlan[(T
spark git commit: [SPARK-11905][SQL] Support Persist/Cache and Unpersist in Dataset APIs
Repository: spark Updated Branches: refs/heads/master fd95eeaf4 -> 0a7bca2da [SPARK-11905][SQL] Support Persist/Cache and Unpersist in Dataset APIs Persist and Unpersist exist in both RDD and Dataframe APIs. I think they are still very critical in Dataset APIs. Not sure if my understanding is correct? If so, could you help me check if the implementation is acceptable? Please provide your opinions. marmbrus rxin cloud-fan Thank you very much! Author: gatorsmile Author: xiaoli Author: Xiao Li Closes #9889 from gatorsmile/persistDS. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a7bca2d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a7bca2d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a7bca2d Branch: refs/heads/master Commit: 0a7bca2da04aefff16f2513ec27a92e69ceb77f6 Parents: fd95eea Author: gatorsmile Authored: Tue Dec 1 10:38:59 2015 -0800 Committer: Michael Armbrust Committed: Tue Dec 1 10:38:59 2015 -0800 -- .../scala/org/apache/spark/sql/DataFrame.scala | 9 +++ .../scala/org/apache/spark/sql/Dataset.scala| 50 +++- .../scala/org/apache/spark/sql/SQLContext.scala | 9 +++ .../spark/sql/execution/CacheManager.scala | 27 +++ .../apache/spark/sql/DatasetCacheSuite.scala| 80 .../scala/org/apache/spark/sql/QueryTest.scala | 5 +- 6 files changed, 162 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0a7bca2d/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 6197f10..eb87003 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1584,6 +1584,7 @@ class DataFrame private[sql]( def distinct(): DataFrame = dropDuplicates() /** + * Persist this [[DataFrame]] with the default storage level (`MEMORY_AND_DISK`). * @group basic * @since 1.3.0 */ @@ -1593,12 +1594,17 @@ class DataFrame private[sql]( } /** + * Persist this [[DataFrame]] with the default storage level (`MEMORY_AND_DISK`). * @group basic * @since 1.3.0 */ def cache(): this.type = persist() /** + * Persist this [[DataFrame]] with the given storage level. + * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`, + * `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`, + * `MEMORY_AND_DISK_2`, etc. * @group basic * @since 1.3.0 */ @@ -1608,6 +1614,8 @@ class DataFrame private[sql]( } /** + * Mark the [[DataFrame]] as non-persistent, and remove all blocks for it from memory and disk. + * @param blocking Whether to block until all blocks are deleted. * @group basic * @since 1.3.0 */ @@ -1617,6 +1625,7 @@ class DataFrame private[sql]( } /** + * Mark the [[DataFrame]] as non-persistent, and remove all blocks for it from memory and disk. 
* @group basic * @since 1.3.0 */ http://git-wip-us.apache.org/repos/asf/spark/blob/0a7bca2d/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index c357f88..d6bb1d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{Queryable, QueryExecution} import org.apache.spark.sql.types.StructType +import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils /** @@ -565,7 +566,7 @@ class Dataset[T] private[sql]( * combined. * * Note that, this function is not a typical set union operation, in that it does not eliminate - * duplicate items. As such, it is analagous to `UNION ALL` in SQL. + * duplicate items. As such, it is analogous to `UNION ALL` in SQL. * @since 1.6.0 */ def union(other: Dataset[T]): Dataset[T] = withPlan[T](other)(Union) @@ -618,7 +619,6 @@ class Dataset[T] private[sql]( case _ => Alias(CreateStruct(rightOutput), "_2")() } - implicit val tuple2Encoder: Encoder[(T, U)] = ExpressionEncoder.tuple(this.unresolvedTEncoder, other.unresolvedTEncoder) withPlan[(T, U)](other) { (left, right) => @@ -697,11 +697,55 @@ class Dataset[T] private[sql]( */ def takeAsList(n
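A minimal Scala sketch of the Dataset caching calls this patch adds, assuming an existing `sqlContext` (Spark 1.6 API); the data is illustrative:

~~~
import org.apache.spark.storage.StorageLevel
import sqlContext.implicits._

val ds = Seq(1, 2, 3).toDS()

ds.cache()                            // default level, MEMORY_AND_DISK
ds.count()                            // first action materialises the cache
ds.unpersist()

ds.persist(StorageLevel.MEMORY_ONLY)  // explicit storage level
ds.count()
ds.unpersist(blocking = true)         // wait until the blocks are removed
~~~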
spark git commit: [SPARK-11954][SQL] Encoder for JavaBeans
Repository: spark Updated Branches: refs/heads/branch-1.6 74a230676 -> 88bbce008 [SPARK-11954][SQL] Encoder for JavaBeans create java version of `constructorFor` and `extractorFor` in `JavaTypeInference` Author: Wenchen Fan This patch had conflicts when merged, resolved by Committer: Michael Armbrust Closes #9937 from cloud-fan/pojo. (cherry picked from commit fd95eeaf491809c6bb0f83d46b37b5e2eebbcbca) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88bbce00 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88bbce00 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88bbce00 Branch: refs/heads/branch-1.6 Commit: 88bbce00813acf23eef411ac354c35995ddf9e77 Parents: 74a2306 Author: Wenchen Fan Authored: Tue Dec 1 10:35:12 2015 -0800 Committer: Michael Armbrust Committed: Tue Dec 1 10:35:26 2015 -0800 -- .../scala/org/apache/spark/sql/Encoder.scala| 18 ++ .../spark/sql/catalyst/JavaTypeInference.scala | 313 ++- .../catalyst/encoders/ExpressionEncoder.scala | 21 +- .../sql/catalyst/expressions/objects.scala | 42 ++- .../spark/sql/catalyst/trees/TreeNode.scala | 27 +- .../sql/catalyst/util/ArrayBasedMapData.scala | 5 + .../sql/catalyst/util/GenericArrayData.scala| 3 + .../sql/catalyst/trees/TreeNodeSuite.scala | 25 ++ .../org/apache/spark/sql/JavaDatasetSuite.java | 174 ++- 9 files changed, 608 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/88bbce00/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala index 03aa25e..c40061a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala @@ -98,6 +98,24 @@ object Encoders { def STRING: Encoder[java.lang.String] = ExpressionEncoder() /** + * Creates an encoder for Java Bean of type T. + * + * T must be publicly accessible. + * + * supported types for java bean field: + * - primitive types: boolean, int, double, etc. + * - boxed types: Boolean, Integer, Double, etc. + * - String + * - java.math.BigDecimal + * - time related: java.sql.Date, java.sql.Timestamp + * - collection types: only array and java.util.List currently, map support is in progress + * - nested java bean. + * + * @since 1.6.0 + */ + def bean[T](beanClass: Class[T]): Encoder[T] = ExpressionEncoder.javaBean(beanClass) + + /** * (Scala-specific) Creates an encoder that serializes objects of type T using Kryo. * This encoder maps T into a single byte array (binary) field. 
* http://git-wip-us.apache.org/repos/asf/spark/blob/88bbce00/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala index 7d4cfbe..c8ee87e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala @@ -17,14 +17,20 @@ package org.apache.spark.sql.catalyst -import java.beans.Introspector +import java.beans.{PropertyDescriptor, Introspector} import java.lang.{Iterable => JIterable} -import java.util.{Iterator => JIterator, Map => JMap} +import java.util.{Iterator => JIterator, Map => JMap, List => JList} import scala.language.existentials import com.google.common.reflect.TypeToken + import org.apache.spark.sql.types._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedExtractValue} +import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayBasedMapData, DateTimeUtils} +import org.apache.spark.unsafe.types.UTF8String + /** * Type-inference utilities for POJOs and Java collections. @@ -33,13 +39,14 @@ object JavaTypeInference { private val iterableType = TypeToken.of(classOf[JIterable[_]]) private val mapType = TypeToken.of(classOf[JMap[_, _]]) + private val listType = TypeToken.of(classOf[JList[_]]) private val iteratorReturnType = classOf[JIterable[_]].getMethod("iterator").getGenericReturnType private val nextReturnType = classOf[JIterator[_]].getMethod("next").getGenericReturnType private val keySetReturnType = classOf[JMap[_, _]].getMethod("keySet").getGenericReturnType private
spark git commit: [SPARK-11954][SQL] Encoder for JavaBeans
Repository: spark Updated Branches: refs/heads/master 9df24624a -> fd95eeaf4 [SPARK-11954][SQL] Encoder for JavaBeans create java version of `constructorFor` and `extractorFor` in `JavaTypeInference` Author: Wenchen Fan This patch had conflicts when merged, resolved by Committer: Michael Armbrust Closes #9937 from cloud-fan/pojo. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd95eeaf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd95eeaf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd95eeaf Branch: refs/heads/master Commit: fd95eeaf491809c6bb0f83d46b37b5e2eebbcbca Parents: 9df2462 Author: Wenchen Fan Authored: Tue Dec 1 10:35:12 2015 -0800 Committer: Michael Armbrust Committed: Tue Dec 1 10:35:12 2015 -0800 -- .../scala/org/apache/spark/sql/Encoder.scala| 18 ++ .../spark/sql/catalyst/JavaTypeInference.scala | 313 ++- .../catalyst/encoders/ExpressionEncoder.scala | 21 +- .../sql/catalyst/expressions/objects.scala | 42 ++- .../spark/sql/catalyst/trees/TreeNode.scala | 27 +- .../sql/catalyst/util/ArrayBasedMapData.scala | 5 + .../sql/catalyst/util/GenericArrayData.scala| 3 + .../sql/catalyst/trees/TreeNodeSuite.scala | 25 ++ .../org/apache/spark/sql/JavaDatasetSuite.java | 174 ++- 9 files changed, 608 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fd95eeaf/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala index 03aa25e..c40061a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala @@ -98,6 +98,24 @@ object Encoders { def STRING: Encoder[java.lang.String] = ExpressionEncoder() /** + * Creates an encoder for Java Bean of type T. + * + * T must be publicly accessible. + * + * supported types for java bean field: + * - primitive types: boolean, int, double, etc. + * - boxed types: Boolean, Integer, Double, etc. + * - String + * - java.math.BigDecimal + * - time related: java.sql.Date, java.sql.Timestamp + * - collection types: only array and java.util.List currently, map support is in progress + * - nested java bean. + * + * @since 1.6.0 + */ + def bean[T](beanClass: Class[T]): Encoder[T] = ExpressionEncoder.javaBean(beanClass) + + /** * (Scala-specific) Creates an encoder that serializes objects of type T using Kryo. * This encoder maps T into a single byte array (binary) field. 
* http://git-wip-us.apache.org/repos/asf/spark/blob/fd95eeaf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala index 7d4cfbe..c8ee87e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala @@ -17,14 +17,20 @@ package org.apache.spark.sql.catalyst -import java.beans.Introspector +import java.beans.{PropertyDescriptor, Introspector} import java.lang.{Iterable => JIterable} -import java.util.{Iterator => JIterator, Map => JMap} +import java.util.{Iterator => JIterator, Map => JMap, List => JList} import scala.language.existentials import com.google.common.reflect.TypeToken + import org.apache.spark.sql.types._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedExtractValue} +import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayBasedMapData, DateTimeUtils} +import org.apache.spark.unsafe.types.UTF8String + /** * Type-inference utilities for POJOs and Java collections. @@ -33,13 +39,14 @@ object JavaTypeInference { private val iterableType = TypeToken.of(classOf[JIterable[_]]) private val mapType = TypeToken.of(classOf[JMap[_, _]]) + private val listType = TypeToken.of(classOf[JList[_]]) private val iteratorReturnType = classOf[JIterable[_]].getMethod("iterator").getGenericReturnType private val nextReturnType = classOf[JIterator[_]].getMethod("next").getGenericReturnType private val keySetReturnType = classOf[JMap[_, _]].getMethod("keySet").getGenericReturnType private val valuesReturnType = classOf[JMap[_, _]].getMethod("values").getGenericReturnType /** - * Infers the
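A rough sketch of how the new `Encoders.bean` entry point can be exercised, written in Scala for consistency with the rest of these notes (the `SensorReading` bean, its fields, and the use of `@BeanProperty` to generate getter/setter pairs are all invented for illustration; the commit itself targets plain Java beans):

import scala.beans.BeanProperty
import org.apache.spark.sql.Encoders

// A hypothetical bean: public no-arg constructor plus getter/setter pairs.
class SensorReading extends Serializable {
  @BeanProperty var room: String = _
  @BeanProperty var temp: Double = _
}

val enc = Encoders.bean(classOf[SensorReading])

val r = new SensorReading
r.setRoom("room1")
r.setTemp(18.6)

// The encoder can be supplied explicitly when building a Dataset of beans.
val ds = sqlContext.createDataset(Seq(r))(enc)
ds.show()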
spark git commit: [SPARK-11856][SQL] add type cast if the real type is different but compatible with encoder schema
Repository: spark Updated Branches: refs/heads/master 8ddc55f1d -> 9df24624a [SPARK-11856][SQL] add type cast if the real type is different but compatible with encoder schema When we build the `fromRowExpression` for an encoder, we set up a lot of "unresolved" stuff and lost the required data type, which may lead to runtime error if the real type doesn't match the encoder's schema. For example, we build an encoder for `case class Data(a: Int, b: String)` and the real type is `[a: int, b: long]`, then we will hit runtime error and say that we can't construct class `Data` with int and long, because we lost the information that `b` should be a string. Author: Wenchen Fan Closes #9840 from cloud-fan/err-msg. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9df24624 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9df24624 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9df24624 Branch: refs/heads/master Commit: 9df24624afedd993a39ab46c8211ae153aedef1a Parents: 8ddc55f Author: Wenchen Fan Authored: Tue Dec 1 10:24:53 2015 -0800 Committer: Michael Armbrust Committed: Tue Dec 1 10:24:53 2015 -0800 -- .../spark/sql/catalyst/ScalaReflection.scala| 93 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 40 + .../catalyst/analysis/HiveTypeCoercion.scala| 2 +- .../catalyst/encoders/ExpressionEncoder.scala | 4 +- .../spark/sql/catalyst/expressions/Cast.scala | 9 + .../expressions/complexTypeCreator.scala| 2 +- .../apache/spark/sql/types/DecimalType.scala| 12 ++ .../encoders/EncoderResolutionSuite.scala | 180 +++ .../spark/sql/DatasetAggregatorSuite.scala | 4 +- .../org/apache/spark/sql/DatasetSuite.scala | 21 ++- 10 files changed, 335 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9df24624/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index d133ad3..9b6b5b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -18,9 +18,8 @@ package org.apache.spark.sql.catalyst import org.apache.spark.sql.catalyst.analysis.{UnresolvedExtractValue, UnresolvedAttribute} -import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayBasedMapData, ArrayData, DateTimeUtils} +import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayBasedMapData, DateTimeUtils} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -117,31 +116,75 @@ object ScalaReflection extends ScalaReflection { * from ordinal 0 (since there are no names to map to). The actual location can be moved by * calling resolve/bind with a new schema. 
*/ - def constructorFor[T : TypeTag]: Expression = constructorFor(localTypeOf[T], None) + def constructorFor[T : TypeTag]: Expression = { +val tpe = localTypeOf[T] +val clsName = getClassNameFromType(tpe) +val walkedTypePath = s"""- root class: "${clsName} :: Nil +constructorFor(tpe, None, walkedTypePath) + } private def constructorFor( tpe: `Type`, - path: Option[Expression]): Expression = ScalaReflectionLock.synchronized { + path: Option[Expression], + walkedTypePath: Seq[String]): Expression = ScalaReflectionLock.synchronized { /** Returns the current path with a sub-field extracted. */ -def addToPath(part: String): Expression = path - .map(p => UnresolvedExtractValue(p, expressions.Literal(part))) - .getOrElse(UnresolvedAttribute(part)) +def addToPath(part: String, dataType: DataType, walkedTypePath: Seq[String]): Expression = { + val newPath = path +.map(p => UnresolvedExtractValue(p, expressions.Literal(part))) +.getOrElse(UnresolvedAttribute(part)) + upCastToExpectedType(newPath, dataType, walkedTypePath) +} /** Returns the current path with a field at ordinal extracted. */ -def addToPathOrdinal(ordinal: Int, dataType: DataType): Expression = path - .map(p => GetStructField(p, ordinal)) - .getOrElse(BoundReference(ordinal, dataType, false)) +def addToPathOrdinal( +ordinal: Int, +dataType: DataType, +walkedTypePath: Seq[String]): Expression = { + val newPath = path +.map(p
spark git commit: [SPARK-11856][SQL] add type cast if the real type is different but compatible with encoder schema
Repository: spark Updated Branches: refs/heads/branch-1.6 6e3e3c648 -> 74a230676 [SPARK-11856][SQL] add type cast if the real type is different but compatible with encoder schema When we build the `fromRowExpression` for an encoder, we set up a lot of "unresolved" stuff and lost the required data type, which may lead to runtime error if the real type doesn't match the encoder's schema. For example, we build an encoder for `case class Data(a: Int, b: String)` and the real type is `[a: int, b: long]`, then we will hit runtime error and say that we can't construct class `Data` with int and long, because we lost the information that `b` should be a string. Author: Wenchen Fan Closes #9840 from cloud-fan/err-msg. (cherry picked from commit 9df24624afedd993a39ab46c8211ae153aedef1a) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74a23067 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74a23067 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74a23067 Branch: refs/heads/branch-1.6 Commit: 74a2306763161fc04c9d3e7de186a6b31617faf4 Parents: 6e3e3c6 Author: Wenchen Fan Authored: Tue Dec 1 10:24:53 2015 -0800 Committer: Michael Armbrust Committed: Tue Dec 1 10:25:11 2015 -0800 -- .../spark/sql/catalyst/ScalaReflection.scala| 93 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 40 + .../catalyst/analysis/HiveTypeCoercion.scala| 2 +- .../catalyst/encoders/ExpressionEncoder.scala | 4 +- .../spark/sql/catalyst/expressions/Cast.scala | 9 + .../expressions/complexTypeCreator.scala| 2 +- .../apache/spark/sql/types/DecimalType.scala| 12 ++ .../encoders/EncoderResolutionSuite.scala | 180 +++ .../spark/sql/DatasetAggregatorSuite.scala | 4 +- .../org/apache/spark/sql/DatasetSuite.scala | 21 ++- 10 files changed, 335 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/74a23067/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index d133ad3..9b6b5b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -18,9 +18,8 @@ package org.apache.spark.sql.catalyst import org.apache.spark.sql.catalyst.analysis.{UnresolvedExtractValue, UnresolvedAttribute} -import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayBasedMapData, ArrayData, DateTimeUtils} +import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayBasedMapData, DateTimeUtils} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -117,31 +116,75 @@ object ScalaReflection extends ScalaReflection { * from ordinal 0 (since there are no names to map to). The actual location can be moved by * calling resolve/bind with a new schema. 
*/ - def constructorFor[T : TypeTag]: Expression = constructorFor(localTypeOf[T], None) + def constructorFor[T : TypeTag]: Expression = { +val tpe = localTypeOf[T] +val clsName = getClassNameFromType(tpe) +val walkedTypePath = s"""- root class: "${clsName} :: Nil +constructorFor(tpe, None, walkedTypePath) + } private def constructorFor( tpe: `Type`, - path: Option[Expression]): Expression = ScalaReflectionLock.synchronized { + path: Option[Expression], + walkedTypePath: Seq[String]): Expression = ScalaReflectionLock.synchronized { /** Returns the current path with a sub-field extracted. */ -def addToPath(part: String): Expression = path - .map(p => UnresolvedExtractValue(p, expressions.Literal(part))) - .getOrElse(UnresolvedAttribute(part)) +def addToPath(part: String, dataType: DataType, walkedTypePath: Seq[String]): Expression = { + val newPath = path +.map(p => UnresolvedExtractValue(p, expressions.Literal(part))) +.getOrElse(UnresolvedAttribute(part)) + upCastToExpectedType(newPath, dataType, walkedTypePath) +} /** Returns the current path with a field at ordinal extracted. */ -def addToPathOrdinal(ordinal: Int, dataType: DataType): Expression = path - .map(p => GetStructField(p, ordinal)) - .getOrElse(BoundReference(ordinal, dataType, false)) +def addToPathOrdinal( +ordinal: Int, +data
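The situation the commit message describes can be sketched as follows, assuming a 1.6 shell with `sqlContext.implicits._` imported (the case class and data are invented for illustration). Here the physical schema is `[a: int, b: int]` while the encoder expects a long for `b`; the compatible widening is now handled by an inserted up-cast, and an incompatible mismatch is reported as an analysis error naming the offending field instead of failing at runtime:

import sqlContext.implicits._

case class Data(a: Int, b: Long)

// Underlying DataFrame schema: [a: int, b: int], narrower than the encoder's [a: int, b: bigint].
val df = Seq((1, 2), (3, 4)).toDF("a", "b")

val ds = df.as[Data]   // the analyzer now adds an up-cast from int to bigint for column b
ds.collect()           // e.g. Array(Data(1,2), Data(3,4))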
spark git commit: [SPARK-12068][SQL] use a single column in Dataset.groupBy and count will fail
Repository: spark Updated Branches: refs/heads/branch-1.6 9b99b2b46 -> 6e3e3c648 [SPARK-12068][SQL] use a single column in Dataset.groupBy and count will fail The reason is that, for a single culumn `RowEncoder`(or a single field product encoder), when we use it as the encoder for grouping key, we should also combine the grouping attributes, although there is only one grouping attribute. Author: Wenchen Fan Closes #10059 from cloud-fan/bug. (cherry picked from commit 8ddc55f1d5823ca135510b2ea776e889e481) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e3e3c64 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e3e3c64 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e3e3c64 Branch: refs/heads/branch-1.6 Commit: 6e3e3c648c4f74d9c1aabe767dbadfe47bd7e658 Parents: 9b99b2b Author: Wenchen Fan Authored: Tue Dec 1 10:22:55 2015 -0800 Committer: Michael Armbrust Committed: Tue Dec 1 10:23:17 2015 -0800 -- .../scala/org/apache/spark/sql/Dataset.scala | 2 +- .../org/apache/spark/sql/GroupedDataset.scala| 7 --- .../org/apache/spark/sql/DatasetSuite.scala | 19 +++ .../scala/org/apache/spark/sql/QueryTest.scala | 6 +++--- 4 files changed, 27 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6e3e3c64/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index da46001..c357f88 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -70,7 +70,7 @@ class Dataset[T] private[sql]( * implicit so that we can use it when constructing new [[Dataset]] objects that have the same * object type (that will be possibly resolved to a different schema). */ - private implicit val unresolvedTEncoder: ExpressionEncoder[T] = encoderFor(tEncoder) + private[sql] implicit val unresolvedTEncoder: ExpressionEncoder[T] = encoderFor(tEncoder) /** The encoder for this [[Dataset]] that has been resolved to its output schema. 
*/ private[sql] val resolvedTEncoder: ExpressionEncoder[T] = http://git-wip-us.apache.org/repos/asf/spark/blob/6e3e3c64/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala index a10a893..4bf0b25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala @@ -228,10 +228,11 @@ class GroupedDataset[K, V] private[sql]( val namedColumns = columns.map( _.withInputType(resolvedVEncoder, dataAttributes).named) -val keyColumn = if (groupingAttributes.length > 1) { - Alias(CreateStruct(groupingAttributes), "key")() -} else { +val keyColumn = if (resolvedKEncoder.flat) { + assert(groupingAttributes.length == 1) groupingAttributes.head +} else { + Alias(CreateStruct(groupingAttributes), "key")() } val aggregate = Aggregate(groupingAttributes, keyColumn +: namedColumns, logicalPlan) val execution = new QueryExecution(sqlContext, aggregate) http://git-wip-us.apache.org/repos/asf/spark/blob/6e3e3c64/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 7d53918..a2c8d20 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -272,6 +272,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext { 3 -> "abcxyz", 5 -> "hello") } + test("groupBy single field class, count") { +val ds = Seq("abc", "xyz", "hello").toDS() +val count = ds.groupBy(s => Tuple1(s.length)).count() + +checkAnswer( + count, + (Tuple1(3), 2L), (Tuple1(5), 1L) +) + } + test("groupBy columns, map") { val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS() val grouped = ds.groupBy($"_1") @@ -282,6 +292,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext { ("a", 30), ("b", 3), ("c", 1)) } + test("groupBy columns, count") { +val ds = Seq("a" -> 1, "b" -> 1, "a" -> 2).toDS() +val count = ds.groupBy($
spark git commit: [SPARK-12068][SQL] use a single column in Dataset.groupBy and count will fail
Repository: spark Updated Branches: refs/heads/master 69dbe6b40 -> 8ddc55f1d [SPARK-12068][SQL] use a single column in Dataset.groupBy and count will fail The reason is that, for a single culumn `RowEncoder`(or a single field product encoder), when we use it as the encoder for grouping key, we should also combine the grouping attributes, although there is only one grouping attribute. Author: Wenchen Fan Closes #10059 from cloud-fan/bug. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ddc55f1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ddc55f1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ddc55f1 Branch: refs/heads/master Commit: 8ddc55f1d5823ca135510b2ea776e889e481 Parents: 69dbe6b Author: Wenchen Fan Authored: Tue Dec 1 10:22:55 2015 -0800 Committer: Michael Armbrust Committed: Tue Dec 1 10:22:55 2015 -0800 -- .../scala/org/apache/spark/sql/Dataset.scala | 2 +- .../org/apache/spark/sql/GroupedDataset.scala| 7 --- .../org/apache/spark/sql/DatasetSuite.scala | 19 +++ .../scala/org/apache/spark/sql/QueryTest.scala | 6 +++--- 4 files changed, 27 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8ddc55f1/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index da46001..c357f88 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -70,7 +70,7 @@ class Dataset[T] private[sql]( * implicit so that we can use it when constructing new [[Dataset]] objects that have the same * object type (that will be possibly resolved to a different schema). */ - private implicit val unresolvedTEncoder: ExpressionEncoder[T] = encoderFor(tEncoder) + private[sql] implicit val unresolvedTEncoder: ExpressionEncoder[T] = encoderFor(tEncoder) /** The encoder for this [[Dataset]] that has been resolved to its output schema. 
*/ private[sql] val resolvedTEncoder: ExpressionEncoder[T] = http://git-wip-us.apache.org/repos/asf/spark/blob/8ddc55f1/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala index a10a893..4bf0b25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala @@ -228,10 +228,11 @@ class GroupedDataset[K, V] private[sql]( val namedColumns = columns.map( _.withInputType(resolvedVEncoder, dataAttributes).named) -val keyColumn = if (groupingAttributes.length > 1) { - Alias(CreateStruct(groupingAttributes), "key")() -} else { +val keyColumn = if (resolvedKEncoder.flat) { + assert(groupingAttributes.length == 1) groupingAttributes.head +} else { + Alias(CreateStruct(groupingAttributes), "key")() } val aggregate = Aggregate(groupingAttributes, keyColumn +: namedColumns, logicalPlan) val execution = new QueryExecution(sqlContext, aggregate) http://git-wip-us.apache.org/repos/asf/spark/blob/8ddc55f1/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 7d53918..a2c8d20 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -272,6 +272,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext { 3 -> "abcxyz", 5 -> "hello") } + test("groupBy single field class, count") { +val ds = Seq("abc", "xyz", "hello").toDS() +val count = ds.groupBy(s => Tuple1(s.length)).count() + +checkAnswer( + count, + (Tuple1(3), 2L), (Tuple1(5), 1L) +) + } + test("groupBy columns, map") { val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS() val grouped = ds.groupBy($"_1") @@ -282,6 +292,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext { ("a", 30), ("b", 3), ("c", 1)) } + test("groupBy columns, count") { +val ds = Seq("a" -> 1, "b" -> 1, "a" -> 2).toDS() +val count = ds.groupBy($"_1").count() + +checkAnswer( + count, + (Row("a"), 2L), (Row("b"), 1L)) + } + test("groupBy
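For reference, the previously failing call has the following shape (a sketch in a 1.6 shell, mirroring the regression tests added above):

import sqlContext.implicits._

val ds = Seq("a" -> 1, "b" -> 1, "a" -> 2).toDS()

// A single grouping column means the key encoder is flat; with this fix the lone
// grouping attribute is used as the key column directly instead of being wrapped
// in a struct, so count() resolves correctly.
val counts = ds.groupBy($"_1").count()
counts.collect()   // e.g. Array((Row("a"), 2L), (Row("b"), 1L)), order not guaranteed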
spark git commit: [SPARK-12046][DOC] Fixes various ScalaDoc/JavaDoc issues
Repository: spark Updated Branches: refs/heads/master 140116657 -> 69dbe6b40 [SPARK-12046][DOC] Fixes various ScalaDoc/JavaDoc issues This PR backports PR #10039 to master Author: Cheng Lian Closes #10063 from liancheng/spark-12046.doc-fix.master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/69dbe6b4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/69dbe6b4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/69dbe6b4 Branch: refs/heads/master Commit: 69dbe6b40df35d488d4ee343098ac70d00bbdafb Parents: 1401166 Author: Cheng Lian Authored: Tue Dec 1 10:21:31 2015 -0800 Committer: Michael Armbrust Committed: Tue Dec 1 10:21:31 2015 -0800 -- .../spark/api/java/function/Function4.java | 2 +- .../spark/api/java/function/VoidFunction.java | 2 +- .../spark/api/java/function/VoidFunction2.java | 2 +- .../org/apache/spark/api/java/JavaPairRDD.scala | 16 +++ .../scala/org/apache/spark/memory/package.scala | 14 +++--- .../org/apache/spark/rdd/CoGroupedRDD.scala | 2 +- .../org/apache/spark/rdd/PairRDDFunctions.scala | 6 +-- .../org/apache/spark/rdd/ShuffledRDD.scala | 2 +- .../scala/org/apache/spark/scheduler/Task.scala | 5 ++- .../serializer/SerializationDebugger.scala | 13 +++--- .../scala/org/apache/spark/util/Vector.scala| 1 + .../spark/util/collection/ExternalSorter.scala | 30 ++--- .../WritablePartitionedPairCollection.scala | 7 +-- .../streaming/kinesis/KinesisReceiver.scala | 23 +- .../spark/streaming/kinesis/KinesisUtils.scala | 13 +++--- .../mllib/optimization/GradientDescent.scala| 12 +++--- project/SparkBuild.scala| 2 + .../scala/org/apache/spark/sql/Column.scala | 11 ++--- .../spark/streaming/StreamingContext.scala | 11 ++--- .../streaming/dstream/FileInputDStream.scala| 19 + .../streaming/receiver/BlockGenerator.scala | 22 +- .../scheduler/ReceiverSchedulingPolicy.scala| 45 ++-- .../streaming/util/FileBasedWriteAheadLog.scala | 7 +-- .../spark/streaming/util/RecurringTimer.scala | 8 ++-- .../org/apache/spark/deploy/yarn/Client.scala | 10 ++--- 25 files changed, 152 insertions(+), 133 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/69dbe6b4/core/src/main/java/org/apache/spark/api/java/function/Function4.java -- diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function4.java b/core/src/main/java/org/apache/spark/api/java/function/Function4.java index fd727d6..9c35a22 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/Function4.java +++ b/core/src/main/java/org/apache/spark/api/java/function/Function4.java @@ -23,5 +23,5 @@ import java.io.Serializable; * A four-argument function that takes arguments of type T1, T2, T3 and T4 and returns an R. */ public interface Function4 extends Serializable { - public R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception; + R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/69dbe6b4/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java -- diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java index 2a10435..f30d42e 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java @@ -23,5 +23,5 @@ import java.io.Serializable; * A function with no return value. 
*/ public interface VoidFunction extends Serializable { - public void call(T t) throws Exception; + void call(T t) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/69dbe6b4/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java -- diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java index 6c576ab..da9ae1c 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java +++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java @@ -23,5 +23,5 @@ import java.io.Serializable; * A two-argument function that takes arguments of type T1 and T2 with no return value. */ public interface VoidFunction2 extends Serializable { - public void call(T1 v1, T2 v2) throws Exception; + void call(T1 v1, T2 v2) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/69dbe6b4/core/src
spark git commit: [SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize
Repository: spark Updated Branches: refs/heads/branch-1.6 add4e6311 -> 9b99b2b46 [SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize `JavaSerializerInstance.serialize` uses `ByteArrayOutputStream.toByteArray` to get the serialized data. `ByteArrayOutputStream.toByteArray` needs to copy the content in the internal array to a new array. However, since the array will be converted to `ByteBuffer` at once, we can avoid the memory copy. This PR added `ByteBufferOutputStream` to access the protected `buf` and convert it to a `ByteBuffer` directly. Author: Shixiong Zhu Closes #10051 from zsxwing/SPARK-12060. (cherry picked from commit 1401166576c7018c5f9c31e0a6703d5fb16ea339) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b99b2b4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b99b2b4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b99b2b4 Branch: refs/heads/branch-1.6 Commit: 9b99b2b46c452ba396e922db5fc7eec02c45b158 Parents: add4e63 Author: Shixiong Zhu Authored: Tue Dec 1 09:45:55 2015 -0800 Committer: Shixiong Zhu Committed: Tue Dec 1 09:46:07 2015 -0800 -- .../spark/serializer/JavaSerializer.scala | 7 ++--- .../spark/util/ByteBufferOutputStream.scala | 31 2 files changed, 34 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9b99b2b4/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala -- diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index b463a71..ea718a0 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -24,8 +24,7 @@ import scala.reflect.ClassTag import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.util.ByteBufferInputStream -import org.apache.spark.util.Utils +import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils} private[spark] class JavaSerializationStream( out: OutputStream, counterReset: Int, extraDebugInfo: Boolean) @@ -96,11 +95,11 @@ private[spark] class JavaSerializerInstance( extends SerializerInstance { override def serialize[T: ClassTag](t: T): ByteBuffer = { -val bos = new ByteArrayOutputStream() +val bos = new ByteBufferOutputStream() val out = serializeStream(bos) out.writeObject(t) out.close() -ByteBuffer.wrap(bos.toByteArray) +bos.toByteBuffer } override def deserialize[T: ClassTag](bytes: ByteBuffer): T = { http://git-wip-us.apache.org/repos/asf/spark/blob/9b99b2b4/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala new file mode 100644 index 000..92e4522 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer + +/** + * Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer + */ +private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream { + + def toByteBuffer: ByteBuffer = { +return ByteBuffer.wrap(buf, 0, count) + } +} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize
Repository: spark Updated Branches: refs/heads/master c87531b76 -> 140116657 [SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize `JavaSerializerInstance.serialize` uses `ByteArrayOutputStream.toByteArray` to get the serialized data. `ByteArrayOutputStream.toByteArray` needs to copy the content in the internal array to a new array. However, since the array will be converted to `ByteBuffer` at once, we can avoid the memory copy. This PR added `ByteBufferOutputStream` to access the protected `buf` and convert it to a `ByteBuffer` directly. Author: Shixiong Zhu Closes #10051 from zsxwing/SPARK-12060. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14011665 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14011665 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14011665 Branch: refs/heads/master Commit: 1401166576c7018c5f9c31e0a6703d5fb16ea339 Parents: c87531b Author: Shixiong Zhu Authored: Tue Dec 1 09:45:55 2015 -0800 Committer: Shixiong Zhu Committed: Tue Dec 1 09:45:55 2015 -0800 -- .../spark/serializer/JavaSerializer.scala | 7 ++--- .../spark/util/ByteBufferOutputStream.scala | 31 2 files changed, 34 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/14011665/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala -- diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index b463a71..ea718a0 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -24,8 +24,7 @@ import scala.reflect.ClassTag import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.util.ByteBufferInputStream -import org.apache.spark.util.Utils +import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils} private[spark] class JavaSerializationStream( out: OutputStream, counterReset: Int, extraDebugInfo: Boolean) @@ -96,11 +95,11 @@ private[spark] class JavaSerializerInstance( extends SerializerInstance { override def serialize[T: ClassTag](t: T): ByteBuffer = { -val bos = new ByteArrayOutputStream() +val bos = new ByteBufferOutputStream() val out = serializeStream(bos) out.writeObject(t) out.close() -ByteBuffer.wrap(bos.toByteArray) +bos.toByteBuffer } override def deserialize[T: ClassTag](bytes: ByteBuffer): T = { http://git-wip-us.apache.org/repos/asf/spark/blob/14011665/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala new file mode 100644 index 000..92e4522 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer + +/** + * Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer + */ +private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream { + + def toByteBuffer: ByteBuffer = { +return ByteBuffer.wrap(buf, 0, count) + } +} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
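The pattern is small enough to sketch on its own. `ByteBufferOutputStream` is `private[spark]`, so the snippet below only compiles from Spark-internal code; it mirrors what `JavaSerializerInstance.serialize` now does rather than describing a public API:

import java.io.ObjectOutputStream
import java.nio.ByteBuffer

import org.apache.spark.util.ByteBufferOutputStream  // private[spark]

val bos = new ByteBufferOutputStream()   // subclasses ByteArrayOutputStream to reach `buf`
val out = new ObjectOutputStream(bos)
out.writeObject("some task or closure")
out.close()

// Wraps the internal buffer as ByteBuffer.wrap(buf, 0, count): no defensive
// copy, unlike ByteBuffer.wrap(bos.toByteArray).
val serialized: ByteBuffer = bos.toByteBuffer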
spark git commit: [SPARK-11949][SQL] Set field nullable property for GroupingSets to get correct results for null values
Repository: spark Updated Branches: refs/heads/branch-1.6 1aa39bdb1 -> add4e6311 [SPARK-11949][SQL] Set field nullable property for GroupingSets to get correct results for null values JIRA: https://issues.apache.org/jira/browse/SPARK-11949 The result of cube plan uses incorrect schema. The schema of cube result should set nullable property to true because the grouping expressions will have null values. Author: Liang-Chi Hsieh Closes #10038 from viirya/fix-cube. (cherry picked from commit c87531b765f8934a9a6c0f673617e0abfa5e5f0e) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/add4e631 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/add4e631 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/add4e631 Branch: refs/heads/branch-1.6 Commit: add4e63116b5d7a9bfc0e08ad3b5750d40897602 Parents: 1aa39bd Author: Liang-Chi Hsieh Authored: Tue Dec 1 07:42:37 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 07:44:42 2015 -0800 -- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 -- .../org/apache/spark/sql/DataFrameAggregateSuite.scala| 10 ++ 2 files changed, 18 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/add4e631/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 94ffbbb..b8f212f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -223,6 +223,11 @@ class Analyzer( case other => Alias(other, other.toString)() } +// TODO: We need to use bitmasks to determine which grouping expressions need to be +// set as nullable. For example, if we have GROUPING SETS ((a,b), a), we do not need +// to change the nullability of a. +val attributeMap = groupByAliases.map(a => (a -> a.toAttribute.withNullability(true))).toMap + val aggregations: Seq[NamedExpression] = x.aggregations.map { // If an expression is an aggregate (contains a AggregateExpression) then we dont change // it so that the aggregation is computed on the unmodified value of its argument @@ -231,12 +236,13 @@ class Analyzer( // If not then its a grouping expression and we need to use the modified (with nulls from // Expand) value of the expression. 
case expr => expr.transformDown { -case e => groupByAliases.find(_.child.semanticEquals(e)).map(_.toAttribute).getOrElse(e) +case e => + groupByAliases.find(_.child.semanticEquals(e)).map(attributeMap(_)).getOrElse(e) }.asInstanceOf[NamedExpression] } val child = Project(x.child.output ++ groupByAliases, x.child) -val groupByAttributes = groupByAliases.map(_.toAttribute) +val groupByAttributes = groupByAliases.map(attributeMap(_)) Aggregate( groupByAttributes :+ VirtualColumn.groupingIdAttribute, http://git-wip-us.apache.org/repos/asf/spark/blob/add4e631/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index b5c636d..b1004bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.DecimalType +case class Fact(date: Int, hour: Int, minute: Int, room_name: String, temp: Double) class DataFrameAggregateSuite extends QueryTest with SharedSQLContext { import testImplicits._ @@ -86,6 +87,15 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext { Row(null, 2013, 78000.0) :: Row(null, null, 113000.0) :: Nil ) + +val df0 = sqlContext.sparkContext.parallelize(Seq( + Fact(20151123, 18, 35, "room1", 18.6), + Fact(20151123, 18, 35, "room2", 22.4), + Fact(20151123, 18, 36, "room1", 17.4), + Fact(20151123, 18, 36, "room2", 25.6))).toDF() + +val cube0 = df0.cube("date", "hour", "minute", "room_name").agg(Map("temp" -> "avg")) +assert(cube0.where("date IS NULL").coun
spark git commit: [SPARK-11949][SQL] Set field nullable property for GroupingSets to get correct results for null values
Repository: spark Updated Branches: refs/heads/master a0af0e351 -> c87531b76 [SPARK-11949][SQL] Set field nullable property for GroupingSets to get correct results for null values JIRA: https://issues.apache.org/jira/browse/SPARK-11949 The result of cube plan uses incorrect schema. The schema of cube result should set nullable property to true because the grouping expressions will have null values. Author: Liang-Chi Hsieh Closes #10038 from viirya/fix-cube. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c87531b7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c87531b7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c87531b7 Branch: refs/heads/master Commit: c87531b765f8934a9a6c0f673617e0abfa5e5f0e Parents: a0af0e3 Author: Liang-Chi Hsieh Authored: Tue Dec 1 07:42:37 2015 -0800 Committer: Yin Huai Committed: Tue Dec 1 07:44:22 2015 -0800 -- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 -- .../org/apache/spark/sql/DataFrameAggregateSuite.scala| 10 ++ 2 files changed, 18 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c87531b7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 94ffbbb..b8f212f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -223,6 +223,11 @@ class Analyzer( case other => Alias(other, other.toString)() } +// TODO: We need to use bitmasks to determine which grouping expressions need to be +// set as nullable. For example, if we have GROUPING SETS ((a,b), a), we do not need +// to change the nullability of a. +val attributeMap = groupByAliases.map(a => (a -> a.toAttribute.withNullability(true))).toMap + val aggregations: Seq[NamedExpression] = x.aggregations.map { // If an expression is an aggregate (contains a AggregateExpression) then we dont change // it so that the aggregation is computed on the unmodified value of its argument @@ -231,12 +236,13 @@ class Analyzer( // If not then its a grouping expression and we need to use the modified (with nulls from // Expand) value of the expression. 
case expr => expr.transformDown { -case e => groupByAliases.find(_.child.semanticEquals(e)).map(_.toAttribute).getOrElse(e) +case e => + groupByAliases.find(_.child.semanticEquals(e)).map(attributeMap(_)).getOrElse(e) }.asInstanceOf[NamedExpression] } val child = Project(x.child.output ++ groupByAliases, x.child) -val groupByAttributes = groupByAliases.map(_.toAttribute) +val groupByAttributes = groupByAliases.map(attributeMap(_)) Aggregate( groupByAttributes :+ VirtualColumn.groupingIdAttribute, http://git-wip-us.apache.org/repos/asf/spark/blob/c87531b7/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index b5c636d..b1004bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.DecimalType +case class Fact(date: Int, hour: Int, minute: Int, room_name: String, temp: Double) class DataFrameAggregateSuite extends QueryTest with SharedSQLContext { import testImplicits._ @@ -86,6 +87,15 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext { Row(null, 2013, 78000.0) :: Row(null, null, 113000.0) :: Nil ) + +val df0 = sqlContext.sparkContext.parallelize(Seq( + Fact(20151123, 18, 35, "room1", 18.6), + Fact(20151123, 18, 35, "room2", 22.4), + Fact(20151123, 18, 36, "room1", 17.4), + Fact(20151123, 18, 36, "room2", 25.6))).toDF() + +val cube0 = df0.cube("date", "hour", "minute", "room_name").agg(Map("temp" -> "avg")) +assert(cube0.where("date IS NULL").count > 0) } test("rollup overlapping columns") { ---
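A short sketch of the behaviour being fixed, in the spirit of the test added above (assuming a 1.6 shell with implicits imported): CUBE emits subtotal rows whose grouping columns are null, so those columns must be nullable in the result schema or filters such as `date IS NULL` silently match nothing:

import sqlContext.implicits._
import org.apache.spark.sql.functions._

val df = Seq(
  (20151123, 18, "room1", 18.6),
  (20151123, 18, "room2", 22.4)
).toDF("date", "hour", "room_name", "temp")

val cube = df.cube("date", "hour", "room_name").agg(avg("temp"))

// Subtotal rows carry nulls in the grouping columns; with the corrected
// nullable schema this count is non-zero, as the new assertion checks.
cube.where("date IS NULL").count()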
spark git commit: [SPARK-11898][MLLIB] Use broadcast for the global tables in Word2Vec
Repository: spark Updated Branches: refs/heads/master 9693b0d5a -> a0af0e351 [SPARK-11898][MLLIB] Use broadcast for the global tables in Word2Vec jira: https://issues.apache.org/jira/browse/SPARK-11898 syn0Global and syn1Global in word2vec are quite large objects with size (vocab * vectorSize * 8), yet they are passed to workers using basic task serialization. Using broadcast can greatly improve the performance. A benchmark with a 1M-word vocabulary and the default vectorSize of 100 shows that switching to broadcast 1. decreases the worker memory consumption by 45%, and 2. decreases the running time by 40%. This will also help extend the upper limit for Word2Vec. Author: Yuhao Yang Closes #9878 from hhbyyh/w2vBC. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0af0e35 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0af0e35 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0af0e35 Branch: refs/heads/master Commit: a0af0e351e45a8be47a6f65efd132eaa4a00c9e4 Parents: 9693b0d Author: Yuhao Yang Authored: Tue Dec 1 09:26:58 2015 + Committer: Sean Owen Committed: Tue Dec 1 09:26:58 2015 + -- .../main/scala/org/apache/spark/mllib/feature/Word2Vec.scala | 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a0af0e35/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index a47f27b..655ac0b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -316,12 +316,15 @@ class Word2Vec extends Serializable with Logging { Array.fill[Float](vocabSize * vectorSize)((initRandom.nextFloat() - 0.5f) / vectorSize) val syn1Global = new Array[Float](vocabSize * vectorSize) var alpha = learningRate + for (k <- 1 to numIterations) { + val bcSyn0Global = sc.broadcast(syn0Global) + val bcSyn1Global = sc.broadcast(syn1Global) val partial = newSentences.mapPartitionsWithIndex { case (idx, iter) => val random = new XORShiftRandom(seed ^ ((idx + 1) << 16) ^ ((-k - 1) << 8)) val syn0Modify = new Array[Int](vocabSize) val syn1Modify = new Array[Int](vocabSize) -val model = iter.foldLeft((syn0Global, syn1Global, 0, 0)) { +val model = iter.foldLeft((bcSyn0Global.value, bcSyn1Global.value, 0, 0)) { case ((syn0, syn1, lastWordCount, wordCount), sentence) => var lwc = lastWordCount var wc = wordCount @@ -405,6 +408,8 @@ class Word2Vec extends Serializable with Logging { } i += 1 } + bcSyn0Global.unpersist(false) + bcSyn1Global.unpersist(false) } newSentences.unpersist() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
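The change is an instance of a general pattern: large, read-only state should reach executors through a broadcast variable rather than being captured in each task closure. A generic sketch of that pattern follows, with invented names and data (`table`, `score`, the parallelized range), not Word2Vec's actual code; the sizes echo the benchmark mentioned above:

// Large read-only lookup state, roughly vocab * vectorSize floats.
val vocabSize = 1000000
val vectorSize = 100
val table = new Array[Float](vocabSize * vectorSize)

// Hypothetical per-record work that needs access to the whole table.
def score(i: Int, tab: Array[Float]): Float = tab(i % tab.length) + i

val data = sc.parallelize(1 to 10000)

val bcTable = sc.broadcast(table)          // shipped to each executor once
val scores = data.mapPartitions { iter =>
  val localTable = bcTable.value           // not re-serialized with every task
  iter.map(i => score(i, localTable))
}
scores.count()
bcTable.unpersist(blocking = false)        // release the executor-side copies when done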
spark git commit: Set SPARK_EC2_VERSION to 1.5.2
Repository: spark Updated Branches: refs/heads/branch-1.5 d78f1bc45 -> 80dac0b07 Set SPARK_EC2_VERSION to 1.5.2 Author: Alexander Pivovarov Closes #10064 from apivovarov/patch-1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80dac0b0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80dac0b0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80dac0b0 Branch: refs/heads/branch-1.5 Commit: 80dac0b07dd37064255f13874ebae5f75e371e9c Parents: d78f1bc Author: Alexander Pivovarov Authored: Tue Dec 1 01:17:49 2015 -0800 Committer: Shivaram Venkataraman Committed: Tue Dec 1 01:17:49 2015 -0800 -- ec2/spark_ec2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/80dac0b0/ec2/spark_ec2.py -- diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 561284e..ed538b0 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -51,7 +51,7 @@ else: raw_input = input xrange = range -SPARK_EC2_VERSION = "1.5.1" +SPARK_EC2_VERSION = "1.5.2" SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__)) VALID_SPARK_VERSIONS = set([ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org