This is an automated email from the ASF dual-hosted git repository. lixiao pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 0a4b356  Revert "[SPARK-28912][STREAMING] Fixed MatchError in getCheckpointFiles()"
0a4b356 is described below

commit 0a4b35642ffa3020ec0fcae2cca59376e2095636
Author: Xiao Li <gatorsm...@gmail.com>
AuthorDate: Fri Sep 6 23:37:36 2019 -0700

    Revert "[SPARK-28912][STREAMING] Fixed MatchError in getCheckpointFiles()"

    This reverts commit 2654c33fd6a7a09e2b2fa9fc1c2ea6224ab292e6.
---
 .../scala/org/apache/spark/streaming/Checkpoint.scala |  4 ++--
 .../org/apache/spark/streaming/CheckpointSuite.scala  | 17 -----------------
 2 files changed, 2 insertions(+), 19 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index b081287..a882558 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -128,8 +128,8 @@ object Checkpoint extends Logging {
     try {
       val statuses = fs.listStatus(path)
       if (statuses != null) {
-        val paths = statuses.filterNot(_.isDirectory).map(_.getPath)
-        val filtered = paths.filter(p => REGEX.findFirstIn(p.getName).nonEmpty)
+        val paths = statuses.map(_.getPath)
+        val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty)
         filtered.sortWith(sortFunc)
       } else {
         logWarning(s"Listing $path returned null")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 43e3cdd..19b621f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -846,23 +846,6 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     checkpointWriter.stop()
   }
 
-  test("SPARK-28912: Fix MatchError in getCheckpointFiles") {
-    withTempDir { tempDir =>
-      val fs = FileSystem.get(tempDir.toURI, new Configuration)
-      val checkpointDir = tempDir.getAbsolutePath + "/checkpoint-01"
-
-      assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
-
-      // Ignore files whose parent path match.
-      fs.create(new Path(checkpointDir, "this-is-matched-before-due-to-parent-path")).close()
-      assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
-
-      // Ignore directories whose names match.
-      fs.mkdirs(new Path(checkpointDir, "checkpoint-1000000000"))
-      assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
-    }
-  }
-
   test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") {
     // In this test, there are two updateStateByKey operators. The RDD DAG is as follows:
     //

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org