[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...

2017-05-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17346


[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...

2017-05-02 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17346#discussion_r114468906
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
@@ -36,20 +37,27 @@ import org.apache.spark.util.SerializableConfiguration
  * A [[FileIndex]] that generates the list of files to process by recursively listing all the
  * files present in `paths`.
  *
- * @param rootPaths the list of root table paths to scan
+ * @param rootPathsSpecified the list of root table paths to scan (some of which might be
+ *   filtered out later)
  * @param parameters a set of options to control discovery
  * @param partitionSchema an optional partition schema that will be used to provide types for the
  *   discovered partitions
  */
 class InMemoryFileIndex(
     sparkSession: SparkSession,
-    override val rootPaths: Seq[Path],
+    rootPathsSpecified: Seq[Path],
     parameters: Map[String, String],
     partitionSchema: Option[StructType],
     fileStatusCache: FileStatusCache = NoopCache)
   extends PartitioningAwareFileIndex(
     sparkSession, parameters, partitionSchema, fileStatusCache) {
 
+  // Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir)
+  // or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain
+  // such streaming metadata dirs or files, e.g. after globbing "basePath/*" where "basePath"
+  // is the output of a streaming query.
+  override val rootPaths = rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory)
--- End diff --

Yea, you are quite correct! They will be filtered by `InMemoryFileIndex.shouldFilterOut`.
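
For reference, a simplified Scala sketch of the kind of name-based check
`InMemoryFileIndex.shouldFilterOut` performs (an approximation for illustration
only; the exact rules vary across Spark versions):

    // Approximate sketch: drop hidden or metadata-style entries by name.
    def shouldFilterOut(pathName: String): Boolean = {
      // Hidden entries ("_foo", ".foo") are skipped, with carve-outs for the
      // Parquet summary files that some readers still expect to find.
      val exclude = pathName.startsWith("_") || pathName.startsWith(".")
      val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
      exclude && !include
    }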


[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...

2017-05-02 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17346#discussion_r114468833
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -145,6 +147,41 @@ class FileStreamSinkSuite extends StreamTest {
 }
   }
 
+  test("partitioned writing and batch reading with 'basePath'") {
+    val inputData = MemoryStream[Int]
+    val ds = inputData.toDS()
+
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
--- End diff --

done


[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...

2017-05-02 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17346#discussion_r114468801
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala ---
@@ -53,6 +53,29 @@ object FileStreamSink extends Logging {
   case _ => false
 }
   }
+
+  /**
+   * Returns true if the path is the metadata dir or one of its ancestors is the metadata dir.
+   * E.g.:
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata) => true
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata/0) => true
+   *  - ancestorIsMetadataDirectory(/a/b/c) => false
+   */
+  def ancestorIsMetadataDirectory(path: Path): Boolean = {
+    require(path.isAbsolute, s"$path is required to be absolute")
--- End diff --

switched to `makeQualified`
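
For context, a minimal sketch of what qualifying the path can look like with the
Hadoop API (assuming a Hadoop `Configuration` is available at the call site; the
helper name is illustrative):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Sketch: qualify the path up front instead of requiring it to be absolute,
    // so relative paths are resolved against the default FS and working directory.
    def qualify(path: Path, hadoopConf: Configuration): Path = {
      val fs: FileSystem = path.getFileSystem(hadoopConf)
      fs.makeQualified(path) // fills in scheme, authority, and working dir
    }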


[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...

2017-05-02 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17346#discussion_r114468821
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala ---
@@ -53,6 +53,29 @@ object FileStreamSink extends Logging {
   case _ => false
 }
   }
+
+  /**
+   * Returns true if the path is the metadata dir or one of its ancestors is the metadata dir.
+   * E.g.:
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata) => true
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata/0) => true
+   *  - ancestorIsMetadataDirectory(/a/b/c) => false
+   */
+  def ancestorIsMetadataDirectory(path: Path): Boolean = {
+    require(path.isAbsolute, s"$path is required to be absolute")
+    var currentPath = path
+    var finished = false
+    while (!finished) {
--- End diff --

fixed. good point!


[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...

2017-05-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17346#discussion_r114395863
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala ---
@@ -53,6 +53,29 @@ object FileStreamSink extends Logging {
   case _ => false
 }
   }
+
+  /**
+   * Returns true if the path is the metadata dir or one of its ancestors is the metadata dir.
+   * E.g.:
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata) => true
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata/0) => true
+   *  - ancestorIsMetadataDirectory(/a/b/c) => false
+   */
+  def ancestorIsMetadataDirectory(path: Path): Boolean = {
+    require(path.isAbsolute, s"$path is required to be absolute")
+    var currentPath = path
+    var finished = false
+    while (!finished) {
--- End diff --

How about changing it to `currentPath != null`? Then you don't need `finished`.
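
For illustration, a sketch of the suggested shape (`Path.getParent` returns null
at the root, which terminates the walk; `metadataDir` is the existing
"_spark_metadata" constant in `FileStreamSink`):

    import org.apache.hadoop.fs.Path

    // Walk up the parent chain; no separate `finished` flag is needed.
    def ancestorIsMetadataDirectory(path: Path): Boolean = {
      var currentPath = path
      while (currentPath != null) {
        if (currentPath.getName == metadataDir) {
          return true
        }
        currentPath = currentPath.getParent
      }
      false
    }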


[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...

2017-05-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17346#discussion_r114396634
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -145,6 +147,41 @@ class FileStreamSinkSuite extends StreamTest {
 }
   }
 
+  test("partitioned writing and batch reading with 'basePath'") {
+    val inputData = MemoryStream[Int]
+    val ds = inputData.toDS()
+
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
--- End diff --

nit: same as above


[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...

2017-05-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17346#discussion_r114396542
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -145,6 +147,41 @@ class FileStreamSinkSuite extends StreamTest {
 }
   }
 
+  test("partitioned writing and batch reading with 'basePath'") {
+    val inputData = MemoryStream[Int]
+    val ds = inputData.toDS()
+
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
--- End diff --

nit: use `withTempDir` to create temp dir instead
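
For reference, a sketch of the suggested shape, assuming the suite has access to
SQLTestUtils' `withTempDir` (which deletes the directory after the body runs):

    withTempDir { outputDir =>
      withTempDir { checkpointDir =>
        // write the stream to outputDir.getCanonicalPath, checkpoint to
        // checkpointDir.getCanonicalPath, then batch-read the output back
      }
    }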


[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...

2017-05-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17346#discussion_r114397372
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
@@ -36,20 +37,27 @@ import org.apache.spark.util.SerializableConfiguration
  * A [[FileIndex]] that generates the list of files to process by recursively listing all the
  * files present in `paths`.
  *
- * @param rootPaths the list of root table paths to scan
+ * @param rootPathsSpecified the list of root table paths to scan (some of which might be
+ *   filtered out later)
  * @param parameters a set of options to control discovery
  * @param partitionSchema an optional partition schema that will be used to provide types for the
  *   discovered partitions
  */
 class InMemoryFileIndex(
     sparkSession: SparkSession,
-    override val rootPaths: Seq[Path],
+    rootPathsSpecified: Seq[Path],
     parameters: Map[String, String],
     partitionSchema: Option[StructType],
     fileStatusCache: FileStatusCache = NoopCache)
   extends PartitioningAwareFileIndex(
     sparkSession, parameters, partitionSchema, fileStatusCache) {
 
+  // Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir)
+  // or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain
+  // such streaming metadata dirs or files, e.g. after globbing "basePath/*" where "basePath"
+  // is the output of a streaming query.
+  override val rootPaths = rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory)
--- End diff --

Just to confirm one thing: for files in `rootPaths` or their sub dirs, they 
will be dropped by `InMemoryFileIndex.shouldFilterOut`. Right?
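
For background, the scenario the quoted comment describes is a batch read over
the globbed output of a streaming query, roughly along these lines (paths and
format are illustrative, and `spark` is assumed to be a `SparkSession`):

    // "/tmp/stream.output" stands in for a streaming sink's output dir, so the
    // glob also matches "_spark_metadata" alongside the partition directories.
    val df = spark.read
      .option("basePath", "/tmp/stream.output")
      .parquet("/tmp/stream.output/*")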


[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...

2017-05-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17346#discussion_r114395114
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala ---
@@ -53,6 +53,29 @@ object FileStreamSink extends Logging {
   case _ => false
 }
   }
+
+  /**
+   * Returns true if the path is the metadata dir or one of its ancestors is the metadata dir.
+   * E.g.:
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata) => true
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata/0) => true
+   *  - ancestorIsMetadataDirectory(/a/b/c) => false
+   */
+  def ancestorIsMetadataDirectory(path: Path): Boolean = {
+    require(path.isAbsolute, s"$path is required to be absolute")
--- End diff --

I'm wondering if we can call `makeQualified` instead.

