[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/17346

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17346#discussion_r114468906

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
    @@ -36,20 +37,27 @@ import org.apache.spark.util.SerializableConfiguration
      * A [[FileIndex]] that generates the list of files to process by recursively listing all the
      * files present in `paths`.
      *
    - * @param rootPaths the list of root table paths to scan
    + * @param rootPathsSpecified the list of root table paths to scan (some of which might be
    + *                           filtered out later)
      * @param parameters as set of options to control discovery
      * @param partitionSchema an optional partition schema that will be use to provide types for the
      *                        discovered partitions
      */
     class InMemoryFileIndex(
         sparkSession: SparkSession,
    -    override val rootPaths: Seq[Path],
    +    rootPathsSpecified: Seq[Path],
         parameters: Map[String, String],
         partitionSchema: Option[StructType],
         fileStatusCache: FileStatusCache = NoopCache)
       extends PartitioningAwareFileIndex(
         sparkSession, parameters, partitionSchema, fileStatusCache) {

    +  // Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir)
    +  // or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain
    +  // such streaming metadata dir or files, e.g. when after globbing "basePath/*" where "basePath"
    +  // is the output of a streaming query.
    +  override val rootPaths = rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory)
    --- End diff --

    Yes, you are quite correct! They will be filtered out by `InMemoryFileIndex.shouldFilterOut`.
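The root-path filtering described in the diff above can be sketched without Spark or Hadoop on the classpath. This is only an illustration under stated assumptions: it uses `java.nio.file.Path` in place of Hadoop's `Path`, and `RootPathFilterSketch`, `underMetadataDir`, and `filterRootPaths` are hypothetical names, not the PR's code. Only the `_spark_metadata` directory name and the `filterNot` idea are taken from the diff.

```scala
import java.nio.file.{Path, Paths}

object RootPathFilterSketch {
  // Directory name taken from the diff comments; treated here as a plain constant.
  val metadataDirName = "_spark_metadata"

  // Hypothetical stand-in for FileStreamSink.ancestorIsMetadataDirectory:
  // true if the path itself, or any of its ancestors, is named "_spark_metadata".
  def underMetadataDir(path: Path): Boolean =
    Iterator.iterate(path)(_.getParent)   // path, parent, grandparent, ...
      .takeWhile(_ != null)               // getParent returns null past the top
      .exists { p =>
        val name = p.getFileName          // null for a root such as "/"
        name != null && name.toString == metadataDirName
      }

  // Mirrors the shape of `rootPathsSpecified.filterNot(...)` from the diff.
  def filterRootPaths(rootPathsSpecified: Seq[Path]): Seq[Path] =
    rootPathsSpecified.filterNot(underMetadataDir)
}
```

For example, if globbing "basePath/*" yielded `/out/_spark_metadata`, `/out/_spark_metadata/0`, and `/out/id=1`, this sketch would keep only `/out/id=1` as a root path.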
[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17346#discussion_r114468833

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
    @@ -145,6 +147,41 @@ class FileStreamSinkSuite extends StreamTest {
         }
       }

    +  test("partitioned writing and batch reading with 'basePath'") {
    +    val inputData = MemoryStream[Int]
    +    val ds = inputData.toDS()
    +
    +    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
    --- End diff --

    done
[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17346#discussion_r114468801

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala ---
    @@ -53,6 +53,29 @@ object FileStreamSink extends Logging {
           case _ => false
         }
       }
    +
    +  /**
    +   * Returns true if the path is the metadata dir or its ancestor is the metadata dir.
    +   * E.g.:
    +   *  - ancestorIsMetadataDirectory(/.../_spark_metadata) => true
    +   *  - ancestorIsMetadataDirectory(/.../_spark_metadata/0) => true
    +   *  - ancestorIsMetadataDirectory(/a/b/c) => false
    +   */
    +  def ancestorIsMetadataDirectory(path: Path): Boolean = {
    +    require(path.isAbsolute, s"$path is required to be absolute")
    --- End diff --

    switched to `makeQualified`
[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17346#discussion_r114468821

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala ---
    @@ -53,6 +53,29 @@ object FileStreamSink extends Logging {
           case _ => false
         }
       }
    +
    +  /**
    +   * Returns true if the path is the metadata dir or its ancestor is the metadata dir.
    +   * E.g.:
    +   *  - ancestorIsMetadataDirectory(/.../_spark_metadata) => true
    +   *  - ancestorIsMetadataDirectory(/.../_spark_metadata/0) => true
    +   *  - ancestorIsMetadataDirectory(/a/b/c) => false
    +   */
    +  def ancestorIsMetadataDirectory(path: Path): Boolean = {
    +    require(path.isAbsolute, s"$path is required to be absolute")
    +    var currentPath = path
    +    var finished = false
    +    while (!finished) {
    --- End diff --

    fixed. good point!
[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17346#discussion_r114395863

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala ---
    @@ -53,6 +53,29 @@ object FileStreamSink extends Logging {
           case _ => false
         }
       }
    +
    +  /**
    +   * Returns true if the path is the metadata dir or its ancestor is the metadata dir.
    +   * E.g.:
    +   *  - ancestorIsMetadataDirectory(/.../_spark_metadata) => true
    +   *  - ancestorIsMetadataDirectory(/.../_spark_metadata/0) => true
    +   *  - ancestorIsMetadataDirectory(/a/b/c) => false
    +   */
    +  def ancestorIsMetadataDirectory(path: Path): Boolean = {
    +    require(path.isAbsolute, s"$path is required to be absolute")
    +    var currentPath = path
    +    var finished = false
    +    while (!finished) {
    --- End diff --

    How about changing it to `currentPath != null`? Then you don't need `finished`
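The suggestion above (loop on `currentPath != null` and drop the `finished` flag) can be illustrated with a small standalone sketch. It is not the code that was merged: it assumes `java.nio.file.Path` rather than Hadoop's `Path`, uses `toAbsolutePath.normalize()` as a rough stand-in for the `makeQualified` call discussed elsewhere in this thread, and `MetadataPathSketch` is a hypothetical name.

```scala
import java.nio.file.{Path, Paths}

object MetadataPathSketch {
  // Directory name as it appears in the doc comment of the diff.
  val metadataDir = "_spark_metadata"

  def ancestorIsMetadataDirectory(path: Path): Boolean = {
    // Assumption: resolving against the working directory replaces the
    // original `require(path.isAbsolute, ...)` check.
    var currentPath = path.toAbsolutePath.normalize()
    // Walk upwards; getParent returns null once the root has been passed,
    // which ends the loop without a separate `finished` flag.
    while (currentPath != null) {
      val name = currentPath.getFileName // null for the root itself
      if (name != null && name.toString == metadataDir) {
        return true
      }
      currentPath = currentPath.getParent
    }
    false
  }
}
```

The loop condition doubles as the termination check, so the only mutable state left is the cursor itself.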
[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17346#discussion_r114396634

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
    @@ -145,6 +147,41 @@ class FileStreamSinkSuite extends StreamTest {
         }
       }

    +  test("partitioned writing and batch reading with 'basePath'") {
    +    val inputData = MemoryStream[Int]
    +    val ds = inputData.toDS()
    +
    +    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
    +    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
    --- End diff --

    nit: same as above
[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17346#discussion_r114396542

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
    @@ -145,6 +147,41 @@ class FileStreamSinkSuite extends StreamTest {
         }
       }

    +  test("partitioned writing and batch reading with 'basePath'") {
    +    val inputData = MemoryStream[Int]
    +    val ds = inputData.toDS()
    +
    +    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
    --- End diff --

    nit: use `withTempDir` to create temp dir instead
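The point of a `withTempDir`-style helper is that the directory is cleaned up even if the test body throws. A minimal sketch of that loan pattern follows. It is not Spark's test utility: `TempDirSketch`, its `withTempDir` signature (including the `prefix` parameter), and `deleteRecursively` are hypothetical and written against the JDK only.

```scala
import java.io.File
import java.nio.file.Files

object TempDirSketch {
  // Delete a file, or a directory together with everything beneath it.
  private def deleteRecursively(file: File): Unit = {
    if (file.isDirectory) {
      // listFiles can return null on I/O error, hence the Option wrapper.
      Option(file.listFiles()).foreach(_.foreach(deleteRecursively))
    }
    file.delete()
    ()
  }

  // Loan pattern: create the directory, lend it to `body`, always clean up.
  def withTempDir[T](prefix: String)(body: File => T): T = {
    val dir = Files.createTempDirectory(prefix).toFile
    try body(dir)
    finally deleteRecursively(dir)
  }
}
```

A test would then wrap its body, for example `TempDirSketch.withTempDir("stream.output") { dir => ... }`, instead of creating the directory by hand and relying on later cleanup.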
[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17346#discussion_r114397372

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
    @@ -36,20 +37,27 @@ import org.apache.spark.util.SerializableConfiguration
      * A [[FileIndex]] that generates the list of files to process by recursively listing all the
      * files present in `paths`.
      *
    - * @param rootPaths the list of root table paths to scan
    + * @param rootPathsSpecified the list of root table paths to scan (some of which might be
    + *                           filtered out later)
      * @param parameters as set of options to control discovery
      * @param partitionSchema an optional partition schema that will be use to provide types for the
      *                        discovered partitions
      */
     class InMemoryFileIndex(
         sparkSession: SparkSession,
    -    override val rootPaths: Seq[Path],
    +    rootPathsSpecified: Seq[Path],
         parameters: Map[String, String],
         partitionSchema: Option[StructType],
         fileStatusCache: FileStatusCache = NoopCache)
       extends PartitioningAwareFileIndex(
         sparkSession, parameters, partitionSchema, fileStatusCache) {

    +  // Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir)
    +  // or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain
    +  // such streaming metadata dir or files, e.g. when after globbing "basePath/*" where "basePath"
    +  // is the output of a streaming query.
    +  override val rootPaths = rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory)
    --- End diff --

    Just to confirm one thing: files under `rootPaths` or their subdirectories will be dropped by `InMemoryFileIndex.shouldFilterOut`, right?
[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17346#discussion_r114395114

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala ---
    @@ -53,6 +53,29 @@ object FileStreamSink extends Logging {
           case _ => false
         }
       }
    +
    +  /**
    +   * Returns true if the path is the metadata dir or its ancestor is the metadata dir.
    +   * E.g.:
    +   *  - ancestorIsMetadataDirectory(/.../_spark_metadata) => true
    +   *  - ancestorIsMetadataDirectory(/.../_spark_metadata/0) => true
    +   *  - ancestorIsMetadataDirectory(/a/b/c) => false
    +   */
    +  def ancestorIsMetadataDirectory(path: Path): Boolean = {
    +    require(path.isAbsolute, s"$path is required to be absolute")
    --- End diff --

    I'm wondering if we can call `makeQualified` instead.