spark git commit: [SPARK-23129][CORE] Make deserializeStream of DiskMapIterator init lazily
Repository: spark Updated Branches: refs/heads/branch-2.3 a857ad566 -> 012695256 [SPARK-23129][CORE] Make deserializeStream of DiskMapIterator init lazily ## What changes were proposed in this pull request? Currently, the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator is initialized when the DiskMapIterator instance is created. This causes memory overhead when ExternalAppendOnlyMap spills many times. We can avoid this by initializing deserializeStream the first time it is used. This patch makes deserializeStream init lazily. ## How was this patch tested? Existing tests Author: zhoukang Closes #20292 from caneGuy/zhoukang/lay-diskmapiterator. (cherry picked from commit 45b4bbfddc18a77011c3bc1bfd71b2cd3466443c) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01269525 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01269525 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01269525 Branch: refs/heads/branch-2.3 Commit: 012695256a61f1830ff02780611d4aada00a88a0 Parents: a857ad5 Author: zhoukang Authored: Thu Jan 25 15:24:52 2018 +0800 Committer: Wenchen Fan Committed: Thu Jan 25 15:25:46 2018 +0800 -- .../util/collection/ExternalAppendOnlyMap.scala| 17 ++--- 1 file changed, 10 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/01269525/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 375f4a6..5c6dd45 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -463,7 +463,7 @@ class ExternalAppendOnlyMap[K, V, C]( // An intermediate stream that reads from exactly one batch // This guards against pre-fetching and other arbitrary behavior of higher level streams -private var deserializeStream = nextBatchStream() +private var deserializeStream: DeserializationStream = null private var nextItem: (K, C) = null private var objectsRead = 0 @@ -528,7 +528,11 @@ class ExternalAppendOnlyMap[K, V, C]( override def hasNext: Boolean = { if (nextItem == null) { if (deserializeStream == null) { - return false + // In case of deserializeStream has not been initialized + deserializeStream = nextBatchStream() + if (deserializeStream == null) { +return false + } } nextItem = readNextItem() } @@ -536,19 +540,18 @@ class ExternalAppendOnlyMap[K, V, C]( } override def next(): (K, C) = { - val item = if (nextItem == null) readNextItem() else nextItem - if (item == null) { + if (!hasNext) { throw new NoSuchElementException } + val item = nextItem nextItem = null item } private def cleanup() { batchIndex = batchOffsets.length // Prevent reading any other batch - val ds = deserializeStream - if (ds != null) { -ds.close() + if (deserializeStream != null) { +deserializeStream.close() deserializeStream = null } if (fileStream != null) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23129][CORE] Make deserializeStream of DiskMapIterator init lazily
Repository: spark Updated Branches: refs/heads/master 6f0ba8472 -> 45b4bbfdd [SPARK-23129][CORE] Make deserializeStream of DiskMapIterator init lazily ## What changes were proposed in this pull request? Currently, the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator is initialized when the DiskMapIterator instance is created. This causes memory overhead when ExternalAppendOnlyMap spills many times. We can avoid this by initializing deserializeStream the first time it is used. This patch makes deserializeStream init lazily. ## How was this patch tested? Existing tests Author: zhoukang Closes #20292 from caneGuy/zhoukang/lay-diskmapiterator. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45b4bbfd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45b4bbfd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45b4bbfd Branch: refs/heads/master Commit: 45b4bbfddc18a77011c3bc1bfd71b2cd3466443c Parents: 6f0ba84 Author: zhoukang Authored: Thu Jan 25 15:24:52 2018 +0800 Committer: Wenchen Fan Committed: Thu Jan 25 15:24:52 2018 +0800 -- .../util/collection/ExternalAppendOnlyMap.scala| 17 ++--- 1 file changed, 10 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/45b4bbfd/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 375f4a6..5c6dd45 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -463,7 +463,7 @@ class ExternalAppendOnlyMap[K, V, C]( // An intermediate stream that reads from exactly one batch // This guards against pre-fetching and other arbitrary behavior of higher level streams -private var deserializeStream = nextBatchStream() +private var deserializeStream: DeserializationStream = null private var nextItem: (K, C) = null private var objectsRead = 0 @@ -528,7 +528,11 @@ class ExternalAppendOnlyMap[K, V, C]( override def hasNext: Boolean = { if (nextItem == null) { if (deserializeStream == null) { - return false + // In case of deserializeStream has not been initialized + deserializeStream = nextBatchStream() + if (deserializeStream == null) { +return false + } } nextItem = readNextItem() } @@ -536,19 +540,18 @@ class ExternalAppendOnlyMap[K, V, C]( } override def next(): (K, C) = { - val item = if (nextItem == null) readNextItem() else nextItem - if (item == null) { + if (!hasNext) { throw new NoSuchElementException } + val item = nextItem nextItem = null item } private def cleanup() { batchIndex = batchOffsets.length // Prevent reading any other batch - val ds = deserializeStream - if (ds != null) { -ds.close() + if (deserializeStream != null) { +deserializeStream.close() deserializeStream = null } if (fileStream != null) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
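For readers skimming the diff above, the essence of the change is the classic lazy-initialization idiom. Below is a minimal, self-contained sketch of that idiom — `DiskIteratorSketch` and the stream trait are hypothetical stand-ins, not Spark's actual internals: the stream field starts as null and is opened on first use, so an iterator that is never consumed never holds an open stream.

```scala
// Minimal sketch of the lazy-initialization idiom the patch applies.
// DeserializationStream and DiskIteratorSketch are stand-ins, not Spark code.
trait DeserializationStream {
  def readObject(): AnyRef
  def close(): Unit
}

class DiskIteratorSketch(batches: Iterator[DeserializationStream]) {
  // Before the patch the analogous field was eagerly assigned nextBatchStream(),
  // so every spilled iterator held an open stream from construction onward.
  private var deserializeStream: DeserializationStream = null

  private def nextBatchStream(): DeserializationStream =
    if (batches.hasNext) batches.next() else null

  def hasNext: Boolean = {
    if (deserializeStream == null) {
      // Open the underlying stream only the first time it is needed.
      deserializeStream = nextBatchStream()
      if (deserializeStream == null) {
        return false
      }
    }
    true
  }
}
```

The saving comes from iterators created by many spills but not consumed until merge time: each now costs only a null field instead of open read buffers.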
svn commit: r24425 - in /dev/spark/2.4.0-SNAPSHOT-2018_01_24_16_01-6f0ba84-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Jan 25 00:15:03 2018 New Revision: 24425 Log: Apache Spark 2.4.0-SNAPSHOT-2018_01_24_16_01-6f0ba84 docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r24423 - in /dev/spark/2.3.1-SNAPSHOT-2018_01_24_14_01-a857ad5-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Jan 24 22:15:33 2018 New Revision: 24423 Log: Apache Spark 2.3.1-SNAPSHOT-2018_01_24_14_01-a857ad5 docs [This commit notification would consist of 1442 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [MINOR][SQL] add new unit test to LimitPushdown
Repository: spark Updated Branches: refs/heads/branch-2.3 500c94434 -> a857ad566 [MINOR][SQL] add new unit test to LimitPushdown ## What changes were proposed in this pull request? This PR makes the following repairs: 1. update y -> x in the "left outer join" test case, which appears to have been a mistake. 2. add a new test case: "left outer join and left sides are limited" 3. add a new test case: "left outer join and right sides are limited" 4. add a new test case: "right outer join and right sides are limited" 5. add a new test case: "right outer join and left sides are limited" 6. remove a comment that has no code implementation ## How was this patch tested? Added new unit test cases. Author: caoxuewen Closes #20381 from heary-cao/LimitPushdownSuite. (cherry picked from commit 6f0ba8472d1128551fa8090deebcecde0daebc53) Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a857ad56 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a857ad56 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a857ad56 Branch: refs/heads/branch-2.3 Commit: a857ad56621f644a26b9d27079b76ab21f3726ae Parents: 500c944 Author: caoxuewen Authored: Wed Jan 24 13:06:09 2018 -0800 Committer: gatorsmile Committed: Wed Jan 24 13:06:25 2018 -0800 -- .../sql/catalyst/optimizer/Optimizer.scala | 1 - .../catalyst/optimizer/LimitPushdownSuite.scala | 30 +++- 2 files changed, 29 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a857ad56/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0f9daa5..8d20770 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -352,7 +352,6 @@ object LimitPushDown extends Rule[LogicalPlan] { // on both sides if it is applied multiple times. Therefore: // - If one side is already limited, stack another limit on top if the new limit is smaller. // The redundant limit will be collapsed by the CombineLimits rule. -// - If neither side is limited, limit the side that is estimated to be bigger.
case LocalLimit(exp, join @ Join(left, right, joinType, _)) => val newJoin = joinType match { case RightOuter => join.copy(right = maybePushLocalLimit(exp, right)) http://git-wip-us.apache.org/repos/asf/spark/blob/a857ad56/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala index cc98d23..17fb9fc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala @@ -93,7 +93,21 @@ class LimitPushdownSuite extends PlanTest { test("left outer join") { val originalQuery = x.join(y, LeftOuter).limit(1) val optimized = Optimize.execute(originalQuery.analyze) -val correctAnswer = Limit(1, LocalLimit(1, y).join(y, LeftOuter)).analyze +val correctAnswer = Limit(1, LocalLimit(1, x).join(y, LeftOuter)).analyze +comparePlans(optimized, correctAnswer) + } + + test("left outer join and left sides are limited") { +val originalQuery = x.limit(2).join(y, LeftOuter).limit(1) +val optimized = Optimize.execute(originalQuery.analyze) +val correctAnswer = Limit(1, LocalLimit(1, x).join(y, LeftOuter)).analyze +comparePlans(optimized, correctAnswer) + } + + test("left outer join and right sides are limited") { +val originalQuery = x.join(y.limit(2), LeftOuter).limit(1) +val optimized = Optimize.execute(originalQuery.analyze) +val correctAnswer = Limit(1, LocalLimit(1, x).join(Limit(2, y), LeftOuter)).analyze comparePlans(optimized, correctAnswer) } @@ -104,6 +118,20 @@ class LimitPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("right outer join and right sides are limited") { +val originalQuery = x.join(y.limit(2), RightOuter).limit(1) +val optimized = Optimize.execute(originalQuery.analyze) +val correctAnswer = Limit(1, x.join(LocalLimit(1, y), RightOuter)).analyze +comparePlans(optimized, correctAnswer) + } + + tes
spark git commit: [MINOR][SQL] add new unit test to LimitPushdown
Repository: spark Updated Branches: refs/heads/master bc9641d90 -> 6f0ba8472 [MINOR][SQL] add new unit test to LimitPushdown ## What changes were proposed in this pull request? This PR makes the following repairs: 1. update y -> x in the "left outer join" test case, which appears to have been a mistake. 2. add a new test case: "left outer join and left sides are limited" 3. add a new test case: "left outer join and right sides are limited" 4. add a new test case: "right outer join and right sides are limited" 5. add a new test case: "right outer join and left sides are limited" 6. remove a comment that has no code implementation ## How was this patch tested? Added new unit test cases. Author: caoxuewen Closes #20381 from heary-cao/LimitPushdownSuite. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f0ba847 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f0ba847 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f0ba847 Branch: refs/heads/master Commit: 6f0ba8472d1128551fa8090deebcecde0daebc53 Parents: bc9641d Author: caoxuewen Authored: Wed Jan 24 13:06:09 2018 -0800 Committer: gatorsmile Committed: Wed Jan 24 13:06:09 2018 -0800 -- .../sql/catalyst/optimizer/Optimizer.scala | 1 - .../catalyst/optimizer/LimitPushdownSuite.scala | 30 +++- 2 files changed, 29 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6f0ba847/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0f9daa5..8d20770 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -352,7 +352,6 @@ object LimitPushDown extends Rule[LogicalPlan] { // on both sides if it is applied multiple times. Therefore: // - If one side is already limited, stack another limit on top if the new limit is smaller. // The redundant limit will be collapsed by the CombineLimits rule. -// - If neither side is limited, limit the side that is estimated to be bigger.
case LocalLimit(exp, join @ Join(left, right, joinType, _)) => val newJoin = joinType match { case RightOuter => join.copy(right = maybePushLocalLimit(exp, right)) http://git-wip-us.apache.org/repos/asf/spark/blob/6f0ba847/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala index cc98d23..17fb9fc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala @@ -93,7 +93,21 @@ class LimitPushdownSuite extends PlanTest { test("left outer join") { val originalQuery = x.join(y, LeftOuter).limit(1) val optimized = Optimize.execute(originalQuery.analyze) -val correctAnswer = Limit(1, LocalLimit(1, y).join(y, LeftOuter)).analyze +val correctAnswer = Limit(1, LocalLimit(1, x).join(y, LeftOuter)).analyze +comparePlans(optimized, correctAnswer) + } + + test("left outer join and left sides are limited") { +val originalQuery = x.limit(2).join(y, LeftOuter).limit(1) +val optimized = Optimize.execute(originalQuery.analyze) +val correctAnswer = Limit(1, LocalLimit(1, x).join(y, LeftOuter)).analyze +comparePlans(optimized, correctAnswer) + } + + test("left outer join and right sides are limited") { +val originalQuery = x.join(y.limit(2), LeftOuter).limit(1) +val optimized = Optimize.execute(originalQuery.analyze) +val correctAnswer = Limit(1, LocalLimit(1, x).join(Limit(2, y), LeftOuter)).analyze comparePlans(optimized, correctAnswer) } @@ -104,6 +118,20 @@ class LimitPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("right outer join and right sides are limited") { +val originalQuery = x.join(y.limit(2), RightOuter).limit(1) +val optimized = Optimize.execute(originalQuery.analyze) +val correctAnswer = Limit(1, x.join(LocalLimit(1, y), RightOuter)).analyze +comparePlans(optimized, correctAnswer) + } + + test("right outer join and left sides are limited") { +val originalQuery = x.limit(2).join(y, RightOuter
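Restating the rule these tests pin down, as a toy sketch: a limit sitting above an outer join may be copied only to the join's outer side, and when that side is already limited, the smaller limit wins; the top-level limit always stays in place. This is a hypothetical mini plan algebra, not Catalyst itself, and it folds the CombineLimits collapse into `maybePushLocalLimit` for brevity.

```scala
// Toy plan algebra illustrating LimitPushDown through outer joins.
// This is not Catalyst; it only mirrors the shape of the rule under test.
sealed trait Plan
case class Relation(name: String) extends Plan
case class LocalLimit(n: Int, child: Plan) extends Plan
case class Join(left: Plan, right: Plan, joinType: String) extends Plan

def maybePushLocalLimit(n: Int, side: Plan): Plan = side match {
  // Side already limited: keep the smaller of the two limits
  // (the real rule stacks them and lets CombineLimits collapse).
  case LocalLimit(m, child) if n < m => LocalLimit(n, child)
  case limited @ LocalLimit(_, _)    => limited
  case other                         => LocalLimit(n, other)
}

def pushLimitThroughJoin(plan: Plan): Plan = plan match {
  case LocalLimit(n, Join(l, r, "LeftOuter")) =>
    LocalLimit(n, Join(maybePushLocalLimit(n, l), r, "LeftOuter"))
  case LocalLimit(n, Join(l, r, "RightOuter")) =>
    LocalLimit(n, Join(l, maybePushLocalLimit(n, r), "RightOuter"))
  case other => other
}

// pushLimitThroughJoin(LocalLimit(1,
//   Join(LocalLimit(2, Relation("x")), Relation("y"), "LeftOuter")))
// limits the left side to 1, matching the
// "left outer join and left sides are limited" expectation above.
```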
spark git commit: [SPARK-23198][SS][TEST] Fix KafkaContinuousSourceStressForDontFailOnDataLossSuite to test ContinuousExecution
Repository: spark Updated Branches: refs/heads/branch-2.3 30272c668 -> 500c94434 [SPARK-23198][SS][TEST] Fix KafkaContinuousSourceStressForDontFailOnDataLossSuite to test ContinuousExecution ## What changes were proposed in this pull request? Currently, `KafkaContinuousSourceStressForDontFailOnDataLossSuite` runs on `MicroBatchExecution`. It should test `ContinuousExecution`. ## How was this patch tested? Pass the updated test suite. Author: Dongjoon Hyun Closes #20374 from dongjoon-hyun/SPARK-23198. (cherry picked from commit bc9641d9026aeae3571915b003ac971f6245d53c) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/500c9443 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/500c9443 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/500c9443 Branch: refs/heads/branch-2.3 Commit: 500c94434d8f5267b1488accd176cf54b69e6ba4 Parents: 30272c6 Author: Dongjoon Hyun Authored: Wed Jan 24 12:58:44 2018 -0800 Committer: Shixiong Zhu Committed: Wed Jan 24 12:58:51 2018 -0800 -- .../org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/500c9443/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index b3dade4..a7083fa 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -91,6 +91,7 @@ class KafkaContinuousSourceStressForDontFailOnDataLossSuite ds.writeStream .format("memory") .queryName("memory") + .trigger(Trigger.Continuous("1 second")) .start() } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23198][SS][TEST] Fix KafkaContinuousSourceStressForDontFailOnDataLossSuite to test ContinuousExecution
Repository: spark Updated Branches: refs/heads/master 0e178e152 -> bc9641d90 [SPARK-23198][SS][TEST] Fix KafkaContinuousSourceStressForDontFailOnDataLossSuite to test ContinuousExecution ## What changes were proposed in this pull request? Currently, `KafkaContinuousSourceStressForDontFailOnDataLossSuite` runs on `MicroBatchExecution`. It should test `ContinuousExecution`. ## How was this patch tested? Pass the updated test suite. Author: Dongjoon Hyun Closes #20374 from dongjoon-hyun/SPARK-23198. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc9641d9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc9641d9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc9641d9 Branch: refs/heads/master Commit: bc9641d9026aeae3571915b003ac971f6245d53c Parents: 0e178e1 Author: Dongjoon Hyun Authored: Wed Jan 24 12:58:44 2018 -0800 Committer: Shixiong Zhu Committed: Wed Jan 24 12:58:44 2018 -0800 -- .../org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bc9641d9/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index b3dade4..a7083fa 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -91,6 +91,7 @@ class KafkaContinuousSourceStressForDontFailOnDataLossSuite ds.writeStream .format("memory") .queryName("memory") + .trigger(Trigger.Continuous("1 second")) .start() } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
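For context, `Trigger.Continuous` is the public switch between the two engines: without it a streaming query runs under `MicroBatchExecution`, which is exactly why the stress test was not exercising `ContinuousExecution`. A hedged application-level sketch follows; the broker address and topic name are placeholders.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Sketch: selecting continuous execution for a streaming query.
// "host:9092" and "topic" are placeholder values.
val spark = SparkSession.builder().appName("continuous-sketch").getOrCreate()

val ds = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "topic")
  .load()

val query = ds.writeStream
  .format("memory")
  .queryName("memory")
  // Omitting this line falls back to the default micro-batch trigger.
  .trigger(Trigger.Continuous("1 second")) // checkpoint interval, not batch size
  .start()
```

Real deployments usually also set a `checkpointLocation` option; the in-memory sink here mirrors the test setup above.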
svn commit: r24419 - in /dev/spark/2.4.0-SNAPSHOT-2018_01_24_12_01-0e178e1-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Jan 24 20:15:14 2018 New Revision: 24419 Log: Apache Spark 2.4.0-SNAPSHOT-2018_01_24_12_01-0e178e1 docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite "Shuffle registration timeout and maxAttempts conf"
Repository: spark Updated Branches: refs/heads/master 840dea64a -> 0e178e152 [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite "Shuffle registration timeout and maxAttempts conf" ## What changes were proposed in this pull request? [Ticket](https://issues.apache.org/jira/browse/SPARK-22297) - one of the tests seems to produce unreliable results due to execution speed variability Since the original test was trying to connect to the test server with a `40 ms` timeout, and the test server replied after `50 ms`, the error might be produced under the following conditions: - it might occur that the test server replies correctly after `50 ms` - but the client only registers the timeout after `51 ms` - this might happen if the executor has to schedule a big number of threads, and decides to delay the thread/actor that is responsible for watching the timeout, because of high CPU load - running an entire test suite usually produces high loads on the CPU executing the tests ## How was this patch tested? The test's check cases remain the same and the set-up emulates the previous version's. Author: Mark Petruska Closes #19671 from mpetruska/SPARK-22297. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e178e15 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e178e15 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e178e15 Branch: refs/heads/master Commit: 0e178e1523175a0be9437920045e80deb0a2712b Parents: 840dea6 Author: Mark Petruska Authored: Wed Jan 24 10:25:14 2018 -0800 Committer: Marcelo Vanzin Committed: Wed Jan 24 10:25:14 2018 -0800 -- .../spark/storage/BlockManagerSuite.scala | 55 ++-- 1 file changed, 38 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0e178e15/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 629eed4..b19d8eb 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.storage import java.nio.ByteBuffer import scala.collection.JavaConverters._ -import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.concurrent.Future import scala.concurrent.duration._ @@ -44,8 +43,9 @@ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf} import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap} -import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager} +import org.apache.spark.network.shuffle.{BlockFetchingListener, TempFileManager} import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor} +import org.apache.spark.network.util.TransportConf import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite} @@ -1325,9 +1325,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") { val tryAgainMsg = "test_spark_20640_try_again" +val
timingoutExecutor = "timingoutExecutor" +val tryAgainExecutor = "tryAgainExecutor" +val succeedingExecutor = "succeedingExecutor" + // a server which delays response 50ms and must try twice for success. def newShuffleServer(port: Int): (TransportServer, Int) = { - val attempts = new mutable.HashMap[String, Int]() + val failure = new Exception(tryAgainMsg) + val success = ByteBuffer.wrap(new Array[Byte](0)) + + var secondExecutorFailedOnce = false + var thirdExecutorFailedOnce = false + val handler = new NoOpRpcHandler { override def receive( client: TransportClient, @@ -1335,15 +1344,26 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE callback: RpcResponseCallback): Unit = { val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message) msgObj match { -case exec: RegisterExecutor => - Thread.sleep(50) - val attempt = attempts.getOrElse(exec.execId, 0) + 1 - attempts(exec.execId) = attempt - if (attempt < 2) { -call
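The heart of the fix visible in the (truncated) diff above: the old test server keyed its behavior off a sleep and a per-executor attempt counter, so outcomes depended on wall-clock scheduling, while the new one keys off explicit per-executor state. A standalone sketch of that idea follows; the handler type is hypothetical, and the executor names mirror the `timingoutExecutor`/`tryAgainExecutor`/`succeedingExecutor` constants above.

```scala
import java.nio.ByteBuffer

// Sketch of a deterministic fail-once test double: no sleeps, so CPU load
// and scheduling delays cannot flip the outcome of the test.
class FailOnceRegistrationHandler(tryAgainMsg: String) {
  private val success = ByteBuffer.wrap(new Array[Byte](0))
  private var tryAgainFailedOnce = false

  /** None = never reply (client must hit its own timeout); Some = immediate reply. */
  def receive(execId: String): Option[Either[Exception, ByteBuffer]] =
    execId match {
      case "timingoutExecutor" => None
      case "tryAgainExecutor" if !tryAgainFailedOnce =>
        tryAgainFailedOnce = true
        Some(Left(new Exception(tryAgainMsg)))
      case _ => Some(Right(success)) // e.g. "succeedingExecutor"
    }
}
```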
svn commit: r24416 - in /dev/spark/2.3.1-SNAPSHOT-2018_01_24_10_01-2221a30-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Jan 24 18:15:36 2018 New Revision: 24416 Log: Apache Spark 2.3.1-SNAPSHOT-2018_01_24_10_01-2221a30 docs [This commit notification would consist of 1442 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23152][ML] - Correctly guard against empty datasets
Repository: spark Updated Branches: refs/heads/master bbb87b350 -> 840dea64a [SPARK-23152][ML] - Correctly guard against empty datasets ## What changes were proposed in this pull request? Correctly guard against empty datasets in `org.apache.spark.ml.classification.Classifier` ## How was this patch tested? existing tests Author: Matthew Tovbin Closes #20321 from tovbinm/SPARK-23152. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/840dea64 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/840dea64 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/840dea64 Branch: refs/heads/master Commit: 840dea64abd8a3a5960de830f19a57f5f1aa3bf6 Parents: bbb87b3 Author: Matthew Tovbin Authored: Wed Jan 24 13:13:44 2018 -0500 Committer: Sean Owen Committed: Wed Jan 24 13:13:44 2018 -0500 -- .../scala/org/apache/spark/ml/classification/Classifier.scala | 2 +- .../org/apache/spark/ml/classification/ClassifierSuite.scala | 7 +++ 2 files changed, 8 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/840dea64/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala index bc0b49d..9d1d5aa 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala @@ -109,7 +109,7 @@ abstract class Classifier[ case None => // Get number of classes from dataset itself. val maxLabelRow: Array[Row] = dataset.select(max($(labelCol))).take(1) -if (maxLabelRow.isEmpty) { +if (maxLabelRow.isEmpty || maxLabelRow(0).get(0) == null) { throw new SparkException("ML algorithm was given empty dataset.") } val maxDoubleLabel: Double = maxLabelRow.head.getDouble(0) http://git-wip-us.apache.org/repos/asf/spark/blob/840dea64/mllib/src/test/scala/org/apache/spark/ml/classification/ClassifierSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/ClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/ClassifierSuite.scala index de71207..87bf2be 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/ClassifierSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/ClassifierSuite.scala @@ -90,6 +90,13 @@ class ClassifierSuite extends SparkFunSuite with MLlibTestSparkContext { } assert(e.getMessage.contains("requires integers in range")) } +val df3 = getTestData(Seq.empty[Double]) +withClue("getNumClasses should fail if dataset is empty") { + val e: SparkException = intercept[SparkException] { +c.getNumClasses(df3) + } + assert(e.getMessage == "ML algorithm was given empty dataset.") +} } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
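The subtlety behind this small fix: `max` over an empty dataset does not return zero rows, it returns one row whose value is null, so the old `isEmpty` check never fired. A hedged sketch of the guard, simplified from the `Classifier` change above and assuming a double-typed, 0-based label column:

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.max

// Sketch: an ungrouped aggregate always yields exactly one row, and for an
// empty input that row holds null, so both conditions must be checked.
def numClassesFromLabels(dataset: DataFrame, labelCol: String): Int = {
  val maxLabelRow: Array[Row] = dataset.select(max(labelCol)).take(1)
  if (maxLabelRow.isEmpty || maxLabelRow(0).get(0) == null) {
    throw new SparkException("ML algorithm was given empty dataset.")
  }
  maxLabelRow.head.getDouble(0).toInt + 1 // labels assumed 0-based doubles
}
```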
spark git commit: [SPARK-22837][SQL] Session timeout checker does not work in SessionManager.
Repository: spark Updated Branches: refs/heads/branch-2.3 2221a3035 -> 30272c668 [SPARK-22837][SQL] Session timeout checker does not work in SessionManager. ## What changes were proposed in this pull request? Currently we do not call `super.init(hiveConf)` in `SparkSQLSessionManager.init`, so we do not load the configs `HIVE_SERVER2_SESSION_CHECK_INTERVAL`, `HIVE_SERVER2_IDLE_SESSION_TIMEOUT` and `HIVE_SERVER2_IDLE_SESSION_CHECK_OPERATION`, which causes the session timeout checker to not work. ## How was this patch tested? manual tests Author: zuotingbing Closes #20025 from zuotingbing/SPARK-22837. (cherry picked from commit bbb87b350d9d0d393db3fb7ca61dcbae538553bb) Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30272c66 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30272c66 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30272c66 Branch: refs/heads/branch-2.3 Commit: 30272c668b2cd8c0b0ee78c600bc3feb17bd6647 Parents: 2221a30 Author: zuotingbing Authored: Wed Jan 24 10:07:24 2018 -0800 Committer: gatorsmile Committed: Wed Jan 24 10:07:34 2018 -0800 -- .../hive/thriftserver/SparkSQLSessionManager.scala | 16 +--- 1 file changed, 1 insertion(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/30272c66/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 48c0ebe..2958b77 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -40,22 +40,8 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: private lazy val sparkSqlOperationManager = new SparkSQLOperationManager() override def init(hiveConf: HiveConf) { -setSuperField(this, "hiveConf", hiveConf) - -// Create operation log root directory, if operation logging is enabled -if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { - invoke(classOf[SessionManager], this, "initOperationLogRootDir") -} - -val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS) -setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize)) -getAncestorField[Log](this, 3, "LOG").info( - s"HiveServer2: Async execution pool size $backgroundPoolSize") - setSuperField(this, "operationManager", sparkSqlOperationManager) -addService(sparkSqlOperationManager) - -initCompositeService(hiveConf) +super.init(hiveConf) } override def openSession( - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-22837][SQL] Session timeout checker does not work in SessionManager.
Repository: spark Updated Branches: refs/heads/master 8c273b416 -> bbb87b350 [SPARK-22837][SQL] Session timeout checker does not work in SessionManager. ## What changes were proposed in this pull request? Currently we do not call `super.init(hiveConf)` in `SparkSQLSessionManager.init`, so we do not load the configs `HIVE_SERVER2_SESSION_CHECK_INTERVAL`, `HIVE_SERVER2_IDLE_SESSION_TIMEOUT` and `HIVE_SERVER2_IDLE_SESSION_CHECK_OPERATION`, which causes the session timeout checker to not work. ## How was this patch tested? manual tests Author: zuotingbing Closes #20025 from zuotingbing/SPARK-22837. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bbb87b35 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bbb87b35 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bbb87b35 Branch: refs/heads/master Commit: bbb87b350d9d0d393db3fb7ca61dcbae538553bb Parents: 8c273b4 Author: zuotingbing Authored: Wed Jan 24 10:07:24 2018 -0800 Committer: gatorsmile Committed: Wed Jan 24 10:07:24 2018 -0800 -- .../hive/thriftserver/SparkSQLSessionManager.scala | 16 +--- 1 file changed, 1 insertion(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bbb87b35/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 48c0ebe..2958b77 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -40,22 +40,8 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: private lazy val sparkSqlOperationManager = new SparkSQLOperationManager() override def init(hiveConf: HiveConf) { -setSuperField(this, "hiveConf", hiveConf) - -// Create operation log root directory, if operation logging is enabled -if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { - invoke(classOf[SessionManager], this, "initOperationLogRootDir") -} - -val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS) -setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize)) -getAncestorField[Log](this, 3, "LOG").info( - s"HiveServer2: Async execution pool size $backgroundPoolSize") - setSuperField(this, "operationManager", sparkSqlOperationManager) -addService(sparkSqlOperationManager) - -initCompositeService(hiveConf) +super.init(hiveConf) } override def openSession( - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
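The pattern at stake, reduced to a toy sketch with hypothetical classes (not HiveServer2's real API): an `override def init` that re-implements parent setup piecemeal silently drops whatever the parent adds, here the timeout checker, whereas delegating to `super.init` keeps it.

```scala
// Toy sketch: subclass init that delegates instead of re-implementing.
class BaseSessionManager {
  def init(conf: Map[String, String]): Unit = {
    val interval = conf.getOrElse("session.check.interval", "6h")
    println(s"session timeout checker started, interval = $interval")
  }
}

class SparkSessionManagerSketch extends BaseSessionManager {
  override def init(conf: Map[String, String]): Unit = {
    // Set subclass-specific state here, then let the parent do its setup,
    // including starting the timeout checker.
    super.init(conf)
  }
}
```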
spark git commit: [SPARK-23020][CORE][FOLLOWUP] Fix Java style check issues.
Repository: spark Updated Branches: refs/heads/branch-2.3 4336e67f4 -> 2221a3035 [SPARK-23020][CORE][FOLLOWUP] Fix Java style check issues. ## What changes were proposed in this pull request? This is a follow-up of #20297 which broke lint-java checks. This pr fixes the lint-java issues. ``` [ERROR] src/test/java/org/apache/spark/launcher/BaseSuite.java:[21,8] (imports) UnusedImports: Unused import - java.util.concurrent.TimeUnit. [ERROR] src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java:[27,8] (imports) UnusedImports: Unused import - java.util.concurrent.TimeUnit. ``` ## How was this patch tested? Checked manually in my local environment. Author: Takuya UESHIN Closes #20376 from ueshin/issues/SPARK-23020/fup1. (cherry picked from commit 8c273b4162b6138c4abba64f595c2750d1ef8bcb) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2221a303 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2221a303 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2221a303 Branch: refs/heads/branch-2.3 Commit: 2221a30352f1c0f5483c91301f32e66672a43644 Parents: 4336e67 Author: Takuya UESHIN Authored: Wed Jan 24 10:00:42 2018 -0800 Committer: Marcelo Vanzin Committed: Wed Jan 24 10:00:55 2018 -0800 -- .../src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java | 1 - launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java | 1 - 2 files changed, 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2221a303/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java -- diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java index a042375..1543f4f 100644 --- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java +++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.TimeUnit; import org.junit.Test; import static org.junit.Assert.*; http://git-wip-us.apache.org/repos/asf/spark/blob/2221a303/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java -- diff --git a/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java b/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java index 3722a59..438349e 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java @@ -18,7 +18,6 @@ package org.apache.spark.launcher; import java.time.Duration; -import java.util.concurrent.TimeUnit; import org.junit.After; import org.slf4j.bridge.SLF4JBridgeHandler; - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23020][CORE][FOLLOWUP] Fix Java style check issues.
Repository: spark Updated Branches: refs/heads/master e18d6f532 -> 8c273b416 [SPARK-23020][CORE][FOLLOWUP] Fix Java style check issues. ## What changes were proposed in this pull request? This is a follow-up of #20297 which broke lint-java checks. This pr fixes the lint-java issues. ``` [ERROR] src/test/java/org/apache/spark/launcher/BaseSuite.java:[21,8] (imports) UnusedImports: Unused import - java.util.concurrent.TimeUnit. [ERROR] src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java:[27,8] (imports) UnusedImports: Unused import - java.util.concurrent.TimeUnit. ``` ## How was this patch tested? Checked manually in my local environment. Author: Takuya UESHIN Closes #20376 from ueshin/issues/SPARK-23020/fup1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c273b41 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c273b41 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c273b41 Branch: refs/heads/master Commit: 8c273b4162b6138c4abba64f595c2750d1ef8bcb Parents: e18d6f5 Author: Takuya UESHIN Authored: Wed Jan 24 10:00:42 2018 -0800 Committer: Marcelo Vanzin Committed: Wed Jan 24 10:00:42 2018 -0800 -- .../src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java | 1 - launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java | 1 - 2 files changed, 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8c273b41/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java -- diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java index a042375..1543f4f 100644 --- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java +++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.TimeUnit; import org.junit.Test; import static org.junit.Assert.*; http://git-wip-us.apache.org/repos/asf/spark/blob/8c273b41/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java -- diff --git a/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java b/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java index 3722a59..438349e 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java @@ -18,7 +18,6 @@ package org.apache.spark.launcher; import java.time.Duration; -import java.util.concurrent.TimeUnit; import org.junit.After; import org.slf4j.bridge.SLF4JBridgeHandler; - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20906][SPARKR] Add API doc example for Constrained Logistic Regression
Repository: spark Updated Branches: refs/heads/branch-2.3 17317c8fb -> 4336e67f4 [SPARK-20906][SPARKR] Add API doc example for Constrained Logistic Regression ## What changes were proposed in this pull request? doc only changes ## How was this patch tested? manual Author: Felix Cheung Closes #20380 from felixcheung/rclrdoc. (cherry picked from commit e18d6f5326e0d9ea03d31de5ce04cb84d3b8ab37) Signed-off-by: Felix Cheung Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4336e67f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4336e67f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4336e67f Branch: refs/heads/branch-2.3 Commit: 4336e67f41344fd587808182741ae4ef9fb2b76a Parents: 17317c8 Author: Felix Cheung Authored: Wed Jan 24 09:37:54 2018 -0800 Committer: Felix Cheung Committed: Wed Jan 24 09:38:16 2018 -0800 -- R/pkg/R/mllib_classification.R| 15 ++- R/pkg/tests/fulltests/test_mllib_classification.R | 10 +- 2 files changed, 19 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4336e67f/R/pkg/R/mllib_classification.R -- diff --git a/R/pkg/R/mllib_classification.R b/R/pkg/R/mllib_classification.R index 7cd072a..f6e9b13 100644 --- a/R/pkg/R/mllib_classification.R +++ b/R/pkg/R/mllib_classification.R @@ -279,11 +279,24 @@ function(object, path, overwrite = FALSE) { #' savedModel <- read.ml(path) #' summary(savedModel) #' -#' # multinomial logistic regression +#' # binary logistic regression against two classes with +#' # upperBoundsOnCoefficients and upperBoundsOnIntercepts +#' ubc <- matrix(c(1.0, 0.0, 1.0, 0.0), nrow = 1, ncol = 4) +#' model <- spark.logit(training, Species ~ ., +#' upperBoundsOnCoefficients = ubc, +#' upperBoundsOnIntercepts = 1.0) #' +#' # multinomial logistic regression #' model <- spark.logit(training, Class ~ ., regParam = 0.5) #' summary <- summary(model) #' +#' # multinomial logistic regression with +#' # lowerBoundsOnCoefficients and lowerBoundsOnIntercepts +#' lbc <- matrix(c(0.0, -1.0, 0.0, -1.0, 0.0, -1.0, 0.0, -1.0), nrow = 2, ncol = 4) +#' lbi <- as.array(c(0.0, 0.0)) +#' model <- spark.logit(training, Species ~ ., family = "multinomial", +#' lowerBoundsOnCoefficients = lbc, +#' lowerBoundsOnIntercepts = lbi) #' } #' @note spark.logit since 2.1.0 setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"), http://git-wip-us.apache.org/repos/asf/spark/blob/4336e67f/R/pkg/tests/fulltests/test_mllib_classification.R -- diff --git a/R/pkg/tests/fulltests/test_mllib_classification.R b/R/pkg/tests/fulltests/test_mllib_classification.R index ad47717..a46c47d 100644 --- a/R/pkg/tests/fulltests/test_mllib_classification.R +++ b/R/pkg/tests/fulltests/test_mllib_classification.R @@ -124,7 +124,7 @@ test_that("spark.logit", { # Petal.Width 0.42122607 # nolint end - # Test multinomial logistic regression againt three classes + # Test multinomial logistic regression against three classes df <- suppressWarnings(createDataFrame(iris)) model <- spark.logit(df, Species ~ ., regParam = 0.5) summary <- summary(model) @@ -196,7 +196,7 @@ test_that("spark.logit", { # # nolint end - # Test multinomial logistic regression againt two classes + # Test multinomial logistic regression against two classes df <- suppressWarnings(createDataFrame(iris)) training <- df[df$Species %in% c("versicolor", "virginica"), ] model <- spark.logit(training, Species ~ ., regParam = 0.5, family = "multinomial") @@ -208,7 +208,7 @@ test_that("spark.logit", { 
expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1)) expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1)) - # Test binomial logistic regression againt two classes + # Test binomial logistic regression against two classes model <- spark.logit(training, Species ~ ., regParam = 0.5) summary <- summary(model) coefsR <- c(-6.08, 0.25, 0.16, 0.48, 1.04) @@ -239,7 +239,7 @@ test_that("spark.logit", { prediction2 <- collect(select(predict(model2, df2), "prediction")) expect_equal(sort(prediction2$prediction), c("0.0", "0.0", "0.0", "0.0", "0.0")) - # Test binomial logistic regression againt two classes with upperBoundsOnCoefficients + # Test binomial logistic regression against two classes with upperBoundsOnCoefficients # and upperBoundsOnIntercepts u <- matrix(c(1.0, 0.0, 1.0, 0.0), nrow = 1, ncol = 4) model <- spark.logit(training, Species ~ ., upperBoundsOnCoefficients = u, @@ -252,7 +252,7 @@ test_that("spark.log
spark git commit: [SPARK-20906][SPARKR] Add API doc example for Constrained Logistic Regression
Repository: spark Updated Branches: refs/heads/master 0ec95bb7d -> e18d6f532 [SPARK-20906][SPARKR] Add API doc example for Constrained Logistic Regression ## What changes were proposed in this pull request? doc only changes ## How was this patch tested? manual Author: Felix Cheung Closes #20380 from felixcheung/rclrdoc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e18d6f53 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e18d6f53 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e18d6f53 Branch: refs/heads/master Commit: e18d6f5326e0d9ea03d31de5ce04cb84d3b8ab37 Parents: 0ec95bb Author: Felix Cheung Authored: Wed Jan 24 09:37:54 2018 -0800 Committer: Felix Cheung Committed: Wed Jan 24 09:37:54 2018 -0800 -- R/pkg/R/mllib_classification.R| 15 ++- R/pkg/tests/fulltests/test_mllib_classification.R | 10 +- 2 files changed, 19 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e18d6f53/R/pkg/R/mllib_classification.R -- diff --git a/R/pkg/R/mllib_classification.R b/R/pkg/R/mllib_classification.R index 7cd072a..f6e9b13 100644 --- a/R/pkg/R/mllib_classification.R +++ b/R/pkg/R/mllib_classification.R @@ -279,11 +279,24 @@ function(object, path, overwrite = FALSE) { #' savedModel <- read.ml(path) #' summary(savedModel) #' -#' # multinomial logistic regression +#' # binary logistic regression against two classes with +#' # upperBoundsOnCoefficients and upperBoundsOnIntercepts +#' ubc <- matrix(c(1.0, 0.0, 1.0, 0.0), nrow = 1, ncol = 4) +#' model <- spark.logit(training, Species ~ ., +#' upperBoundsOnCoefficients = ubc, +#' upperBoundsOnIntercepts = 1.0) #' +#' # multinomial logistic regression #' model <- spark.logit(training, Class ~ ., regParam = 0.5) #' summary <- summary(model) #' +#' # multinomial logistic regression with +#' # lowerBoundsOnCoefficients and lowerBoundsOnIntercepts +#' lbc <- matrix(c(0.0, -1.0, 0.0, -1.0, 0.0, -1.0, 0.0, -1.0), nrow = 2, ncol = 4) +#' lbi <- as.array(c(0.0, 0.0)) +#' model <- spark.logit(training, Species ~ ., family = "multinomial", +#' lowerBoundsOnCoefficients = lbc, +#' lowerBoundsOnIntercepts = lbi) #' } #' @note spark.logit since 2.1.0 setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"), http://git-wip-us.apache.org/repos/asf/spark/blob/e18d6f53/R/pkg/tests/fulltests/test_mllib_classification.R -- diff --git a/R/pkg/tests/fulltests/test_mllib_classification.R b/R/pkg/tests/fulltests/test_mllib_classification.R index ad47717..a46c47d 100644 --- a/R/pkg/tests/fulltests/test_mllib_classification.R +++ b/R/pkg/tests/fulltests/test_mllib_classification.R @@ -124,7 +124,7 @@ test_that("spark.logit", { # Petal.Width 0.42122607 # nolint end - # Test multinomial logistic regression againt three classes + # Test multinomial logistic regression against three classes df <- suppressWarnings(createDataFrame(iris)) model <- spark.logit(df, Species ~ ., regParam = 0.5) summary <- summary(model) @@ -196,7 +196,7 @@ test_that("spark.logit", { # # nolint end - # Test multinomial logistic regression againt two classes + # Test multinomial logistic regression against two classes df <- suppressWarnings(createDataFrame(iris)) training <- df[df$Species %in% c("versicolor", "virginica"), ] model <- spark.logit(training, Species ~ ., regParam = 0.5, family = "multinomial") @@ -208,7 +208,7 @@ test_that("spark.logit", { expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1)) expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 
0.1)) - # Test binomial logistic regression againt two classes + # Test binomial logistic regression against two classes model <- spark.logit(training, Species ~ ., regParam = 0.5) summary <- summary(model) coefsR <- c(-6.08, 0.25, 0.16, 0.48, 1.04) @@ -239,7 +239,7 @@ test_that("spark.logit", { prediction2 <- collect(select(predict(model2, df2), "prediction")) expect_equal(sort(prediction2$prediction), c("0.0", "0.0", "0.0", "0.0", "0.0")) - # Test binomial logistic regression againt two classes with upperBoundsOnCoefficients + # Test binomial logistic regression against two classes with upperBoundsOnCoefficients # and upperBoundsOnIntercepts u <- matrix(c(1.0, 0.0, 1.0, 0.0), nrow = 1, ncol = 4) model <- spark.logit(training, Species ~ ., upperBoundsOnCoefficients = u, @@ -252,7 +252,7 @@ test_that("spark.logit", { expect_error(spark.logit(training, Species ~ ., upperBoundsOnCoefficients = as.array(c(1, 2)),
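The same constrained fits are exposed through the Scala API; the following is a sketch mirroring the R examples documented above, assuming the `setLowerBoundsOnCoefficients`/`setUpperBoundsOnIntercepts` family of setters on `LogisticRegression` (present since box-constraint support landed) and a placeholder `training` DataFrame with the usual `label`/`features` columns.

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Matrices, Vectors}

// Sketch mirroring the R doc examples: box constraints on coefficients and
// intercepts for a 4-feature problem. `training` is an assumed DataFrame.
val ubc = Matrices.dense(1, 4, Array(1.0, 0.0, 1.0, 0.0))
val binomial = new LogisticRegression()
  .setUpperBoundsOnCoefficients(ubc)
  .setUpperBoundsOnIntercepts(Vectors.dense(1.0))

val lbc = Matrices.dense(2, 4,
  Array(0.0, -1.0, 0.0, -1.0, 0.0, -1.0, 0.0, -1.0)) // column-major, like R
val multinomial = new LogisticRegression()
  .setFamily("multinomial")
  .setLowerBoundsOnCoefficients(lbc)
  .setLowerBoundsOnIntercepts(Vectors.dense(0.0, 0.0))

// val model = binomial.fit(training)  // `training` is a placeholder here
```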
[2/5] spark git commit: [SPARK-22577][CORE] executor page blacklist status should update with TaskSet level blacklisting
http://git-wip-us.apache.org/repos/asf/spark/blob/0ec95bb7/core/src/test/resources/spark-events/application_1516285256255_0012 -- diff --git a/core/src/test/resources/spark-events/application_1516285256255_0012 b/core/src/test/resources/spark-events/application_1516285256255_0012 new file mode 100755 index 000..3e1736c --- /dev/null +++ b/core/src/test/resources/spark-events/application_1516285256255_0012 @@ -0,0 +1,71 @@ +{"Event":"SparkListenerLogStart","Spark Version":"2.3.0-SNAPSHOT"} +{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.161-0.b14.el7_4.x86_64/jre","Java Version":"1.8.0_161 (Oracle Corporation)","Scala Version":"version 2.11.8"},"Spark Properties":{"spark.blacklist.enabled":"true","spark.driver.host":"apiros-1.gce.test.com","spark.eventLog.enabled":"true","spark.driver.port":"33058","spark.repl.class.uri":"spark://apiros-1.gce.test.com:33058/classes","spark.jars":"","spark.repl.class.outputDir":"/tmp/spark-6781fb17-e07a-4b32-848b-9936c2e88b33/repl-c0fd7008-04be-471e-a173-6ad3e62d53d7","spark.app.name":"Spark shell","spark.blacklist.stage.maxFailedExecutorsPerNode":"1","spark.scheduler.mode":"FIFO","spark.executor.instances":"8","spark.ui.showConsoleProgress":"true","spark.blacklist.stage.maxFailedTasksPerExecutor":"1","spark.executor.id":"driver","spark.submit.deployMode":"client","spark.master":"yarn","spark.ui.filters":"org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter","spark.e xecutor.memory":"2G","spark.home":"/github/spark","spark.sql.catalogImplementation":"hive","spark.driver.appUIAddress":"http://apiros-1.gce.test.com:4040","spark.blacklist.application.maxFailedTasksPerExecutor":"10","spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS":"apiros-1.gce.test.com","spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES":"http://apiros-1.gce.test.com:8088/proxy/application_1516285256255_0012","spark.app.id":"application_1516285256255_0012"},"System Properties":{"java.io.tmpdir":"/tmp","line.separator":"\n","path.separator":":","sun.management.compiler":"HotSpot 64-Bit Tiered Compilers","SPARK_SUBMIT":"true","sun.cpu.endian":"little","java.specification.version":"1.8","java.vm.specification.name":"Java Virtual Machine Specification","java.vendor":"Oracle Corporation","java.vm.specification.version":"1.8","user.home":"*(redacted)","file.encoding.pkg":"sun.io","sun.nio.ch.bugLevel":"","su n.arch.data.model":"64","sun.boot.library.path":"/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.161-0.b14.el7_4.x86_64/jre/lib/amd64","user.dir":"*(redacted)","java.library.path":"/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib","sun.cpu.isalist":"","os.arch":"amd64","java.vm.version":"25.161-b14","java.endorsed.dirs":"/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.161-0.b14.el7_4.x86_64/jre/lib/endorsed","java.runtime.version":"1.8.0_161-b14","java.vm.info":"mixed mode","java.ext.dirs":"/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.161-0.b14.el7_4.x86_64/jre/lib/ext:/usr/java/packages/lib/ext","java.runtime.name":"OpenJDK Runtime Environment","file.separator":"/","java.class.version":"52.0","scala.usejavacp":"true","java.specification.name":"Java Platform API Specification","sun.boot.class.path":"/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.161-0.b14.el7_4.x86_64/jre/lib/resources.jar:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.161-0.b14.el7_4.x86_64/jre/lib/rt.jar:/usr/lib/jvm/java-1.8.0-openjdk-1. 
8.0.161-0.b14.el7_4.x86_64/jre/lib/sunrsasign.jar:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.161-0.b14.el7_4.x86_64/jre/lib/jsse.jar:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.161-0.b14.el7_4.x86_64/jre/lib/jce.jar:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.161-0.b14.el7_4.x86_64/jre/lib/charsets.jar:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.161-0.b14.el7_4.x86_64/jre/lib/jfr.jar:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.161-0.b14.el7_4.x86_64/jre/classes","file.encoding":"UTF-8","user.timezone":"*(redacted)","java.specification.vendor":"Oracle Corporation","sun.java.launcher":"SUN_STANDARD","os.version":"3.10.0-693.5.2.el7.x86_64","sun.os.patch.level":"unknown","java.vm.specification.vendor":"Oracle Corporation","user.country":"*(redacted)","sun.jnu.encoding":"UTF-8","user.language":"*(redacted)","java.vendor.url":"*(redacted)","java.awt.printerjob":"sun.print.PSPrinterJob","java.awt.graphicsenv":"sun.awt.X11GraphicsEnvironment","awt.toolkit":"sun.awt.X11.XToolkit","os.nam e":"Linux","java.vm.vendor":"Oracle Corporation","java.vendor.url.bug":"*(redacted)","user.name":"*(redacted)","java.vm.name":"OpenJDK 64-Bit Server VM","sun.java.command":"org.apache.spark.deploy.SparkSubmit --master yarn --deploy-mode client --conf spark.blacklist.stage.maxFailedTasksPerExecutor=1 --conf spark.blacklist.enabled=true --conf spark.blacklist.application.maxFailedTasksP
[4/5] spark git commit: [SPARK-22577][CORE] executor page blacklist status should update with TaskSet level blacklisting
http://git-wip-us.apache.org/repos/asf/spark/blob/0ec95bb7/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json -- diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json index 31093a6..03f886a 100644 --- a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json @@ -421,7 +421,8 @@ "shuffleWrite" : 13180, "shuffleWriteRecords" : 0, "memoryBytesSpilled" : 0, - "diskBytesSpilled" : 0 + "diskBytesSpilled" : 0, + "isBlacklistedForStage" : false } }, "killedTasksSummary" : { } http://git-wip-us.apache.org/repos/asf/spark/blob/0ec95bb7/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json -- diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json index 601d706..947c899 100644 --- a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json @@ -421,7 +421,8 @@ "shuffleWrite" : 13180, "shuffleWriteRecords" : 0, "memoryBytesSpilled" : 0, - "diskBytesSpilled" : 0 + "diskBytesSpilled" : 0, + "isBlacklistedForStage" : false } }, "killedTasksSummary" : { } http://git-wip-us.apache.org/repos/asf/spark/blob/0ec95bb7/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json -- diff --git a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json index 9cdcef0..963f010 100644 --- a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json @@ -465,7 +465,8 @@ "shuffleWrite" : 0, "shuffleWriteRecords" : 0, "memoryBytesSpilled" : 0, - "diskBytesSpilled" : 0 + "diskBytesSpilled" : 0, + "isBlacklistedForStage" : false } }, "killedTasksSummary" : { } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[1/5] spark git commit: [SPARK-22577][CORE] executor page blacklist status should update with TaskSet level blacklisting
Repository: spark Updated Branches: refs/heads/master de36f65d3 -> 0ec95bb7d http://git-wip-us.apache.org/repos/asf/spark/blob/0ec95bb7/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 7aa60f2..87f12f3 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -156,6 +156,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers "applications/local-1426533911241/1/stages/0/0/taskList", "stage task list from multi-attempt app json(2)" -> "applications/local-1426533911241/2/stages/0/0/taskList", +"blacklisting for stage" -> "applications/app-20180109111548-/stages/0/0", +"blacklisting node for stage" -> "applications/application_1516285256255_0012/stages/0/0", "rdd list storage json" -> "applications/local-1422981780767/storage/rdd", "executor node blacklisting" -> "applications/app-20161116163331-/executors", http://git-wip-us.apache.org/repos/asf/spark/blob/0ec95bb7/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala index cd1b7a9..afebcdd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala @@ -92,7 +92,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M } def createTaskSetBlacklist(stageId: Int = 0): TaskSetBlacklist = { -new TaskSetBlacklist(conf, stageId, clock) +new TaskSetBlacklist(listenerBusMock, conf, stageId, stageAttemptId = 0, clock = clock) } test("executors can be blacklisted with only a few failures per stage") { http://git-wip-us.apache.org/repos/asf/spark/blob/0ec95bb7/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala index 18981d5..6e2709d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala @@ -16,18 +16,32 @@ */ package org.apache.spark.scheduler +import org.mockito.Matchers.isA +import org.mockito.Mockito.{never, verify} +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mockito.MockitoSugar + import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.internal.config -import org.apache.spark.util.{ManualClock, SystemClock} +import org.apache.spark.util.ManualClock + +class TaskSetBlacklistSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar { -class TaskSetBlacklistSuite extends SparkFunSuite { + private var listenerBusMock: LiveListenerBus = _ + + override def beforeEach(): Unit = { +listenerBusMock = mock[LiveListenerBus] +super.beforeEach() + } test("Blacklisting tasks, executors, and nodes") { val conf = new SparkConf().setAppName("test").setMaster("local") .set(config.BLACKLIST_ENABLED.key, "true") val clock = new ManualClock +val attemptId = 0 +val taskSetBlacklist = new TaskSetBlacklist( + listenerBusMock, conf, stageId = 0, 
stageAttemptId = attemptId, clock = clock) -val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, clock = clock) clock.setTime(0) // We will mark task 0 & 1 failed on both executor 1 & 2. // We should blacklist all executors on that host, for all tasks for the stage. Note the API @@ -46,27 +60,53 @@ class TaskSetBlacklistSuite extends SparkFunSuite { val shouldBeBlacklisted = (executor == "exec1" && index == 0) assert(taskSetBlacklist.isExecutorBlacklistedForTask(executor, index) === shouldBeBlacklisted) } + assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1")) +verify(listenerBusMock, never()) + .post(isA(classOf[SparkListenerExecutorBlacklistedForStage])) + assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA")) +verify(listenerBusMock, never()) + .post(isA(classOf[SparkListenerNodeBlacklistedForStage])) // Mark task 1 failed on exec1 -- this pushes the executor into the blacklist taskSetBlacklist.updateBlacklistForFailedTask( "hostA", exec = "exec1", index = 1, fail
[5/5] spark git commit: [SPARK-22577][CORE] executor page blacklist status should update with TaskSet level blacklisting
[SPARK-22577][CORE] executor page blacklist status should update with TaskSet level blacklisting ## What changes were proposed in this pull request? In this PR stage blacklisting is propagated to the UI by introducing a new Spark listener event (SparkListenerExecutorBlacklistedForStage) which indicates that an executor is blacklisted for a stage, either because its number of task failures exceeded the limit given for an executor (spark.blacklist.stage.maxFailedTasksPerExecutor) or because its whole node is blacklisted for the stage (spark.blacklist.stage.maxFailedExecutorsPerNode). When the node is blacklisted, all of its executors are listed as blacklisted for the stage. The blacklisting state for a selected stage can be seen in the "Aggregated Metrics by Executor" table's blacklisting column, where after this change three possible labels can be found: - "for application": when the executor is blacklisted for the application (see the configuration spark.blacklist.application.maxFailedTasksPerExecutor for details) - "for stage": when the executor is **only** blacklisted for the stage - "false": when the executor is not blacklisted at all ## How was this patch tested? It is tested both manually and with unit tests. Unit tests: - HistoryServerSuite - TaskSetBlacklistSuite - AppStatusListenerSuite Manual test for executor blacklisting. Running Spark as a local cluster: ``` $ bin/spark-shell --master "local-cluster[2,1,1024]" --conf "spark.blacklist.enabled=true" --conf "spark.blacklist.stage.maxFailedTasksPerExecutor=1" --conf "spark.blacklist.application.maxFailedTasksPerExecutor=10" --conf "spark.eventLog.enabled=true" ``` Executing: ``` scala import org.apache.spark.SparkEnv sc.parallelize(1 to 10, 10).map { x => if (SparkEnv.get.executorId == "0") throw new RuntimeException("Bad executor") else (x % 3, x) }.reduceByKey((a, b) => a + b).collect() ``` To see the result, check the "Aggregated Metrics by Executor" section at the bottom of the picture: ![UI screenshot for stage level blacklisting executor](https://issues.apache.org/jira/secure/attachment/12905283/stage_blacklisting.png) Manual test for node blacklisting. Running Spark on a cluster: ``` bash ./bin/spark-shell --master yarn --deploy-mode client --executor-memory=2G --num-executors=8 --conf "spark.blacklist.enabled=true" --conf "spark.blacklist.stage.maxFailedTasksPerExecutor=1" --conf "spark.blacklist.stage.maxFailedExecutorsPerNode=1" --conf "spark.blacklist.application.maxFailedTasksPerExecutor=10" --conf "spark.eventLog.enabled=true" ``` And the job was: ``` scala import org.apache.spark.SparkEnv sc.parallelize(1 to 1, 10).map { x => if (SparkEnv.get.executorId.toInt >= 4) throw new RuntimeException("Bad executor") else (x % 3, x) }.reduceByKey((a, b) => a + b).collect() ``` The result is: ![UI screenshot for stage level node blacklisting](https://issues.apache.org/jira/secure/attachment/12906833/node_blacklisting_for_stage.png) Here you can see that the node apiros3.gce.test.com was blacklisted for the stage because of failures on executors 4 and 5. As expected, executor 3 is also blacklisted even though it has no failures itself, because it shares the node with executors 4 and 5. Author: “attilapiros” Author: Attila Zsolt Piros <2017933+attilapi...@users.noreply.github.com> Closes #20203 from attilapiros/SPARK-22577.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ec95bb7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ec95bb7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ec95bb7 Branch: refs/heads/master Commit: 0ec95bb7df775be33fc8983f6c0983a67032d2c8 Parents: de36f65 Author: “attilapiros” Authored: Wed Jan 24 11:34:59 2018 -0600 Committer: Imran Rashid Committed: Wed Jan 24 11:34:59 2018 -0600 -- .../org/apache/spark/SparkFirehoseListener.java | 12 + .../spark/scheduler/EventLoggingListener.scala | 9 + .../apache/spark/scheduler/SparkListener.scala | 35 + .../spark/scheduler/SparkListenerBus.scala | 4 + .../spark/scheduler/TaskSetBlacklist.scala | 19 +- .../apache/spark/scheduler/TaskSetManager.scala | 2 +- .../apache/spark/status/AppStatusListener.scala | 25 + .../org/apache/spark/status/LiveEntity.scala| 4 +- .../org/apache/spark/status/api/v1/api.scala| 3 +- .../apache/spark/ui/jobs/ExecutorTable.scala| 10 +- .../application_list_json_expectation.json | 70 +- .../blacklisting_for_stage_expectation.json | 639 +++ ...blacklisting_node_for_stage_expectation.json | 783 +++ .../completed_app_list_json_expectation.json| 71 +- .../limit_app_list_json_expectation.json| 54 +- .../minDate_app_list_json_expectation.json | 62 +- .../minEndDate_app_list_json_expectation.json | 34 +- .../one_stage_attempt_json_expectation.json | 3 +-
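A minimal sketch of consuming the new stage-level events from custom monitoring code follows. The event classes and their fields are taken from the SparkListener additions in this patch's diff; the on<Event> handler names are assumed to follow SparkListener's usual override convention, so treat this as a sketch rather than the patch's own code.

```scala
import org.apache.spark.scheduler.{
  SparkListener,
  SparkListenerExecutorBlacklistedForStage,
  SparkListenerNodeBlacklistedForStage
}

// Log the new stage-level blacklisting events as they arrive.
class StageBlacklistLogger extends SparkListener {

  // Fired when an executor crosses spark.blacklist.stage.maxFailedTasksPerExecutor.
  override def onExecutorBlacklistedForStage(
      event: SparkListenerExecutorBlacklistedForStage): Unit = {
    println(s"Executor ${event.executorId} blacklisted for stage ${event.stageId} " +
      s"(attempt ${event.stageAttemptId}) after ${event.taskFailures} task failures")
  }

  // Fired when a node crosses spark.blacklist.stage.maxFailedExecutorsPerNode;
  // every executor on that node is then reported as blacklisted for the stage.
  override def onNodeBlacklistedForStage(
      event: SparkListenerNodeBlacklistedForStage): Unit = {
    println(s"Node ${event.hostId} blacklisted for stage ${event.stageId} " +
      s"(attempt ${event.stageAttemptId}) after ${event.executorFailures} executor failures")
  }
}

// Registration on a live context, e.g. from spark-shell:
//   sc.addSparkListener(new StageBlacklistLogger())
```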
[3/5] spark git commit: [SPARK-22577][CORE] executor page blacklist status should update with TaskSet level blacklisting
http://git-wip-us.apache.org/repos/asf/spark/blob/0ec95bb7/core/src/test/resources/spark-events/app-20180109111548- -- diff --git a/core/src/test/resources/spark-events/app-20180109111548- b/core/src/test/resources/spark-events/app-20180109111548- new file mode 100755 index 000..50893d3 --- /dev/null +++ b/core/src/test/resources/spark-events/app-20180109111548- @@ -0,0 +1,59 @@ +{"Event":"SparkListenerLogStart","Spark Version":"2.3.0-SNAPSHOT"} +{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home/jre","Java Version":"1.8.0_152 (Oracle Corporation)","Scala Version":"version 2.11.8"},"Spark Properties":{"spark.blacklist.enabled":"true","spark.driver.host":"172.30.65.138","spark.eventLog.enabled":"true","spark.driver.port":"64273","spark.repl.class.uri":"spark://172.30.65.138:64273/classes","spark.jars":"","spark.repl.class.outputDir":"/private/var/folders/9g/gf583nd1765cvfgb_lsvwgp0gp/T/spark-811c1b49-eb66-4bfb-91ae-33b45efa269d/repl-c4438f51-ee23-41ed-8e04-71496e2f40f5","spark.app.name":"Spark shell","spark.scheduler.mode":"FIFO","spark.ui.showConsoleProgress":"true","spark.blacklist.stage.maxFailedTasksPerExecutor":"1","spark.executor.id":"driver","spark.submit.deployMode":"client","spark.master":"local-cluster[2,1,1024]","spark.home":"*(redacted)","spark.sql.catalogImplementation":"in-memory","spark.blacklist.application.maxFailedTasksPerExecutor":"10","spark.app.id":"app-20180109111548-"},"System Properties":{"java.io.tmpdir":"/var/folders/9g/gf583nd1765cvfgb_lsvwgp0gp/T/","line.separator":"\n","path.separator":":","sun.management.compiler":"HotSpot 64-Bit Tiered Compilers","SPARK_SUBMIT":"true","sun.cpu.endian":"little","java.specification.version":"1.8","java.vm.specification.name":"Java Virtual Machine Specification","java.vendor":"Oracle Corporation","java.vm.specification.version":"1.8","user.home":"*(redacted)","file.encoding.pkg":"sun.io","sun.nio.ch.bugLevel":"","ftp.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","sun.arch.data.model":"64","sun.boot.library.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home/jre/lib","user.dir":"*(redacted)","java.library.path":"*(redacted)","sun.cpu.isalist":"","os.arch":"x86_64","java.vm.version":"25.152-b16","java.endorsed.dirs":"/Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home/jre/lib/endorsed","java.runtime.version":"1.8.0_152-b16","java.vm.info":"mixed mode","java.ext.dirs":"*(redacted)","java.runtime.name":"Java(TM) SE Runtime Environment","file.separator":"/","java.class.version":"52.0","scala.usejavacp":"true","java.specification.name":"Java Platform API Specification","sun.boot.class.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home/jre/classes","file.encoding":"UTF-8","user.timezone":"*(redacted)","java.specification.vendor":"Oracle
Corporation","sun.java.launcher":"SUN_STANDARD","os.version":"10.12.6","sun.os.patch.level":"unknown","gopherProxySet":"false","java.vm.specification.vendor":"Oracle Corporation","user.country":"*(redacted)","sun.jnu.encoding":"UTF-8","http.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","user.language":"*(redacted)","socksNonProxyHosts":"local|*.local|169.254/16|*.169.254/16","java.vendor.url":"*(redacted)","java.awt.printerjob":"sun.lwawt.macosx.CPrinterJob","java.awt.graphicsenv":"sun.awt.CGraphicsEnvironment","awt.toolkit":"sun.lwawt.macosx.LWCToolkit","os.name":"Mac OS X","java.vm.vendor":"Oracle Corporation","java.vendor.url.bug":"*(redacted)","user.name":"*(redacted)","java.vm.name":"Java HotSpot(TM) 64-Bit Server VM","sun.java.command":"org.apache.spark.deploy.SparkSubmit --master local-cluster[2,1,1024] --conf spark.blacklist.stage.maxF ailedTasksPerExecutor=1 --conf spark.blacklist.enabled=true --conf spark.blacklist.application.maxFailedTasksPerExecutor=10 --conf spark.eventLog.enabled=true --class org.apache.spark.repl.Main --name Spark shell spark-shell","java.home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home/jre","java.version":"1.8.0_152","sun.io.unicode.encoding":"UnicodeBig"},"Classpath Entries":{"/Users/attilapiros/github/spark/a
svn commit: r24412 - in /dev/spark/2.4.0-SNAPSHOT-2018_01_24_08_01-de36f65-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Jan 24 16:17:53 2018 New Revision: 24412 Log: Apache Spark 2.4.0-SNAPSHOT-2018_01_24_08_01-de36f65 docs [This commit notification would consist of 1442 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
svn commit: r24411 - in /dev/spark/2.3.1-SNAPSHOT-2018_01_24_06_01-17317c8-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Jan 24 14:16:14 2018 New Revision: 24411 Log: Apache Spark 2.3.1-SNAPSHOT-2018_01_24_06_01-17317c8 docs [This commit notification would consist of 1442 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-23148][SQL] Allow pathnames with special characters for CSV / JSON / text
Repository: spark Updated Branches: refs/heads/branch-2.3 84a189a34 -> 17317c8fb [SPARK-23148][SQL] Allow pathnames with special characters for CSV / JSON / text …JSON / text ## What changes were proposed in this pull request? Fix for JSON and CSV data sources when file names include characters that would be changed by URL encoding. ## How was this patch tested? New unit tests for JSON, CSV and text suites Author: Henry Robinson Closes #20355 from henryr/spark-23148. (cherry picked from commit de36f65d3a819c00d6bf6979deef46c824203669) Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17317c8f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17317c8f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17317c8f Branch: refs/heads/branch-2.3 Commit: 17317c8fb99715836fcebc39ffb04648ab7fb762 Parents: 84a189a Author: Henry Robinson Authored: Wed Jan 24 21:19:09 2018 +0900 Committer: hyukjinkwon Committed: Wed Jan 24 21:19:23 2018 +0900 -- .../sql/execution/datasources/CodecStreams.scala | 6 +++--- .../execution/datasources/csv/CSVDataSource.scala | 11 ++- .../datasources/json/JsonDataSource.scala | 10 ++ .../spark/sql/FileBasedDataSourceSuite.scala | 18 -- 4 files changed, 31 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/17317c8f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala index 54549f6..c0df6c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala @@ -45,11 +45,11 @@ object CodecStreams { } /** - * Creates an input stream from the string path and add a closure for the input stream to be + * Creates an input stream from the given path and add a closure for the input stream to be * closed on task completion. 
*/ - def createInputStreamWithCloseResource(config: Configuration, path: String): InputStream = { -val inputStream = createInputStream(config, new Path(path)) + def createInputStreamWithCloseResource(config: Configuration, path: Path): InputStream = { +val inputStream = createInputStream(config, path) Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => inputStream.close())) inputStream } http://git-wip-us.apache.org/repos/asf/spark/blob/17317c8f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 2031381..4870d75 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -17,11 +17,12 @@ package org.apache.spark.sql.execution.datasources.csv +import java.net.URI import java.nio.charset.{Charset, StandardCharsets} import com.univocity.parsers.csv.CsvParser import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileStatus +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.mapreduce.Job @@ -32,7 +33,6 @@ import org.apache.spark.input.{PortableDataStream, StreamInputFormat} import org.apache.spark.rdd.{BinaryFileRDD, RDD} import org.apache.spark.sql.{Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextFileFormat import org.apache.spark.sql.types.StructType @@ -206,7 +206,7 @@ object MultiLineCSVDataSource extends CSVDataSource { parser: UnivocityParser, schema: StructType): Iterator[InternalRow] = { UnivocityParser.parseStream( - CodecStreams.createInputStreamWithCloseResource(conf, file.filePath), + CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))), parser.options.headerFlag, parser, schema) @@ -218,8 +218,9 @@ object MultiLineCSVDataSource extends CSVDataSource { parsedOpti
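The heart of the fix above is building the Hadoop Path through java.net.URI instead of handing the raw string to Path directly, since file.filePath carries a URI-encoded string. A rough, self-contained illustration of the difference, using an assumed example file name rather than anything from the patch:

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// A file literally named "a b%c.json" arrives in file.filePath URI-encoded:
val filePath = "file:/tmp/a%20b%25c.json" // assumed example value

// Old code: Path(String) keeps the percent-escapes as literal characters,
// so the data source would try to open the non-existent file "a%20b%25c.json".
println(new Path(filePath))           // file:/tmp/a%20b%25c.json

// Patched code: decoding through java.net.URI first restores the real name.
println(new Path(new URI(filePath)))  // file:/tmp/a b%c.json
```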
spark git commit: [SPARK-23148][SQL] Allow pathnames with special characters for CSV / JSON / text
Repository: spark Updated Branches: refs/heads/master 7af1a325d -> de36f65d3 [SPARK-23148][SQL] Allow pathnames with special characters for CSV / JSON / text …JSON / text ## What changes were proposed in this pull request? Fix for JSON and CSV data sources when file names include characters that would be changed by URL encoding. ## How was this patch tested? New unit tests for JSON, CSV and text suites Author: Henry Robinson Closes #20355 from henryr/spark-23148. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de36f65d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de36f65d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de36f65d Branch: refs/heads/master Commit: de36f65d3a819c00d6bf6979deef46c824203669 Parents: 7af1a32 Author: Henry Robinson Authored: Wed Jan 24 21:19:09 2018 +0900 Committer: hyukjinkwon Committed: Wed Jan 24 21:19:09 2018 +0900 -- .../sql/execution/datasources/CodecStreams.scala | 6 +++--- .../execution/datasources/csv/CSVDataSource.scala | 11 ++- .../datasources/json/JsonDataSource.scala | 10 ++ .../spark/sql/FileBasedDataSourceSuite.scala | 18 -- 4 files changed, 31 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/de36f65d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala index 54549f6..c0df6c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala @@ -45,11 +45,11 @@ object CodecStreams { } /** - * Creates an input stream from the string path and add a closure for the input stream to be + * Creates an input stream from the given path and add a closure for the input stream to be * closed on task completion. 
*/ - def createInputStreamWithCloseResource(config: Configuration, path: String): InputStream = { -val inputStream = createInputStream(config, new Path(path)) + def createInputStreamWithCloseResource(config: Configuration, path: Path): InputStream = { +val inputStream = createInputStream(config, path) Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => inputStream.close())) inputStream } http://git-wip-us.apache.org/repos/asf/spark/blob/de36f65d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 2031381..4870d75 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -17,11 +17,12 @@ package org.apache.spark.sql.execution.datasources.csv +import java.net.URI import java.nio.charset.{Charset, StandardCharsets} import com.univocity.parsers.csv.CsvParser import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileStatus +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.mapreduce.Job @@ -32,7 +33,6 @@ import org.apache.spark.input.{PortableDataStream, StreamInputFormat} import org.apache.spark.rdd.{BinaryFileRDD, RDD} import org.apache.spark.sql.{Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextFileFormat import org.apache.spark.sql.types.StructType @@ -206,7 +206,7 @@ object MultiLineCSVDataSource extends CSVDataSource { parser: UnivocityParser, schema: StructType): Iterator[InternalRow] = { UnivocityParser.parseStream( - CodecStreams.createInputStreamWithCloseResource(conf, file.filePath), + CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))), parser.options.headerFlag, parser, schema) @@ -218,8 +218,9 @@ object MultiLineCSVDataSource extends CSVDataSource { parsedOptions: CSVOptions): StructType = { val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
spark git commit: [SPARK-23174][BUILD][PYTHON] python code style checker update
Repository: spark Updated Branches: refs/heads/master 4e7b49041 -> 7af1a325d [SPARK-23174][BUILD][PYTHON] python code style checker update ## What changes were proposed in this pull request? References the latest Python code style checking from PyPI/pycodestyle and removes the pending TODO. For now, tox.ini excludes the additional style errors the latest checker discovered in existing Python code (will fall back on review comments to finalize the exclusions or fix the Python files). Any further code styling requirements need to be part of pycodestyle, not of Spark. ## How was this patch tested? ./dev/run-tests Author: Rekha Joshi Author: rjoshi2 Closes #20338 from rekhajoshm/SPARK-11222. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7af1a325 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7af1a325 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7af1a325 Branch: refs/heads/master Commit: 7af1a325da57daa2e25c713472a320f4ccb43d71 Parents: 4e7b490 Author: Rekha Joshi Authored: Wed Jan 24 21:13:47 2018 +0900 Committer: hyukjinkwon Committed: Wed Jan 24 21:13:47 2018 +0900 -- dev/lint-python | 37 ++--- dev/run-tests.py | 5 - dev/tox.ini | 4 ++-- 3 files changed, 24 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7af1a325/dev/lint-python -- diff --git a/dev/lint-python b/dev/lint-python index df8df03..e069caf 100755 --- a/dev/lint-python +++ b/dev/lint-python @@ -21,7 +21,7 @@ SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )" SPARK_ROOT_DIR="$(dirname "$SCRIPT_DIR")" # Exclude auto-generated configuration file. PATHS_TO_CHECK="$( cd "$SPARK_ROOT_DIR" && find . -name "*.py" )" -PEP8_REPORT_PATH="$SPARK_ROOT_DIR/dev/pep8-report.txt" +PYCODESTYLE_REPORT_PATH="$SPARK_ROOT_DIR/dev/pycodestyle-report.txt" PYLINT_REPORT_PATH="$SPARK_ROOT_DIR/dev/pylint-report.txt" PYLINT_INSTALL_INFO="$SPARK_ROOT_DIR/dev/pylint-info.txt" SPHINXBUILD=${SPHINXBUILD:=sphinx-build} @@ -30,23 +30,22 @@ SPHINX_REPORT_PATH="$SPARK_ROOT_DIR/dev/sphinx-report.txt" cd "$SPARK_ROOT_DIR" # compileall: https://docs.python.org/2/library/compileall.html -python -B -m compileall -q -l $PATHS_TO_CHECK > "$PEP8_REPORT_PATH" +python -B -m compileall -q -l $PATHS_TO_CHECK > "$PYCODESTYLE_REPORT_PATH" compile_status="${PIPESTATUS[0]}" -# Get pep8 at runtime so that we don't rely on it being installed on the build server. +# Get pycodestyle at runtime so that we don't rely on it being installed on the build server. #+ See: https://github.com/apache/spark/pull/1744#issuecomment-50982162 -#+ TODOs: -#+ - Download pep8 from PyPI. It's more "official". -PEP8_VERSION="1.7.0" -PEP8_SCRIPT_PATH="$SPARK_ROOT_DIR/dev/pep8-$PEP8_VERSION.py" -PEP8_SCRIPT_REMOTE_PATH="https://raw.githubusercontent.com/jcrocholl/pep8/$PEP8_VERSION/pep8.py"; +# Updated to latest official version for pep8. pep8 is formally renamed to pycodestyle. +PYCODESTYLE_VERSION="2.3.1" +PYCODESTYLE_SCRIPT_PATH="$SPARK_ROOT_DIR/dev/pycodestyle-$PYCODESTYLE_VERSION.py" +PYCODESTYLE_SCRIPT_REMOTE_PATH="https://raw.githubusercontent.com/PyCQA/pycodestyle/$PYCODESTYLE_VERSION/pycodestyle.py"; -if [ ! -e "$PEP8_SCRIPT_PATH" ]; then -curl --silent -o "$PEP8_SCRIPT_PATH" "$PEP8_SCRIPT_REMOTE_PATH" +if [ ! -e "$PYCODESTYLE_SCRIPT_PATH" ]; then +curl --silent -o "$PYCODESTYLE_SCRIPT_PATH" "$PYCODESTYLE_SCRIPT_REMOTE_PATH" curl_status="$?" if [ "$curl_status" -ne 0 ]; then -echo "Failed to download pep8.py from \"$PEP8_SCRIPT_REMOTE_PATH\"." 
+echo "Failed to download pycodestyle.py from \"$PYCODESTYLE_SCRIPT_REMOTE_PATH\"." exit "$curl_status" fi fi @@ -64,23 +63,23 @@ export "PATH=$PYTHONPATH:$PATH" #+ first, but we do so so that the check status can #+ be output before the report, like with the #+ scalastyle and RAT checks. -python "$PEP8_SCRIPT_PATH" --config=dev/tox.ini $PATHS_TO_CHECK >> "$PEP8_REPORT_PATH" -pep8_status="${PIPESTATUS[0]}" +python "$PYCODESTYLE_SCRIPT_PATH" --config=dev/tox.ini $PATHS_TO_CHECK >> "$PYCODESTYLE_REPORT_PATH" +pycodestyle_status="${PIPESTATUS[0]}" -if [ "$compile_status" -eq 0 -a "$pep8_status" -eq 0 ]; then +if [ "$compile_status" -eq 0 -a "$pycodestyle_status" -eq 0 ]; then lint_status=0 else lint_status=1 fi if [ "$lint_status" -ne 0 ]; then -echo "PEP8 checks failed." -cat "$PEP8_REPORT_PATH" -rm "$PEP8_REPORT_PATH" +echo "PYCODESTYLE checks failed." +cat "$PYCODESTYLE_REPORT_PATH" +rm "$PYCODESTYLE_REPORT_PATH" exit "$lint_status" else -echo "PEP8 checks passed." -rm "$PEP8_REPORT_PATH" +echo "pycodestyle checks passed." +rm "$PYCODESTYLE_REPORT_PATH" fi # Check th
spark git commit: [SPARK-23177][SQL][PYSPARK][BACKPORT-2.3] Extract zero-parameter UDFs from aggregate
Repository: spark Updated Branches: refs/heads/branch-2.3 d656be74b -> 84a189a34 [SPARK-23177][SQL][PYSPARK][BACKPORT-2.3] Extract zero-parameter UDFs from aggregate ## What changes were proposed in this pull request? The ExtractPythonUDFFromAggregate rule extracts Python UDFs in a logical aggregate that depend on an aggregate expression or grouping key. But Python UDFs which don't depend on the above expressions should also be extracted, to avoid the issue reported in the JIRA. A small code snippet to reproduce that issue looks like: ```python import pyspark.sql.functions as f df = spark.createDataFrame([(1,2), (3,4)]) f_udf = f.udf(lambda: str("const_str")) df2 = df.distinct().withColumn("a", f_udf()) df2.show() ``` The following exception is raised: ``` : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF0#50 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:91) at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:90) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256) at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:90) at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:514) at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:513) ``` This exception is raised because `HashAggregateExec` tries to bind the aliased Python UDF expression (e.g., `pythonUDF0#50 AS a#44`) to the grouping key. ## How was this patch tested? Added test. Author: Liang-Chi Hsieh Closes #20379 from viirya/SPARK-23177-backport-2.3. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84a189a3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84a189a3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84a189a3 Branch: refs/heads/branch-2.3 Commit: 84a189a3429c64eeabc7b4e8fc0488ec16742002 Parents: d656be7 Author: Liang-Chi Hsieh Authored: Wed Jan 24 20:29:47 2018 +0900 Committer: hyukjinkwon Committed: Wed Jan 24 20:29:47 2018 +0900 -- python/pyspark/sql/tests.py | 8 .../spark/sql/execution/python/ExtractPythonUDFs.scala | 5 +++-- 2 files changed, 11 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/84a189a3/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 4fee2ec..fbb18c4 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1100,6 +1100,14 @@ class SQLTests(ReusedSQLTestCase): rows = [r[0] for r in df.selectExpr("udf(id)").take(2)] self.assertEqual(rows, [None, PythonOnlyPoint(1, 1)]) +def test_nonparam_udf_with_aggregate(self): +import pyspark.sql.functions as f + +df = self.spark.createDataFrame([(1, 2), (1, 2)]) +f_udf = f.udf(lambda: "const_str") +rows = df.distinct().withColumn("a", f_udf()).collect() +self.assertEqual(rows, [Row(_1=1, _2=2, a=u'const_str')]) + def test_infer_schema_with_udt(self): from pyspark.sql.tests import ExamplePoint, ExamplePointUDT row = Row(label=1.0, point=ExamplePoint(1.0, 2.0)) http://git-wip-us.apache.org/repos/asf/spark/blob/84a189a3/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala -- diff --git a/sql/core/src/main/scala/org/a
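The Scala hunk of the diff is cut off above. Conceptually, the rule's extraction test gains one extra clause for UDFs that reference nothing at all. A self-contained toy sketch of that predicate, with simplified names standing in for the Catalyst Expression machinery inside ExtractPythonUDFFromAggregate (so a sketch of the idea, not the patch's code):

```scala
// Toy model: a Python UDF is described only by the attributes it references.
final case class PythonUdf(references: Set[String])

// aggOutput: attributes produced by the aggregate (grouping keys / aggregate expressions).
def mustExtract(udf: PythonUdf, aggOutput: Set[String]): Boolean =
  udf.references.isEmpty ||                    // new: zero-parameter UDFs like f.udf(lambda: "const_str")
    udf.references.exists(aggOutput.contains)  // old: UDFs depending on the aggregate's output

// The zero-parameter UDF from the repro is now extracted into a projection
// above the aggregate instead of being bound (and failing) inside it.
assert(mustExtract(PythonUdf(Set.empty), aggOutput = Set("_1", "_2")))
assert(mustExtract(PythonUdf(Set("_1")), aggOutput = Set("_1", "_2")))
assert(!mustExtract(PythonUdf(Set("other")), aggOutput = Set("_1", "_2")))
```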
svn commit: r24409 - in /dev/spark/2.3.1-SNAPSHOT-2018_01_24_02_01-d656be7-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Jan 24 10:16:19 2018 New Revision: 24409 Log: Apache Spark 2.3.1-SNAPSHOT-2018_01_24_02_01-d656be7 docs [This commit notification would consist of 1442 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
svn commit: r24406 - in /dev/spark/2.4.0-SNAPSHOT-2018_01_24_00_01-4e7b490-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Jan 24 08:17:11 2018 New Revision: 24406 Log: Apache Spark 2.4.0-SNAPSHOT-2018_01_24_00_01-4e7b490 docs [This commit notification would consist of 1442 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]