Repository: spark
Updated Branches:
  refs/heads/branch-2.2 f0e80aa2d -> 36d807906


[SPARK-19965][SS] DataFrame batch reader may fail to infer partitions when reading FileStreamSink's output

## The Problem

Right now, the DataFrame batch reader may fail to infer partitions when reading a FileStreamSink's output:

```
[info] - partitioned writing and batch reading with 'basePath' *** FAILED *** (3 seconds, 928 milliseconds)
[info]   java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:
[info]  ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637
[info]  ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637/_spark_metadata
[info]
[info] If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.
[info]   at scala.Predef$.assert(Predef.scala:170)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:133)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:98)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:156)
[info]   at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:54)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:55)
[info]   at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:133)
[info]   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
[info]   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:160)
[info]   at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:536)
[info]   at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:520)
[info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply$mcV$sp(FileStreamSinkSuite.scala:292)
[info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
[info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
```
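
For context, here is a minimal standalone sketch of the failing scenario (the local master setting, the output/checkpoint paths, and the sample data are illustrative, not taken from the test above):

```
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

object PartitionInferenceRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-19965-repro")
      .getOrCreate()
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    // Hypothetical output and checkpoint locations, for illustration only.
    val outputPath = "/tmp/stream.output"
    val checkpointPath = "/tmp/stream.checkpoint"

    // Write a small partitioned dataset through FileStreamSink.
    val inputData = MemoryStream[Int]
    val query = inputData.toDS()
      .map(i => (i, -i, i * 1000)).toDF("id1", "id2", "value")
      .writeStream
      .partitionBy("id1", "id2")
      .option("checkpointLocation", checkpointPath)
      .format("parquet")
      .start(outputPath)

    inputData.addData(1, 2, 3)
    query.processAllAvailable()
    query.stop()

    // Globbing "$outputPath/*/*" also picks up entries under "_spark_metadata",
    // so partition inference sees conflicting directory structures and fails
    // with the assertion above (before this patch).
    spark.read.option("basePath", outputPath).parquet(s"$outputPath/*/*").show()
  }
}
```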

## What changes were proposed in this pull request?

This patch alters `InMemoryFileIndex` to filter out root paths whose ancestor is the streaming metadata dir (`_spark_metadata`). For example, the following and other similar dirs or files will be filtered out (a standalone sketch of the check follows the list):
- (introduced by globbing `basePath/*`)
   - `basePath/_spark_metadata`
- (introduced by globbing `basePath/*/*`)
   - `basePath/_spark_metadata/0`
   - `basePath/_spark_metadata/1`
   - ...
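
As a standalone illustration of the check this patch adds (a sketch only: it uses a default Hadoop `Configuration` and the literal `"_spark_metadata"` instead of the `FileStreamSink.metadataDir` constant; see the actual implementation in the diff below):

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Walk up from `path` toward the root; the path is filtered out if any
// component on the way is named "_spark_metadata".
def ancestorIsMetadataDir(path: Path, hadoopConf: Configuration): Boolean = {
  val fs = path.getFileSystem(hadoopConf)
  var current = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
  while (current != null) {
    if (current.getName == "_spark_metadata") return true
    current = current.getParent
  }
  false
}

val conf = new Configuration()
ancestorIsMetadataDir(new Path("/out/_spark_metadata"), conf)          // true
ancestorIsMetadataDir(new Path("/out/_spark_metadata/0"), conf)        // true
ancestorIsMetadataDir(new Path("/out/id1=1/part-00000.parquet"), conf) // false
```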

## How was this patch tested?

Added unit tests

Author: Liwei Lin <lwl...@gmail.com>

Closes #17346 from lw-lin/filter-metadata.

(cherry picked from commit 6b9e49d12fc4c9b29d497122daa4cc9bf4540b16)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36d80790
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36d80790
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36d80790

Branch: refs/heads/branch-2.2
Commit: 36d80790699c529b15e9c1a2cf2f9f636b1f24e6
Parents: f0e80aa
Author: Liwei Lin <lwl...@gmail.com>
Authored: Wed May 3 11:10:24 2017 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed May 3 11:10:31 2017 -0700

----------------------------------------------------------------------
 .../datasources/InMemoryFileIndex.scala         | 13 ++++-
 .../execution/streaming/FileStreamSink.scala    | 20 +++++++
 .../datasources/FileSourceStrategySuite.scala   |  2 +-
 .../sql/streaming/FileStreamSinkSuite.scala     | 59 +++++++++++++++++++-
 4 files changed, 90 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/36d80790/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 9897ab7..91e3165 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.execution.streaming.FileStreamSink
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -36,20 +37,28 @@ import org.apache.spark.util.SerializableConfiguration
  * A [[FileIndex]] that generates the list of files to process by recursively listing all the
  * files present in `paths`.
  *
- * @param rootPaths the list of root table paths to scan
+ * @param rootPathsSpecified the list of root table paths to scan (some of which might be
+ *                           filtered out later)
  * @param parameters as set of options to control discovery
  * @param partitionSchema an optional partition schema that will be use to provide types for the
  *                        discovered partitions
  */
 class InMemoryFileIndex(
     sparkSession: SparkSession,
-    override val rootPaths: Seq[Path],
+    rootPathsSpecified: Seq[Path],
     parameters: Map[String, String],
     partitionSchema: Option[StructType],
     fileStatusCache: FileStatusCache = NoopCache)
   extends PartitioningAwareFileIndex(
     sparkSession, parameters, partitionSchema, fileStatusCache) {
 
+  // Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir)
+  // or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain
+  // such streaming metadata dir or files, e.g. when after globbing "basePath/*" where "basePath"
+  // is the output of a streaming query.
+  override val rootPaths =
+    rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory(_, hadoopConf))
+
   @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
   @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
   @volatile private var cachedPartitionSpec: PartitionSpec = _

http://git-wip-us.apache.org/repos/asf/spark/blob/36d80790/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 07ec4e9..6885d0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -53,6 +53,26 @@ object FileStreamSink extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * Returns true if the path is the metadata dir or its ancestor is the metadata dir.
+   * E.g.:
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata) => true
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata/0) => true
+   *  - ancestorIsMetadataDirectory(/a/b/c) => false
+   */
+  def ancestorIsMetadataDirectory(path: Path, hadoopConf: Configuration): Boolean = {
+    val fs = path.getFileSystem(hadoopConf)
+    var currentPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    while (currentPath != null) {
+      if (currentPath.getName == FileStreamSink.metadataDir) {
+        return true
+      } else {
+        currentPath = currentPath.getParent
+      }
+    }
+    return false
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/36d80790/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 8703fe9..fa3c696 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -395,7 +395,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
 
         val fileCatalog = new InMemoryFileIndex(
           sparkSession = spark,
-          rootPaths = Seq(new Path(tempDir)),
+          rootPathsSpecified = Seq(new Path(tempDir)),
           parameters = Map.empty[String, String],
           partitionSchema = None)
         // This should not fail.

http://git-wip-us.apache.org/repos/asf/spark/blob/36d80790/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 1211242..1a2d3a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -19,10 +19,12 @@ package org.apache.spark.sql.streaming
 
 import java.util.Locale
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql.{AnalysisException, DataFrame}
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.streaming.{MemoryStream, MetadataLogFileIndex}
+import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
@@ -145,6 +147,43 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
+  test("partitioned writing and batch reading with 'basePath'") {
+    withTempDir { outputDir =>
+      withTempDir { checkpointDir =>
+        val outputPath = outputDir.getAbsolutePath
+        val inputData = MemoryStream[Int]
+        val ds = inputData.toDS()
+
+        var query: StreamingQuery = null
+
+        try {
+          query =
+            ds.map(i => (i, -i, i * 1000))
+              .toDF("id1", "id2", "value")
+              .writeStream
+              .partitionBy("id1", "id2")
+              .option("checkpointLocation", checkpointDir.getAbsolutePath)
+              .format("parquet")
+              .start(outputPath)
+
+          inputData.addData(1, 2, 3)
+          failAfter(streamingTimeout) {
+            query.processAllAvailable()
+          }
+
+          val readIn = spark.read.option("basePath", outputPath).parquet(s"$outputDir/*/*")
+          checkDatasetUnorderly(
+            readIn.as[(Int, Int, Int)],
+            (1000, 1, -1), (2000, 2, -2), (3000, 3, -3))
+        } finally {
+          if (query != null) {
+            query.stop()
+          }
+        }
+      }
+    }
+  }
+
   // This tests whether FileStreamSink works with aggregations. Specifically, it tests
   // whether the correct streaming QueryExecution (i.e. IncrementalExecution) is used to
   // to execute the trigger for writing data to file sink. See SPARK-18440 for more details.
@@ -266,4 +305,22 @@ class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("FileStreamSink.ancestorIsMetadataDirectory()") {
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    def assertAncestorIsMetadataDirectory(path: String): Unit =
+      assert(FileStreamSink.ancestorIsMetadataDirectory(new Path(path), hadoopConf))
+    def assertAncestorIsNotMetadataDirectory(path: String): Unit =
+      assert(!FileStreamSink.ancestorIsMetadataDirectory(new Path(path), hadoopConf))
+
+    assertAncestorIsMetadataDirectory(s"/${FileStreamSink.metadataDir}")
+    assertAncestorIsMetadataDirectory(s"/${FileStreamSink.metadataDir}/")
+    assertAncestorIsMetadataDirectory(s"/a/${FileStreamSink.metadataDir}")
+    assertAncestorIsMetadataDirectory(s"/a/${FileStreamSink.metadataDir}/")
+    assertAncestorIsMetadataDirectory(s"/a/b/${FileStreamSink.metadataDir}/c")
+    assertAncestorIsMetadataDirectory(s"/a/b/${FileStreamSink.metadataDir}/c/")
+
+    assertAncestorIsNotMetadataDirectory(s"/a/b/c")
+    assertAncestorIsNotMetadataDirectory(s"/a/b/c/${FileStreamSink.metadataDir}extra")
+  }
 }

