spark git commit: [SPARK-26079][SQL] Ensure listener event delivery in StreamingQueryListenersConfSuite.
Repository: spark Updated Branches: refs/heads/branch-2.4 c23b801d3 -> 33f55d445 [SPARK-26079][SQL] Ensure listener event delivery in StreamingQueryListenersConfSuite. Events are dispatched on a separate thread, so we need to wait for them to be actually delivered before checking that the listener got them. Closes #23050 from vanzin/SPARK-26079. Authored-by: Marcelo Vanzin Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33f55d44 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33f55d44 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33f55d44 Branch: refs/heads/branch-2.4 Commit: 33f55d445c1d148d8cc8c210cae67d8f9f717dc1 Parents: c23b801 Author: Marcelo Vanzin Authored: Sat Nov 17 15:07:20 2018 +0800 Committer: hyukjinkwon Committed: Sat Nov 17 15:08:07 2018 +0800 -- .../spark/sql/streaming/StreamingQueryListenersConfSuite.scala| 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/33f55d44/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala index 1aaf8a9..ddbc175 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala @@ -30,7 +30,6 @@ class StreamingQueryListenersConfSuite extends StreamTest with BeforeAndAfter { import testImplicits._ - override protected def sparkConf: SparkConf = super.sparkConf.set("spark.sql.streaming.streamingQueryListeners", "org.apache.spark.sql.streaming.TestListener") @@ -41,6 +40,8 @@ class StreamingQueryListenersConfSuite extends StreamTest with BeforeAndAfter { StopStream ) +spark.sparkContext.listenerBus.waitUntilEmpty(5000) + assert(TestListener.queryStartedEvent != null) assert(TestListener.queryTerminatedEvent != null) }
spark git commit: [SPARK-26079][SQL] Ensure listener event delivery in StreamingQueryListenersConfSuite.
Repository: spark Updated Branches: refs/heads/master d2792046a -> 23cd0e6e9 [SPARK-26079][SQL] Ensure listener event delivery in StreamingQueryListenersConfSuite. Events are dispatched on a separate thread, so we need to wait for them to be actually delivered before checking that the listener got them. Closes #23050 from vanzin/SPARK-26079. Authored-by: Marcelo Vanzin Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23cd0e6e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23cd0e6e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23cd0e6e Branch: refs/heads/master Commit: 23cd0e6e9e20a224a71859c158437e0a31982259 Parents: d279204 Author: Marcelo Vanzin Authored: Sat Nov 17 15:07:20 2018 +0800 Committer: hyukjinkwon Committed: Sat Nov 17 15:07:20 2018 +0800 -- .../spark/sql/streaming/StreamingQueryListenersConfSuite.scala| 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/23cd0e6e/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala index 1aaf8a9..ddbc175 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala @@ -30,7 +30,6 @@ class StreamingQueryListenersConfSuite extends StreamTest with BeforeAndAfter { import testImplicits._ - override protected def sparkConf: SparkConf = super.sparkConf.set("spark.sql.streaming.streamingQueryListeners", "org.apache.spark.sql.streaming.TestListener") @@ -41,6 +40,8 @@ class StreamingQueryListenersConfSuite extends StreamTest with BeforeAndAfter { StopStream ) +spark.sparkContext.listenerBus.waitUntilEmpty(5000) + assert(TestListener.queryStartedEvent != null) assert(TestListener.queryTerminatedEvent != null) }
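The race this patch closes is generic to any asynchronous listener bus: events are queued and delivered on a dispatcher thread, so an assertion that runs immediately after posting can observe stale listener state. Below is a minimal, self-contained Scala sketch of the pattern; `SimpleBus` and its members are invented for illustration, and only the `waitUntilEmpty` idea corresponds to the API the fix relies on.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}

// A toy event bus that, like Spark's listener bus, delivers events on its
// own dispatch thread.
object ListenerDeliveryExample {
  final class SimpleBus {
    @volatile var lastEvent: String = _
    private val queue = new LinkedBlockingQueue[String]()
    private val drained = new CountDownLatch(1)
    private val dispatcher = new Thread(new Runnable {
      override def run(): Unit = {
        var event = queue.take()
        while (event != "STOP") {
          lastEvent = event // "deliver" the event to the listener
          event = queue.take()
        }
        drained.countDown()
      }
    })
    dispatcher.setDaemon(true)
    dispatcher.start()

    def post(event: String): Unit = queue.put(event)

    // Block until everything posted so far has been delivered (or time out).
    def waitUntilEmpty(timeoutMillis: Long): Boolean = {
      queue.put("STOP")
      drained.await(timeoutMillis, TimeUnit.MILLISECONDS)
    }
  }

  def main(args: Array[String]): Unit = {
    val bus = new SimpleBus
    bus.post("queryStarted")
    // Asserting right here could observe lastEvent == null: delivery races
    // with this thread. Waiting first makes the assertion deterministic.
    assert(bus.waitUntilEmpty(5000), "event delivery timed out")
    assert(bus.lastEvent == "queryStarted")
  }
}
```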
svn commit: r30939 - in /dev/spark/3.0.0-SNAPSHOT-2018_11_16_18_25-d279204-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Sat Nov 17 02:38:20 2018 New Revision: 30939 Log: Apache Spark 3.0.0-SNAPSHOT-2018_11_16_18_25-d279204 docs [This commit notification would consist of 1755 parts, which exceeds the limit of 50, so it was shortened to this summary.]
svn commit: r30938 - in /dev/spark/2.4.1-SNAPSHOT-2018_11_16_16_22-c23b801-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Sat Nov 17 00:37:32 2018 New Revision: 30938 Log: Apache Spark 2.4.1-SNAPSHOT-2018_11_16_16_22-c23b801 docs [This commit notification would consist of 1476 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-26095][BUILD] Disable parallelization in make-distribution.sh.
Repository: spark Updated Branches: refs/heads/master 058c4602b -> d2792046a [SPARK-26095][BUILD] Disable parallelization in make-distribution.sh. It makes the build slower, but at least it doesn't hang. It seems that maven-shade-plugin has an issue with parallelization. Closes #23061 from vanzin/SPARK-26095. Authored-by: Marcelo Vanzin Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2792046 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2792046 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2792046 Branch: refs/heads/master Commit: d2792046a1b10a07b65fc30be573983f1237e450 Parents: 058c460 Author: Marcelo Vanzin Authored: Fri Nov 16 15:57:38 2018 -0800 Committer: Marcelo Vanzin Committed: Fri Nov 16 15:57:38 2018 -0800 -- dev/make-distribution.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d2792046/dev/make-distribution.sh -- diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh index 84f4ae9..a550af9 100755 --- a/dev/make-distribution.sh +++ b/dev/make-distribution.sh @@ -165,7 +165,7 @@ export MAVEN_OPTS="${MAVEN_OPTS:--Xmx2g -XX:ReservedCodeCacheSize=512m}" # Store the command as an array because $MVN variable might have spaces in it. # Normal quoting tricks don't work. # See: http://mywiki.wooledge.org/BashFAQ/050 -BUILD_COMMAND=("$MVN" -T 1C clean package -DskipTests $@) +BUILD_COMMAND=("$MVN" clean package -DskipTests $@) # Actually build the jar echo -e "\nBuilding with..."
spark git commit: [SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file
Repository: spark Updated Branches: refs/heads/branch-2.4 77c0629cb -> c23b801d3 [SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file ## What changes were proposed in this pull request? Use CheckpointFileManager to write the streaming `metadata` file so that the `metadata` file will never be a partial file. ## How was this patch tested? Jenkins Closes #23060 from zsxwing/SPARK-26092. Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu (cherry picked from commit 058c4602b000b24deb764a810ef8b43c41fe63ae) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c23b801d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c23b801d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c23b801d Branch: refs/heads/branch-2.4 Commit: c23b801d3c87b12e729b98910833b441db05bd45 Parents: 77c0629 Author: Shixiong Zhu Authored: Fri Nov 16 15:43:27 2018 -0800 Committer: Shixiong Zhu Committed: Fri Nov 16 15:43:44 2018 -0800 -- .../streaming/CheckpointFileManager.scala | 2 +- .../execution/streaming/StreamExecution.scala | 1 + .../execution/streaming/StreamMetadata.scala| 23 ++-- 3 files changed, 18 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c23b801d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala index 606ba25..b3e4240 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala @@ -56,7 +56,7 @@ trait CheckpointFileManager { * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to *overwrite the file if it already exists. It should not throw *any exception if the file exists. However, if false, then the - *implementation must not overwrite if the file alraedy exists and + *implementation must not overwrite if the file already exists and *must throw `FileAlreadyExistsException` in that case. 
*/ def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream http://git-wip-us.apache.org/repos/asf/spark/blob/c23b801d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index f6c60c1..de33844 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -87,6 +87,7 @@ abstract class StreamExecution( val resolvedCheckpointRoot = { val checkpointPath = new Path(checkpointRoot) val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) +fs.mkdirs(checkpointPath) checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString } http://git-wip-us.apache.org/repos/asf/spark/blob/c23b801d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala index 0bc54ea..516afbe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala @@ -19,16 +19,18 @@ package org.apache.spark.sql.execution.streaming import java.io.{InputStreamReader, OutputStreamWriter} import java.nio.charset.StandardCharsets +import java.util.ConcurrentModificationException import scala.util.control.NonFatal import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, FSDataOutputStream, Path} +import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, Path} import org.json4s.NoTypeHints import
spark git commit: [SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file
Repository: spark Updated Branches: refs/heads/master 99cbc51b3 -> 058c4602b [SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file ## What changes were proposed in this pull request? Use CheckpointFileManager to write the streaming `metadata` file so that the `metadata` file will never be a partial file. ## How was this patch tested? Jenkins Closes #23060 from zsxwing/SPARK-26092. Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/058c4602 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/058c4602 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/058c4602 Branch: refs/heads/master Commit: 058c4602b000b24deb764a810ef8b43c41fe63ae Parents: 99cbc51 Author: Shixiong Zhu Authored: Fri Nov 16 15:43:27 2018 -0800 Committer: Shixiong Zhu Committed: Fri Nov 16 15:43:27 2018 -0800 -- .../streaming/CheckpointFileManager.scala | 2 +- .../execution/streaming/StreamExecution.scala | 1 + .../execution/streaming/StreamMetadata.scala| 23 ++-- 3 files changed, 18 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/058c4602/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala index 606ba25..b3e4240 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala @@ -56,7 +56,7 @@ trait CheckpointFileManager { * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to *overwrite the file if it already exists. It should not throw *any exception if the file exists. However, if false, then the - *implementation must not overwrite if the file alraedy exists and + *implementation must not overwrite if the file already exists and *must throw `FileAlreadyExistsException` in that case. 
*/ def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream http://git-wip-us.apache.org/repos/asf/spark/blob/058c4602/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 631a6eb..89b4f40 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -88,6 +88,7 @@ abstract class StreamExecution( val resolvedCheckpointRoot = { val checkpointPath = new Path(checkpointRoot) val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) +fs.mkdirs(checkpointPath) checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString } http://git-wip-us.apache.org/repos/asf/spark/blob/058c4602/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala index 0bc54ea..516afbe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala @@ -19,16 +19,18 @@ package org.apache.spark.sql.execution.streaming import java.io.{InputStreamReader, OutputStreamWriter} import java.nio.charset.StandardCharsets +import java.util.ConcurrentModificationException import scala.util.control.NonFatal import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, FSDataOutputStream, Path} +import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, Path} import org.json4s.NoTypeHints import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging +import
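The point of `createAtomic` is that the `metadata` file becomes visible only once it is complete. Below is a minimal Scala sketch of the underlying write-to-temp-then-rename idea, using plain `java.nio` rather than Spark's Hadoop-based `CheckpointFileManager`; the names and the `overwriteIfPossible` analogy are illustrative assumptions, not the actual implementation.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardCopyOption}

object AtomicWriteExample {
  // Write contents to a temp file next to the target, then atomically
  // rename it over the final path, so readers never see a partial file.
  def writeAtomically(path: String, contents: String): Unit = {
    val target = Paths.get(path)
    val tmp = Paths.get(path + ".tmp." + System.nanoTime())
    try {
      Files.write(tmp, contents.getBytes(StandardCharsets.UTF_8))
      // On POSIX, an atomic rename replaces any existing file, which plays
      // the role of overwriteIfPossible = true.
      Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE)
    } finally {
      // The "cancel" path: if the write or move failed, drop the partial file.
      Files.deleteIfExists(tmp)
    }
  }

  def main(args: Array[String]): Unit = {
    writeAtomically("metadata", """{"id": "example-query-id"}""")
    println(new String(Files.readAllBytes(Paths.get("metadata")), StandardCharsets.UTF_8))
  }
}
```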
svn commit: r30931 - in /dev/spark/2.4.1-SNAPSHOT-2018_11_16_12_15-77c0629-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Nov 16 20:30:12 2018 New Revision: 30931 Log: Apache Spark 2.4.1-SNAPSHOT-2018_11_16_12_15-77c0629 docs [This commit notification would consist of 1476 parts, which exceeds the limit of 50, so it was shortened to this summary.]
svn commit: r30929 - in /dev/spark/3.0.0-SNAPSHOT-2018_11_16_10_10-99cbc51-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Nov 16 18:22:40 2018 New Revision: 30929 Log: Apache Spark 3.0.0-SNAPSHOT-2018_11_16_10_10-99cbc51 docs [This commit notification would consist of 1755 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-26069][TESTS] Fix flaky test: RpcIntegrationSuite.sendRpcWithStreamFailures
Repository: spark Updated Branches: refs/heads/branch-2.4 be42bfe5c -> 77c0629cb [SPARK-26069][TESTS] Fix flaky test: RpcIntegrationSuite.sendRpcWithStreamFailures ## What changes were proposed in this pull request? The test failure is because `assertErrorAndClosed` misses one possible error message: `java.nio.channels.ClosedChannelException`. This happens when the second `uploadStream` is called after the channel has been closed. This can be reproduced by adding `Thread.sleep(1000)` below this line: https://github.com/apache/spark/blob/03306a6df39c9fd6cb581401c13c4dfc6bbd632e/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java#L217 This PR fixes the above issue and also improves the test failure messages of `assertErrorAndClosed`. ## How was this patch tested? Jenkins Closes #23041 from zsxwing/SPARK-26069. Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu (cherry picked from commit 99cbc51b3250c07a3e8cc95c9b74e9d1725bac77) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77c0629c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77c0629c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77c0629c Branch: refs/heads/branch-2.4 Commit: 77c0629cbac6046cc3f2ea1025c43d86af344d62 Parents: be42bfe Author: Shixiong Zhu Authored: Fri Nov 16 09:51:41 2018 -0800 Committer: Shixiong Zhu Committed: Fri Nov 16 09:52:03 2018 -0800 -- .../apache/spark/network/RpcIntegrationSuite.java | 18 -- 1 file changed, 12 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/77c0629c/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java -- diff --git a/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java index 1f4d75c..45f4a18 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java @@ -371,7 +371,10 @@ public class RpcIntegrationSuite { private void assertErrorAndClosed(RpcResult result, String expectedError) { assertTrue("unexpected success: " + result.successMessages, result.successMessages.isEmpty()); -// we expect 1 additional error, which contains *either* "closed" or "Connection reset" +// we expect 1 additional error, which should contain one of the following messages: +// - "closed" +// - "Connection reset" +// - "java.nio.channels.ClosedChannelException" Set<String> errors = result.errorMessages; assertEquals("Expected 2 errors, got " + errors.size() + "errors: " + errors, 2, errors.size()); @@ -379,15 +382,18 @@ public class RpcIntegrationSuite { Set<String> containsAndClosed = Sets.newHashSet(expectedError); containsAndClosed.add("closed"); containsAndClosed.add("Connection reset"); +containsAndClosed.add("java.nio.channels.ClosedChannelException"); Pair<Set<String>, Set<String>> r = checkErrorsContain(errors, containsAndClosed); -Set<String> errorsNotFound = r.getRight(); -assertEquals(1, errorsNotFound.size()); -String err = errorsNotFound.iterator().next(); -assertTrue(err.equals("closed") || err.equals("Connection reset")); +assertTrue("Got a non-empty set " + r.getLeft(), r.getLeft().isEmpty()); -assertTrue(r.getLeft().isEmpty()); +Set<String> errorsNotFound = r.getRight(); +assertEquals( +"The size of " + errorsNotFound.toString() + " was not 2", 2, errorsNotFound.size()); +for (String err:
errorsNotFound) { + assertTrue("Found a wrong error " + err, containsAndClosed.contains(err)); +} } private Pair<Set<String>, Set<String>> checkErrorsContain(
spark git commit: [SPARK-26069][TESTS] Fix flaky test: RpcIntegrationSuite.sendRpcWithStreamFailures
Repository: spark Updated Branches: refs/heads/master a2fc48c28 -> 99cbc51b3 [SPARK-26069][TESTS] Fix flaky test: RpcIntegrationSuite.sendRpcWithStreamFailures ## What changes were proposed in this pull request? The test failure is because `assertErrorAndClosed` misses one possible error message: `java.nio.channels.ClosedChannelException`. This happens when the second `uploadStream` is called after the channel has been closed. This can be reproduced by adding `Thread.sleep(1000)` below this line: https://github.com/apache/spark/blob/03306a6df39c9fd6cb581401c13c4dfc6bbd632e/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java#L217 This PR fixes the above issue and also improves the test failure messages of `assertErrorAndClosed`. ## How was this patch tested? Jenkins Closes #23041 from zsxwing/SPARK-26069. Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99cbc51b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99cbc51b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99cbc51b Branch: refs/heads/master Commit: 99cbc51b3250c07a3e8cc95c9b74e9d1725bac77 Parents: a2fc48c Author: Shixiong Zhu Authored: Fri Nov 16 09:51:41 2018 -0800 Committer: Shixiong Zhu Committed: Fri Nov 16 09:51:41 2018 -0800 -- .../apache/spark/network/RpcIntegrationSuite.java | 18 -- 1 file changed, 12 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/99cbc51b/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java -- diff --git a/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java index 1f4d75c..45f4a18 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java @@ -371,7 +371,10 @@ public class RpcIntegrationSuite { private void assertErrorAndClosed(RpcResult result, String expectedError) { assertTrue("unexpected success: " + result.successMessages, result.successMessages.isEmpty()); -// we expect 1 additional error, which contains *either* "closed" or "Connection reset" +// we expect 1 additional error, which should contain one of the following messages: +// - "closed" +// - "Connection reset" +// - "java.nio.channels.ClosedChannelException" Set<String> errors = result.errorMessages; assertEquals("Expected 2 errors, got " + errors.size() + "errors: " + errors, 2, errors.size()); @@ -379,15 +382,18 @@ public class RpcIntegrationSuite { Set<String> containsAndClosed = Sets.newHashSet(expectedError); containsAndClosed.add("closed"); containsAndClosed.add("Connection reset"); +containsAndClosed.add("java.nio.channels.ClosedChannelException"); Pair<Set<String>, Set<String>> r = checkErrorsContain(errors, containsAndClosed); -Set<String> errorsNotFound = r.getRight(); -assertEquals(1, errorsNotFound.size()); -String err = errorsNotFound.iterator().next(); -assertTrue(err.equals("closed") || err.equals("Connection reset")); +assertTrue("Got a non-empty set " + r.getLeft(), r.getLeft().isEmpty()); -assertTrue(r.getLeft().isEmpty()); +Set<String> errorsNotFound = r.getRight(); +assertEquals( +"The size of " + errorsNotFound.toString() + " was not 2", 2, errorsNotFound.size()); +for (String err: errorsNotFound) { + assertTrue("Found a wrong error " + err, containsAndClosed.contains(err)); +} } private
Pair<Set<String>, Set<String>> checkErrorsContain(
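The reworked `assertErrorAndClosed` boils down to substring-set matching with informative failure messages. Here is a standalone Scala sketch of that strategy; the helper below is an invented analogue of the suite's Java `checkErrorsContain`, not the actual code.

```scala
object ErrorMatchExample {
  // Return (errors matching no expected substring,
  //         expected substrings never observed in any error).
  def checkErrorsContain(
      errors: Set[String],
      expected: Set[String]): (Set[String], Set[String]) = {
    val unmatchedErrors = errors.filterNot(e => expected.exists(sub => e.contains(sub)))
    val unmatchedExpected = expected.filterNot(sub => errors.exists(e => e.contains(sub)))
    (unmatchedErrors, unmatchedExpected)
  }

  def main(args: Array[String]): Unit = {
    val expected = Set("closed", "Connection reset", "java.nio.channels.ClosedChannelException")
    val errors = Set(
      "java.nio.channels.ClosedChannelException",
      "java.io.IOException: Connection reset by peer")
    val (unmatchedErrors, unmatchedExpected) = checkErrorsContain(errors, expected)
    // Every observed error matched one of the allowed messages, and the
    // failure message says exactly what did not match.
    assert(unmatchedErrors.isEmpty, s"Got a non-empty set $unmatchedErrors")
    println(s"allowed messages never observed: $unmatchedExpected")
  }
}
```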
svn commit: r30927 - in /dev/spark/2.4.1-SNAPSHOT-2018_11_16_08_06-be42bfe-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Nov 16 16:23:25 2018 New Revision: 30927 Log: Apache Spark 2.4.1-SNAPSHOT-2018_11_16_08_06-be42bfe docs [This commit notification would consist of 1476 parts, which exceeds the limit of 50, so it was shortened to this summary.]
svn commit: r30926 - in /dev/spark/2.3.3-SNAPSHOT-2018_11_16_08_06-550408e-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Nov 16 16:21:37 2018 New Revision: 30926 Log: Apache Spark 2.3.3-SNAPSHOT-2018_11_16_08_06-550408e docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50, so it was shortened to this summary.]
[2/2] spark git commit: [SPARK-26034][PYTHON][TESTS] Break large mllib/tests.py file into smaller files
[SPARK-26034][PYTHON][TESTS] Break large mllib/tests.py file into smaller files ## What changes were proposed in this pull request? This PR breaks down the large mllib/tests.py file that contains all Python MLlib unit tests into several smaller test files, so that they are easier to read and maintain. The tests are broken down as follows:
```
pyspark
├── __init__.py
...
├── mllib
│   ├── __init__.py
...
│   ├── tests
│   │   ├── __init__.py
│   │   ├── test_algorithms.py
│   │   ├── test_feature.py
│   │   ├── test_linalg.py
│   │   ├── test_stat.py
│   │   ├── test_streaming_algorithms.py
│   │   ├── test_util.py
...
├── testing
...
│   ├── mllibutils.py
...
```
## How was this patch tested? Ran tests manually by module to ensure test count was the same, and ran `python/run-tests --modules=pyspark-mllib` to verify all passing with Python 2.7 and Python 3.6. Also installed scipy to include optional tests in test_linalg. Closes #23056 from BryanCutler/python-test-breakup-mllib-SPARK-26034. Authored-by: Bryan Cutler Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2fc48c2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2fc48c2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2fc48c2 Branch: refs/heads/master Commit: a2fc48c28c06192d1f650582d128d60c7188ec62 Parents: 696b75a Author: Bryan Cutler Authored: Sat Nov 17 00:12:17 2018 +0800 Committer: hyukjinkwon Committed: Sat Nov 17 00:12:17 2018 +0800 -- dev/sparktestsupport/modules.py |9 +- python/pyspark/mllib/tests.py | 1787 -- python/pyspark/mllib/tests/__init__.py | 16 + python/pyspark/mllib/tests/test_algorithms.py | 313 +++ python/pyspark/mllib/tests/test_feature.py | 201 ++ python/pyspark/mllib/tests/test_linalg.py | 642 +++ python/pyspark/mllib/tests/test_stat.py | 197 ++ .../mllib/tests/test_streaming_algorithms.py| 523 + python/pyspark/mllib/tests/test_util.py | 115 ++ python/pyspark/testing/mllibutils.py| 44 + 10 files changed, 2059 insertions(+), 1788 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a2fc48c2/dev/sparktestsupport/modules.py -- diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 58b48f4..547635a 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -416,6 +416,7 @@ pyspark_mllib = Module( "python/pyspark/mllib" ], python_test_goals=[ +# doctests "pyspark.mllib.classification", "pyspark.mllib.clustering", "pyspark.mllib.evaluation", @@ -430,7 +431,13 @@ pyspark_mllib = Module( "pyspark.mllib.stat.KernelDensity", "pyspark.mllib.tree", "pyspark.mllib.util", -"pyspark.mllib.tests", +# unittests +"pyspark.mllib.tests.test_algorithms", +"pyspark.mllib.tests.test_feature", +"pyspark.mllib.tests.test_linalg", +"pyspark.mllib.tests.test_stat", +"pyspark.mllib.tests.test_streaming_algorithms", +"pyspark.mllib.tests.test_util", ], blacklisted_python_implementations=[ "PyPy" # Skip these tests under PyPy since they require numpy and it isn't available there http://git-wip-us.apache.org/repos/asf/spark/blob/a2fc48c2/python/pyspark/mllib/tests.py -- diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py deleted file mode 100644 index 4c2ce13..000 --- a/python/pyspark/mllib/tests.py +++ /dev/null @@ -1,1787 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -Fuller unit tests for Python MLlib. -""" - -import os -import sys -import tempfile -import array as pyarray -from math import sqrt -from time
[1/2] spark git commit: [SPARK-26034][PYTHON][TESTS] Break large mllib/tests.py file into smaller files
Repository: spark Updated Branches: refs/heads/master 696b75a81 -> a2fc48c28 http://git-wip-us.apache.org/repos/asf/spark/blob/a2fc48c2/python/pyspark/mllib/tests/test_algorithms.py -- diff --git a/python/pyspark/mllib/tests/test_algorithms.py b/python/pyspark/mllib/tests/test_algorithms.py new file mode 100644 index 000..8a34541 --- /dev/null +++ b/python/pyspark/mllib/tests/test_algorithms.py @@ -0,0 +1,313 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import tempfile +from shutil import rmtree + +from numpy import array, array_equal + +from py4j.protocol import Py4JJavaError + +if sys.version_info[:2] <= (2, 6): +try: +import unittest2 as unittest +except ImportError: +sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier') +sys.exit(1) +else: +import unittest + +from pyspark.mllib.fpm import FPGrowth +from pyspark.mllib.recommendation import Rating +from pyspark.mllib.regression import LabeledPoint +from pyspark.sql.utils import IllegalArgumentException +from pyspark.testing.mllibutils import make_serializer, MLlibTestCase + + +ser = make_serializer() + + +class ListTests(MLlibTestCase): + +""" +Test MLlib algorithms on plain lists, to make sure they're passed through +as NumPy arrays. +""" + +def test_bisecting_kmeans(self): +from pyspark.mllib.clustering import BisectingKMeans +data = array([0.0, 0.0, 1.0, 1.0, 9.0, 8.0, 8.0, 9.0]).reshape(4, 2) +bskm = BisectingKMeans() +model = bskm.train(self.sc.parallelize(data, 2), k=4) +p = array([0.0, 0.0]) +rdd_p = self.sc.parallelize([p]) +self.assertEqual(model.predict(p), model.predict(rdd_p).first()) +self.assertEqual(model.computeCost(p), model.computeCost(rdd_p)) +self.assertEqual(model.k, len(model.clusterCenters)) + +def test_kmeans(self): +from pyspark.mllib.clustering import KMeans +data = [ +[0, 1.1], +[0, 1.2], +[1.1, 0], +[1.2, 0], +] +clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||", +initializationSteps=7, epsilon=1e-4) +self.assertEqual(clusters.predict(data[0]), clusters.predict(data[1])) +self.assertEqual(clusters.predict(data[2]), clusters.predict(data[3])) + +def test_kmeans_deterministic(self): +from pyspark.mllib.clustering import KMeans +X = range(0, 100, 10) +Y = range(0, 100, 10) +data = [[x, y] for x, y in zip(X, Y)] +clusters1 = KMeans.train(self.sc.parallelize(data), + 3, initializationMode="k-means||", + seed=42, initializationSteps=7, epsilon=1e-4) +clusters2 = KMeans.train(self.sc.parallelize(data), + 3, initializationMode="k-means||", + seed=42, initializationSteps=7, epsilon=1e-4) +centers1 = clusters1.centers +centers2 = clusters2.centers +for c1, c2 in zip(centers1, centers2): +# TODO: Allow small numeric difference. 
+self.assertTrue(array_equal(c1, c2)) + +def test_gmm(self): +from pyspark.mllib.clustering import GaussianMixture +data = self.sc.parallelize([ +[1, 2], +[8, 9], +[-4, -3], +[-6, -7], +]) +clusters = GaussianMixture.train(data, 2, convergenceTol=0.001, + maxIterations=10, seed=1) +labels = clusters.predict(data).collect() +self.assertEqual(labels[0], labels[1]) +self.assertEqual(labels[2], labels[3]) + +def test_gmm_deterministic(self): +from pyspark.mllib.clustering import GaussianMixture +x = range(0, 100, 10) +y = range(0, 100, 10) +data = self.sc.parallelize([[a, b] for a, b in zip(x, y)]) +clusters1 = GaussianMixture.train(data, 5, convergenceTol=0.001, + maxIterations=10, seed=63) +
spark git commit: [SPARK-25934][MESOS] Don't propagate SPARK_CONF_DIR from spark submit
Repository: spark Updated Branches: refs/heads/branch-2.3 7a596187e -> 550408e56 [SPARK-25934][MESOS] Don't propagate SPARK_CONF_DIR from spark submit ## What changes were proposed in this pull request? Don't propagate SPARK_CONF_DIR to the driver in mesos cluster mode. ## How was this patch tested? I built the 2.3.2 tag with this patch added and deployed a test job to a mesos cluster to confirm that the incorrect SPARK_CONF_DIR was no longer passed from the submit command. Closes #22937 from mpmolek/fix-conf-dir. Authored-by: Matt Molek Signed-off-by: Sean Owen (cherry picked from commit 696b75a81013ad61d25e0552df2b019c7531f983) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/550408e5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/550408e5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/550408e5 Branch: refs/heads/branch-2.3 Commit: 550408e56c079a3e25b16c7ccc495216c9c7b9cf Parents: 7a59618 Author: Matt Molek Authored: Fri Nov 16 10:00:21 2018 -0600 Committer: Sean Owen Committed: Fri Nov 16 10:01:00 2018 -0600 -- .../apache/spark/deploy/rest/RestSubmissionClient.scala | 8 +--- .../spark/deploy/rest/StandaloneRestSubmitSuite.scala | 12 2 files changed, 17 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/550408e5/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 742a958..c45a714 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -394,6 +394,10 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { } private[spark] object RestSubmissionClient { + + // SPARK_HOME and SPARK_CONF_DIR are filtered out because they are usually wrong + // on the remote machine (SPARK-12345) (SPARK-25934) + private val BLACKLISTED_SPARK_ENV_VARS = Set("SPARK_ENV_LOADED", "SPARK_HOME", "SPARK_CONF_DIR") private val REPORT_DRIVER_STATUS_INTERVAL = 1000 private val REPORT_DRIVER_STATUS_MAX_TRIES = 10 val PROTOCOL_VERSION = "v1" @@ -403,9 +407,7 @@ private[spark] object RestSubmissionClient { */ private[rest] def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = { env.filterKeys { k => - // SPARK_HOME is filtered out because it is usually wrong on the remote machine (SPARK-12345) - (k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED" && k != "SPARK_HOME") || -k.startsWith("MESOS_") + (k.startsWith("SPARK_") && !BLACKLISTED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_") } } } http://git-wip-us.apache.org/repos/asf/spark/blob/550408e5/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index e505bc0..0f56c69 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -376,6 +376,18 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach { assert(filteredVariables == Map("SPARK_VAR" -> "1")) } + test("client does not send 'SPARK_HOME' env var by default") { 
+val environmentVariables = Map("SPARK_VAR" -> "1", "SPARK_HOME" -> "1") +val filteredVariables = RestSubmissionClient.filterSystemEnvironment(environmentVariables) +assert(filteredVariables == Map("SPARK_VAR" -> "1")) + } + + test("client does not send 'SPARK_CONF_DIR' env var by default") { +val environmentVariables = Map("SPARK_VAR" -> "1", "SPARK_CONF_DIR" -> "1") +val filteredVariables = RestSubmissionClient.filterSystemEnvironment(environmentVariables) +assert(filteredVariables == Map("SPARK_VAR" -> "1")) + } + test("client includes mesos env vars") { val environmentVariables = Map("SPARK_VAR" -> "1", "MESOS_VAR" -> "1", "OTHER_VAR" -> "1") val filteredVariables = RestSubmissionClient.filterSystemEnvironment(environmentVariables) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25934][MESOS] Don't propagate SPARK_CONF_DIR from spark submit
Repository: spark Updated Branches: refs/heads/branch-2.4 ed9fa790c -> be42bfe5c [SPARK-25934][MESOS] Don't propagate SPARK_CONF_DIR from spark submit ## What changes were proposed in this pull request? Don't propagate SPARK_CONF_DIR to the driver in mesos cluster mode. ## How was this patch tested? I built the 2.3.2 tag with this patch added and deployed a test job to a mesos cluster to confirm that the incorrect SPARK_CONF_DIR was no longer passed from the submit command. Closes #22937 from mpmolek/fix-conf-dir. Authored-by: Matt Molek Signed-off-by: Sean Owen (cherry picked from commit 696b75a81013ad61d25e0552df2b019c7531f983) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be42bfe5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be42bfe5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be42bfe5 Branch: refs/heads/branch-2.4 Commit: be42bfe5cb4d03f855f322613d0de247ef4474fe Parents: ed9fa79 Author: Matt Molek Authored: Fri Nov 16 10:00:21 2018 -0600 Committer: Sean Owen Committed: Fri Nov 16 10:00:36 2018 -0600 -- .../apache/spark/deploy/rest/RestSubmissionClient.scala | 8 +--- .../spark/deploy/rest/StandaloneRestSubmitSuite.scala | 12 2 files changed, 17 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/be42bfe5/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 31a8e3e..afa413f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -408,6 +408,10 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { } private[spark] object RestSubmissionClient { + + // SPARK_HOME and SPARK_CONF_DIR are filtered out because they are usually wrong + // on the remote machine (SPARK-12345) (SPARK-25934) + private val BLACKLISTED_SPARK_ENV_VARS = Set("SPARK_ENV_LOADED", "SPARK_HOME", "SPARK_CONF_DIR") private val REPORT_DRIVER_STATUS_INTERVAL = 1000 private val REPORT_DRIVER_STATUS_MAX_TRIES = 10 val PROTOCOL_VERSION = "v1" @@ -417,9 +421,7 @@ private[spark] object RestSubmissionClient { */ private[rest] def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = { env.filterKeys { k => - // SPARK_HOME is filtered out because it is usually wrong on the remote machine (SPARK-12345) - (k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED" && k != "SPARK_HOME") || -k.startsWith("MESOS_") + (k.startsWith("SPARK_") && !BLACKLISTED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_") } } } http://git-wip-us.apache.org/repos/asf/spark/blob/be42bfe5/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index 54c168a..75fb716 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -376,6 +376,18 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach { assert(filteredVariables == Map("SPARK_VAR" -> "1")) } + test("client does not send 'SPARK_HOME' env var by default") { 
+val environmentVariables = Map("SPARK_VAR" -> "1", "SPARK_HOME" -> "1") +val filteredVariables = RestSubmissionClient.filterSystemEnvironment(environmentVariables) +assert(filteredVariables == Map("SPARK_VAR" -> "1")) + } + + test("client does not send 'SPARK_CONF_DIR' env var by default") { +val environmentVariables = Map("SPARK_VAR" -> "1", "SPARK_CONF_DIR" -> "1") +val filteredVariables = RestSubmissionClient.filterSystemEnvironment(environmentVariables) +assert(filteredVariables == Map("SPARK_VAR" -> "1")) + } + test("client includes mesos env vars") { val environmentVariables = Map("SPARK_VAR" -> "1", "MESOS_VAR" -> "1", "OTHER_VAR" -> "1") val filteredVariables = RestSubmissionClient.filterSystemEnvironment(environmentVariables) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25934][MESOS] Don't propagate SPARK_CONF_DIR from spark submit
Repository: spark Updated Branches: refs/heads/master 2aef79a65 -> 696b75a81 [SPARK-25934][MESOS] Don't propagate SPARK_CONF_DIR from spark submit ## What changes were proposed in this pull request? Don't propagate SPARK_CONF_DIR to the driver in mesos cluster mode. ## How was this patch tested? I built the 2.3.2 tag with this patch added and deployed a test job to a mesos cluster to confirm that the incorrect SPARK_CONF_DIR was no longer passed from the submit command. Closes #22937 from mpmolek/fix-conf-dir. Authored-by: Matt Molek Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/696b75a8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/696b75a8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/696b75a8 Branch: refs/heads/master Commit: 696b75a81013ad61d25e0552df2b019c7531f983 Parents: 2aef79a Author: Matt Molek Authored: Fri Nov 16 10:00:21 2018 -0600 Committer: Sean Owen Committed: Fri Nov 16 10:00:21 2018 -0600 -- .../apache/spark/deploy/rest/RestSubmissionClient.scala | 8 +--- .../spark/deploy/rest/StandaloneRestSubmitSuite.scala | 12 2 files changed, 17 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/696b75a8/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 31a8e3e..afa413f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -408,6 +408,10 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { } private[spark] object RestSubmissionClient { + + // SPARK_HOME and SPARK_CONF_DIR are filtered out because they are usually wrong + // on the remote machine (SPARK-12345) (SPARK-25934) + private val BLACKLISTED_SPARK_ENV_VARS = Set("SPARK_ENV_LOADED", "SPARK_HOME", "SPARK_CONF_DIR") private val REPORT_DRIVER_STATUS_INTERVAL = 1000 private val REPORT_DRIVER_STATUS_MAX_TRIES = 10 val PROTOCOL_VERSION = "v1" @@ -417,9 +421,7 @@ private[spark] object RestSubmissionClient { */ private[rest] def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = { env.filterKeys { k => - // SPARK_HOME is filtered out because it is usually wrong on the remote machine (SPARK-12345) - (k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED" && k != "SPARK_HOME") || -k.startsWith("MESOS_") + (k.startsWith("SPARK_") && !BLACKLISTED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_") } } } http://git-wip-us.apache.org/repos/asf/spark/blob/696b75a8/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index 4839c84..89b8bb4 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -396,6 +396,18 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach { assert(filteredVariables == Map("SPARK_VAR" -> "1")) } + test("client does not send 'SPARK_HOME' env var by default") { +val environmentVariables = Map("SPARK_VAR" -> "1", "SPARK_HOME" -> "1") +val filteredVariables = 
RestSubmissionClient.filterSystemEnvironment(environmentVariables) +assert(filteredVariables == Map("SPARK_VAR" -> "1")) + } + + test("client does not send 'SPARK_CONF_DIR' env var by default") { +val environmentVariables = Map("SPARK_VAR" -> "1", "SPARK_CONF_DIR" -> "1") +val filteredVariables = RestSubmissionClient.filterSystemEnvironment(environmentVariables) +assert(filteredVariables == Map("SPARK_VAR" -> "1")) + } + test("client includes mesos env vars") { val environmentVariables = Map("SPARK_VAR" -> "1", "MESOS_VAR" -> "1", "OTHER_VAR" -> "1") val filteredVariables = RestSubmissionClient.filterSystemEnvironment(environmentVariables)
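For reference, the filtering rule this change installs is easy to exercise in isolation. Below is a minimal standalone sketch that mirrors `filterSystemEnvironment` with the new blacklist; it is a local reimplementation for illustration, not an import of the private Spark helper.

```scala
// Forward SPARK_* and MESOS_* variables to the driver, except a blacklist of
// variables that are usually wrong on the remote machine.
object EnvFilterExample {
  private val blacklisted = Set("SPARK_ENV_LOADED", "SPARK_HOME", "SPARK_CONF_DIR")

  def filter(env: Map[String, String]): Map[String, String] =
    env.filter { case (k, _) =>
      (k.startsWith("SPARK_") && !blacklisted.contains(k)) || k.startsWith("MESOS_")
    }

  def main(args: Array[String]): Unit = {
    val env = Map(
      "SPARK_VAR" -> "1",
      "SPARK_CONF_DIR" -> "/etc/spark", // dropped: local path, wrong remotely
      "MESOS_VAR" -> "1",
      "OTHER_VAR" -> "1")               // dropped: not a SPARK_*/MESOS_* var
    assert(filter(env) == Map("SPARK_VAR" -> "1", "MESOS_VAR" -> "1"))
  }
}
```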
spark git commit: [SPARK-25023] More detailed security guidance for K8S
Repository: spark Updated Branches: refs/heads/branch-2.4 2d67be932 -> ed9fa790c [SPARK-25023] More detailed security guidance for K8S ## What changes were proposed in this pull request? Highlights specific security issues to be aware of with Spark on K8S and recommends K8S mechanisms that should be used to secure clusters. ## How was this patch tested? N/A - Documentation only CC felixcheung tgravescs skonto Closes #23013 from rvesse/SPARK-25023. Authored-by: Rob Vesse Signed-off-by: Sean Owen (cherry picked from commit 2aef79a65a145b76a88f1d4d9367091fd238b949) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed9fa790 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed9fa790 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed9fa790 Branch: refs/heads/branch-2.4 Commit: ed9fa790c1b69448ecc75bf7f75900996e319f03 Parents: 2d67be9 Author: Rob Vesse Authored: Fri Nov 16 08:53:29 2018 -0600 Committer: Sean Owen Committed: Fri Nov 16 08:53:51 2018 -0600 -- docs/running-on-kubernetes.md | 16 +++- 1 file changed, 15 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ed9fa790/docs/running-on-kubernetes.md -- diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 754b1ff..41d2122 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -15,7 +15,19 @@ container images and entrypoints.** # Security Security in Spark is OFF by default. This could mean you are vulnerable to attack by default. -Please see [Spark Security](security.html) and the specific security sections in this doc before running Spark. +Please see [Spark Security](security.html) and the specific advice below before running Spark. + +## User Identity + +Images built from the project provided Dockerfiles do not contain any [`USER`](https://docs.docker.com/engine/reference/builder/#user) directives. This means that the resulting images will be running the Spark processes as `root` inside the container. On unsecured clusters this may provide an attack vector for privilege escalation and container breakout. Therefore security conscious deployments should consider providing custom images with `USER` directives specifying an unprivileged UID and GID. + +Alternatively the [Pod Template](#pod-template) feature can be used to add a [Security Context](https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#volumes-and-file-systems) with a `runAsUser` to the pods that Spark submits. Please bear in mind that this requires cooperation from your users and as such may not be a suitable solution for shared environments. Cluster administrators should use [Pod Security Policies](https://kubernetes.io/docs/concepts/policy/pod-security-policy/#users-and-groups) if they wish to limit the users that pods may run as. + +## Volume Mounts + +As described later in this document under [Using Kubernetes Volumes](#using-kubernetes-volumes) Spark on K8S provides configuration options that allow for mounting certain volume types into the driver and executor pods. In particular it allows for [`hostPath`](https://kubernetes.io/docs/concepts/storage/volumes/#hostpath) volumes which as described in the Kubernetes documentation have known security vulnerabilities. 
+ +Cluster administrators should use [Pod Security Policies](https://kubernetes.io/docs/concepts/policy/pod-security-policy/) to limit the ability to mount `hostPath` volumes appropriately for their environments. # Prerequisites @@ -197,6 +209,8 @@ Starting with Spark 2.4.0, users can mount the following types of Kubernetes [vo * [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir): an initially empty volume created when a pod is assigned to a node. * [persistentVolumeClaim](https://kubernetes.io/docs/concepts/storage/volumes/#persistentvolumeclaim): used to mount a `PersistentVolume` into a pod. +**NB:** Please see the [Security](#security) section of this document for security issues related to volume mounts. + To mount a volume of any of the types above into the driver pod, use the following configuration property: ```
spark git commit: [SPARK-25023] More detailed security guidance for K8S
Repository: spark Updated Branches: refs/heads/master 4ac8f9bec -> 2aef79a65 [SPARK-25023] More detailed security guidance for K8S ## What changes were proposed in this pull request? Highlights specific security issues to be aware of with Spark on K8S and recommends K8S mechanisms that should be used to secure clusters. ## How was this patch tested? N/A - Documentation only CC felixcheung tgravescs skonto Closes #23013 from rvesse/SPARK-25023. Authored-by: Rob Vesse Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2aef79a6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2aef79a6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2aef79a6 Branch: refs/heads/master Commit: 2aef79a65a145b76a88f1d4d9367091fd238b949 Parents: 4ac8f9b Author: Rob Vesse Authored: Fri Nov 16 08:53:29 2018 -0600 Committer: Sean Owen Committed: Fri Nov 16 08:53:29 2018 -0600 -- docs/running-on-kubernetes.md | 16 +++- 1 file changed, 15 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2aef79a6/docs/running-on-kubernetes.md -- diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 9052268..a7b6fd1 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -15,7 +15,19 @@ container images and entrypoints.** # Security Security in Spark is OFF by default. This could mean you are vulnerable to attack by default. -Please see [Spark Security](security.html) and the specific security sections in this doc before running Spark. +Please see [Spark Security](security.html) and the specific advice below before running Spark. + +## User Identity + +Images built from the project provided Dockerfiles do not contain any [`USER`](https://docs.docker.com/engine/reference/builder/#user) directives. This means that the resulting images will be running the Spark processes as `root` inside the container. On unsecured clusters this may provide an attack vector for privilege escalation and container breakout. Therefore security conscious deployments should consider providing custom images with `USER` directives specifying an unprivileged UID and GID. + +Alternatively the [Pod Template](#pod-template) feature can be used to add a [Security Context](https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#volumes-and-file-systems) with a `runAsUser` to the pods that Spark submits. Please bear in mind that this requires cooperation from your users and as such may not be a suitable solution for shared environments. Cluster administrators should use [Pod Security Policies](https://kubernetes.io/docs/concepts/policy/pod-security-policy/#users-and-groups) if they wish to limit the users that pods may run as. + +## Volume Mounts + +As described later in this document under [Using Kubernetes Volumes](#using-kubernetes-volumes) Spark on K8S provides configuration options that allow for mounting certain volume types into the driver and executor pods. In particular it allows for [`hostPath`](https://kubernetes.io/docs/concepts/storage/volumes/#hostpath) volumes which as described in the Kubernetes documentation have known security vulnerabilities. + +Cluster administrators should use [Pod Security Policies](https://kubernetes.io/docs/concepts/policy/pod-security-policy/) to limit the ability to mount `hostPath` volumes appropriately for their environments. 
# Prerequisites @@ -214,6 +226,8 @@ Starting with Spark 2.4.0, users can mount the following types of Kubernetes [vo * [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir): an initially empty volume created when a pod is assigned to a node. * [persistentVolumeClaim](https://kubernetes.io/docs/concepts/storage/volumes/#persistentvolumeclaim): used to mount a `PersistentVolume` into a pod. +**NB:** Please see the [Security](#security) section of this document for security issues related to volume mounts. + To mount a volume of any of the types above into the driver pod, use the following configuration property: ```