[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16987


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103604050
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -190,32 +190,31 @@ class FileStreamSource(
 val startTime = System.nanoTime
 
 var allFiles: Seq[FileStatus] = null
-if (sourceHasMetadata.isEmpty) {
-  if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
-sourceHasMetadata = Some(true)
-allFiles = allFilesUsingMetadataLogFileIndex()
-  } else {
-allFiles = allFilesUsingInMemoryFileIndex()
-if (allFiles.isEmpty) {
-  // we still cannot decide
+sourceHasMetadata match {
--- End diff --

Simply switched to `sourceHasMetadata match { case ... case ... case ... }`; the actual diff is quite small.
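
The resulting shape is roughly the following (a sketch pieced together from the quoted hunks, not a verbatim copy of the final patch):

    sourceHasMetadata match {
      case None =>
        if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
          sourceHasMetadata = Some(true)
          allFiles = allFilesUsingMetadataLogFileIndex()
        } else {
          allFiles = allFilesUsingInMemoryFileIndex()
          if (allFiles.isEmpty) {
            // we still cannot decide whether the source has a metadata log
          } else {
            sourceHasMetadata = Some(false)
          }
        }
      case Some(true) =>
        allFiles = allFilesUsingMetadataLogFileIndex()
      case Some(false) =>
        allFiles = allFilesUsingInMemoryFileIndex()
    }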





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603945
  

[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603935
  

[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603942
  

[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603922
  

[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603898
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
+// q1 is a streaming query that reads from memory and writes to 
text files
+val q1_source = MemoryStream[String]
+val q1_checkpointDir = new File(dir, 
"q1_checkpointDir").getCanonicalPath
+val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
+val q1 =
+  q1_source
+.toDF()
+.writeStream
+.option("checkpointLocation", q1_checkpointDir)
+.format("text")
+.start(q1_outputDir)
+
+// q2 is a streaming query that reads q1's text outputs
+val q2 = createFileStream("text", q1_outputDir).filter($"value" 
contains "keep")
+
+def q1AddData(data: String*): StreamAction =
+  Execute { _ =>
+q1_source.addData(data)
+q1.processAllAvailable()
+  }
+def q2ProcessAllAvailable(): StreamAction = Execute { q2 => 
q2.processAllAvailable() }
+
+testStream(q2)(
+  // batch 0
+  q1AddData("drop1", "keep2"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2"),
+
+  // batch 1
+  Assert {
+// create a text file that won't be on q1's sink log
+// thus even if its contents contains "keep", it should NOT 
appear in q2's answer
+val shouldNotKeep = new File(q1_outputDir, 
"should_not_keep.txt")
+stringToFile(shouldNotKeep, "should_not_keep!!!")
+shouldNotKeep.exists()
+  },
+  q1AddData("keep3"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3"),
+
+  // batch 2: check that things work well when the sink log gets 
compacted
+  q1AddData("keep4"),
+  Assert {
+// compact interval is 3, so file "2.compact" should exist
+new File(q1_outputDir, 
s"${FileStreamSink.metadataDir}/2.compact").exists()
+  },
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3", "keep4"),
+
+  // stop q1 manually
+  Execute { _ => q1.stop() }
+)
+  }
+}
+  }
+
+  test("read partitioned data from outputs of another streaming query") {
--- End diff --

Test removed -- let me think about this write-partition-information thing :)
thanks!





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603705
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
+// q1 is a streaming query that reads from memory and writes to 
text files
+val q1_source = MemoryStream[String]
+val q1_checkpointDir = new File(dir, 
"q1_checkpointDir").getCanonicalPath
+val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
+val q1 =
+  q1_source
+.toDF()
+.writeStream
+.option("checkpointLocation", q1_checkpointDir)
+.format("text")
+.start(q1_outputDir)
+
+// q2 is a streaming query that reads q1's text outputs
+val q2 = createFileStream("text", q1_outputDir).filter($"value" 
contains "keep")
+
+def q1AddData(data: String*): StreamAction =
+  Execute { _ =>
+q1_source.addData(data)
+q1.processAllAvailable()
+  }
+def q2ProcessAllAvailable(): StreamAction = Execute { q2 => 
q2.processAllAvailable() }
+
+testStream(q2)(
+  // batch 0
+  q1AddData("drop1", "keep2"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2"),
+
+  // batch 1
+  Assert {
+// create a text file that won't be on q1's sink log
+// thus even if its contents contains "keep", it should NOT 
appear in q2's answer
+val shouldNotKeep = new File(q1_outputDir, 
"should_not_keep.txt")
+stringToFile(shouldNotKeep, "should_not_keep!!!")
+shouldNotKeep.exists()
+  },
+  q1AddData("keep3"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3"),
+
+  // batch 2: check that things work well when the sink log gets 
compacted
+  q1AddData("keep4"),
+  Assert {
+// compact interval is 3, so file "2.compact" should exist
+new File(q1_outputDir, 
s"${FileStreamSink.metadataDir}/2.compact").exists()
+  },
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3", "keep4"),
+
+  // stop q1 manually
--- End diff --

fixed





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603671
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
@@ -208,6 +208,11 @@ trait StreamTest extends QueryTest with 
SharedSQLContext with Timeouts {
 }
   }
 
+  /** Execute arbitrary code */
+  case class Execute(val func: StreamExecution => Any) extends 
StreamAction {
--- End diff --

fixed, thanks!





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603693
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
+// q1 is a streaming query that reads from memory and writes to 
text files
+val q1_source = MemoryStream[String]
--- End diff --

fixed





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603687
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
--- End diff --

fixed





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103565167
  

[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103567273
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
+// q1 is a streaming query that reads from memory and writes to 
text files
+val q1_source = MemoryStream[String]
+val q1_checkpointDir = new File(dir, 
"q1_checkpointDir").getCanonicalPath
+val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
+val q1 =
+  q1_source
+.toDF()
+.writeStream
+.option("checkpointLocation", q1_checkpointDir)
+.format("text")
+.start(q1_outputDir)
+
+// q2 is a streaming query that reads q1's text outputs
+val q2 = createFileStream("text", q1_outputDir).filter($"value" 
contains "keep")
+
+def q1AddData(data: String*): StreamAction =
+  Execute { _ =>
+q1_source.addData(data)
+q1.processAllAvailable()
+  }
+def q2ProcessAllAvailable(): StreamAction = Execute { q2 => 
q2.processAllAvailable() }
+
+testStream(q2)(
+  // batch 0
+  q1AddData("drop1", "keep2"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2"),
+
+  // batch 1
+  Assert {
+// create a text file that won't be on q1's sink log
+// thus even if its contents contains "keep", it should NOT 
appear in q2's answer
+val shouldNotKeep = new File(q1_outputDir, 
"should_not_keep.txt")
+stringToFile(shouldNotKeep, "should_not_keep!!!")
+shouldNotKeep.exists()
+  },
+  q1AddData("keep3"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3"),
+
+  // batch 2: check that things work well when the sink log gets 
compacted
+  q1AddData("keep4"),
+  Assert {
+// compact interval is 3, so file "2.compact" should exist
+new File(q1_outputDir, 
s"${FileStreamSink.metadataDir}/2.compact").exists()
+  },
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3", "keep4"),
+
+  // stop q1 manually
--- End diff --

nit: `// stop q1 manually` is obvious. Don't add such comments.





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103569221
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
+// q1 is a streaming query that reads from memory and writes to 
text files
+val q1_source = MemoryStream[String]
+val q1_checkpointDir = new File(dir, 
"q1_checkpointDir").getCanonicalPath
+val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
+val q1 =
+  q1_source
+.toDF()
+.writeStream
+.option("checkpointLocation", q1_checkpointDir)
+.format("text")
+.start(q1_outputDir)
+
+// q2 is a streaming query that reads q1's text outputs
+val q2 = createFileStream("text", q1_outputDir).filter($"value" 
contains "keep")
+
+def q1AddData(data: String*): StreamAction =
+  Execute { _ =>
+q1_source.addData(data)
+q1.processAllAvailable()
+  }
+def q2ProcessAllAvailable(): StreamAction = Execute { q2 => 
q2.processAllAvailable() }
+
+testStream(q2)(
+  // batch 0
+  q1AddData("drop1", "keep2"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2"),
+
+  // batch 1
+  Assert {
+// create a text file that won't be on q1's sink log
+// thus even if its contents contains "keep", it should NOT 
appear in q2's answer
+val shouldNotKeep = new File(q1_outputDir, 
"should_not_keep.txt")
+stringToFile(shouldNotKeep, "should_not_keep!!!")
+shouldNotKeep.exists()
+  },
+  q1AddData("keep3"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3"),
+
+  // batch 2: check that things work well when the sink log gets 
compacted
+  q1AddData("keep4"),
+  Assert {
+// compact interval is 3, so file "2.compact" should exist
+new File(q1_outputDir, 
s"${FileStreamSink.metadataDir}/2.compact").exists()
+  },
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3", "keep4"),
+
+  // stop q1 manually
+  Execute { _ => q1.stop() }
+)
+  }
+}
+  }
+
+  test("read partitioned data from outputs of another streaming query") {
--- End diff --

In the long term, we should write the partition information to the file 
sink log, then we can read it in the file source. However, it's out of scope. 
If you have time, you can think about it and submit a new PR after this one.
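
As a purely illustrative sketch (hypothetical field names, not the existing sink log schema), the idea would be for each sink log entry to carry the partition values alongside the file path, so the file source could reuse them instead of re-inferring partitions from directory names:

    // Hypothetical entry shape -- illustrative only, not SinkFileStatus as it exists today.
    case class SinkFileEntryWithPartitions(
        path: String,                          // file written by the sink
        size: Long,
        modificationTime: Long,
        action: String,                        // e.g. "add"
        partitionValues: Map[String, String])  // e.g. Map("partition" -> "foo")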





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103565647
  

[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103538967
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
+// q1 is a streaming query that reads from memory and writes to 
text files
+val q1_source = MemoryStream[String]
--- End diff --

nit: please don't use `_` in a variable name.





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103565523
  

[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103538636
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
@@ -208,6 +208,11 @@ trait StreamTest extends QueryTest with 
SharedSQLContext with Timeouts {
 }
   }
 
+  /** Execute arbitrary code */
+  case class Execute(val func: StreamExecution => Any) extends 
StreamAction {
--- End diff --

How about just making this extend `AssertOnQuery`, to avoid adding a new case clause to `testStream`, which is already pretty long?
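
One way to do that (a sketch, assuming the existing `AssertOnQuery(condition: StreamExecution => Boolean, message: String)` constructor) is to keep `Execute` as a factory that builds an `AssertOnQuery` which always returns true:

    /** Execute arbitrary code as an AssertOnQuery that always passes */
    object Execute {
      def apply(func: StreamExecution => Any): AssertOnQuery =
        AssertOnQuery(query => { func(query); true }, "Execute")
    }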





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103562238
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
--- End diff --

`tmp` is not used. Why not just name them `(outputDir, checkpointDir)`? Same for the other tests.
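
Something like this, assuming `withTempDirs` keeps handing out two java.io.File directories:

    withTempDirs { case (outputDir, checkpointDir) =>
      // outputDir.getCanonicalPath     -> passed to .start(...)
      // checkpointDir.getCanonicalPath -> passed as "checkpointLocation"
    }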





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103567860
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
+// q1 is a streaming query that reads from memory and writes to 
text files
+val q1_source = MemoryStream[String]
+val q1_checkpointDir = new File(dir, 
"q1_checkpointDir").getCanonicalPath
+val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
+val q1 =
+  q1_source
+.toDF()
+.writeStream
+.option("checkpointLocation", q1_checkpointDir)
+.format("text")
+.start(q1_outputDir)
+
+// q2 is a streaming query that reads q1's text outputs
+val q2 = createFileStream("text", q1_outputDir).filter($"value" 
contains "keep")
+
+def q1AddData(data: String*): StreamAction =
+  Execute { _ =>
+q1_source.addData(data)
+q1.processAllAvailable()
+  }
+def q2ProcessAllAvailable(): StreamAction = Execute { q2 => 
q2.processAllAvailable() }
+
+testStream(q2)(
+  // batch 0
+  q1AddData("drop1", "keep2"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2"),
+
+  // batch 1
+  Assert {
+// create a text file that won't be on q1's sink log
+// thus even if its contents contains "keep", it should NOT 
appear in q2's answer
+val shouldNotKeep = new File(q1_outputDir, 
"should_not_keep.txt")
+stringToFile(shouldNotKeep, "should_not_keep!!!")
+shouldNotKeep.exists()
+  },
+  q1AddData("keep3"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3"),
+
+  // batch 2: check that things work well when the sink log gets 
compacted
+  q1AddData("keep4"),
+  Assert {
+// compact interval is 3, so file "2.compact" should exist
+new File(q1_outputDir, 
s"${FileStreamSink.metadataDir}/2.compact").exists()
+  },
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3", "keep4"),
+
+  // stop q1 manually
+  Execute { _ => q1.stop() }
+)
+  }
+}
+  }
+
+  test("read partitioned data from outputs of another streaming query") {
--- End diff --

This test seems unnecessary: it will pass even if the source doesn't use the partition information.





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103565336
  

[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103405331
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -52,10 +52,7 @@ abstract class FileStreamSourceTest
 query.nonEmpty,
 "Cannot add data when there is no query for finding the active 
file stream source")
 
-  val sources = query.get.logicalPlan.collect {
-case StreamingExecutionRelation(source, _) if 
source.isInstanceOf[FileStreamSource] =>
-  source.asInstanceOf[FileStreamSource]
-  }
--- End diff --

this common logic is extracted out
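
Roughly like this (a sketch; the helper name is illustrative, not necessarily the one used in the PR):

    def getSourcesFromStreamingQuery(query: StreamExecution): Seq[FileStreamSource] =
      query.logicalPlan.collect {
        case StreamingExecutionRelation(source: FileStreamSource, _) => source
      }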





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103404962
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -159,28 +161,64 @@ class FileStreamSource(
 
   /**
* If the source has a metadata log indicating which files should be 
read, then we should use it.
-   * We figure out whether there exists some metadata log only when user 
gives a non-glob path.
+   * Only when the user gives a non-glob path will we figure out whether the source has a
+   * metadata log.
+   *
+   * None        means we don't know at the moment
+   * Some(true)  means we know for sure the source DOES have metadata
+   * Some(false) means we know for sure the source DOES NOT have metadata
*/
-  private val sourceHasMetadata: Boolean =
-!SparkHadoopUtil.get.isGlobPath(new Path(path)) &&
-  FileStreamSink.hasMetadata(Seq(path), sparkSession.sessionState.newHadoopConf())
+  @volatile private[sql] var sourceHasMetadata: Option[Boolean] =
+if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else None
+
+  private def allFilesUsingInMemoryFileIndex() = {
+val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
+val fileIndex = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
+fileIndex.allFiles()
+  }
+
+  private def allFilesUsingMetadataLogFileIndex() = {
+// Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
+// non-glob path
+new MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles()
+  }
 
   /**
* Returns a list of files found, sorted by their timestamp.
*/
   private def fetchAllFiles(): Seq[(String, Long)] = {
 val startTime = System.nanoTime
-val catalog =
-  if (sourceHasMetadata) {
-// Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
-// non-glob path
-new MetadataLogFileIndex(sparkSession, qualifiedBasePath)
+
--- End diff --

seems like `sourceHasMetadata match { case ... }` is more appropriate here





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103404486
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -159,28 +161,64 @@ class FileStreamSource(
 
   /**
* If the source has a metadata log indicating which files should be read, then we should use it.
-   * We figure out whether there exists some metadata log only when user gives a non-glob path.
+   * Only when the user gives a non-glob path will we figure out whether the source has a
+   * metadata log.
+   *
+   * None        means we don't know at the moment
+   * Some(true)  means we know for sure the source DOES have metadata
+   * Some(false) means we know for sure the source DOES NOT have metadata
*/
-  private val sourceHasMetadata: Boolean =
-!SparkHadoopUtil.get.isGlobPath(new Path(path)) &&
-  FileStreamSink.hasMetadata(Seq(path), sparkSession.sessionState.newHadoopConf())
+  @volatile private[sql] var sourceHasMetadata: Option[Boolean] =
+if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else None
+
+  private def allFilesUsingInMemoryFileIndex() = {
+val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
+val fileIndex = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
+fileIndex.allFiles()
+  }
+
+  private def allFilesUsingMetadataLogFileIndex() = {
+// Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
+// non-glob path
+new MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles()
+  }
 
   /**
* Returns a list of files found, sorted by their timestamp.
*/
   private def fetchAllFiles(): Seq[(String, Long)] = {
 val startTime = System.nanoTime
-val catalog =
-  if (sourceHasMetadata) {
-// Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
-// non-glob path
-new MetadataLogFileIndex(sparkSession, qualifiedBasePath)
+
--- End diff --

then based on `sourceHasMetadata`'s value, we can choose which `FileIndex` 
should be used. As shown below, `case None` needs to be handled with the most care.
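
In sketch form (a rough outline of the branching described above; it relies on fields and helpers from the surrounding class, e.g. `path`, `hadoopConf` and the two `allFilesUsing*` helpers added in this diff, and is not a verbatim quote of the final code):

```scala
// Inside fetchAllFiles(): pick the FileIndex based on what we currently know.
var allFiles: Seq[FileStatus] = null
sourceHasMetadata match {
  case Some(true)  => allFiles = allFilesUsingMetadataLogFileIndex()
  case Some(false) => allFiles = allFilesUsingInMemoryFileIndex()
  case None =>
    if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
      sourceHasMetadata = Some(true)
      allFiles = allFilesUsingMetadataLogFileIndex()
    } else {
      allFiles = allFilesUsingInMemoryFileIndex()
      if (allFiles.nonEmpty) {
        // files exist but no sink log was found: definitely not a sink output dir
        sourceHasMetadata = Some(false)
      }
      // if the listing is still empty we cannot decide yet; retry on the next batch
    }
}
```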





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103403986
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -159,28 +161,64 @@ class FileStreamSource(
 
   /**
* If the source has a metadata log indicating which files should be read, then we should use it.
-   * We figure out whether there exists some metadata log only when user gives a non-glob path.
+   * Only when the user gives a non-glob path will we figure out whether the source has a
+   * metadata log.
+   *
+   * None        means we don't know at the moment
+   * Some(true)  means we know for sure the source DOES have metadata
+   * Some(false) means we know for sure the source DOES NOT have metadata
--- End diff --

(some notes here since the changes are not trivial)

here we're using this `sourceHasMetadata` to indicate whether we know for 
sure the source has metadata, as stated in the source file comments:
- `None`        means we don't know at the moment
- `Some(true)`  means we know for sure the source DOES have metadata
- `Some(false)` means we know for sure the source DOES NOT have metadata






[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-27 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103366158
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -158,12 +158,28 @@ class FileStreamSource(
   }
 
   /**
+   * If the source has a metadata log indicating which files should be read, then we should use it.
+   * We figure out whether there exists some metadata log only when user gives a non-glob path.
+   */
+  private val sourceHasMetadata: Boolean =
--- End diff --

and add a dedicated test case of course





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-27 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103366082
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -158,12 +158,28 @@ class FileStreamSource(
   }
 
   /**
+   * If the source has a metadata log indicating which files should be read, then we should use it.
+   * We figure out whether there exists some metadata log only when user gives a non-glob path.
+   */
+  private val sourceHasMetadata: Boolean =
--- End diff --

ah thanks! I was about to change it to a method which would stop detecting 
once we know for sure whether to use a `MetadataLogFileIndex` or an `InMemoryFileIndex`, 
and save that information. Will update with this code soon.





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-27 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103363591
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -158,12 +158,28 @@ class FileStreamSource(
   }
 
   /**
+   * If the source has a metadata log indicating which files should be read, then we should use it.
+   * We figure out whether there exists some metadata log only when user gives a non-glob path.
+   */
+  private val sourceHasMetadata: Boolean =
--- End diff --

Actually, why not just change `sourceHasMetadata` to a method? 
`sparkSession.sessionState.newHadoopConf()` seems expensive but we can save it 
into a field.
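
A minimal sketch of that suggestion (assuming the Hadoop conf is cached in a field; this is the alternative being discussed here, not necessarily what was merged):

```scala
// Create the Hadoop configuration once and reuse it...
private val hadoopConf = sparkSession.sessionState.newHadoopConf()

// ...then re-evaluate the check on demand instead of once at construction time.
private def sourceHasMetadata(): Boolean =
  !SparkHadoopUtil.get.isGlobPath(new Path(path)) &&
    FileStreamSink.hasMetadata(Seq(path), hadoopConf)
```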





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-27 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103151253
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -158,12 +158,28 @@ class FileStreamSource(
   }
 
   /**
+   * If the source has a metadata log indicating which files should be read, then we should use it.
+   * We figure out whether there exists some metadata log only when user gives a non-glob path.
+   */
+  private val sourceHasMetadata: Boolean =
--- End diff --

Just found one corner case: if the query that writes the files has not yet 
started, the current folder will contain no files even if it's an output folder of 
the file sink. I think we should keep calling `sourceHasMetadata` until the 
folder is no longer empty.
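
One way to picture that behaviour (a hedged sketch with a hypothetical helper, not the exact merged code): stay undecided while the directory is empty and only settle on `Some(...)` once there is evidence either way.

```scala
// Hypothetical helper: decide whether the source directory is a file sink output,
// deferring the decision while the directory is still empty.
private def detectSourceHasMetadata(listedFiles: Seq[FileStatus]): Option[Boolean] =
  if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
    Some(true)                 // sink metadata log found
  } else if (listedFiles.nonEmpty) {
    Some(false)                // real files but no sink log: a plain directory
  } else {
    None                       // nothing written yet; check again on a later batch
  }
```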





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-26 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103104036
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +663,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 }
   }
 
+  testWithUninterruptibleThread("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
+val sinkLogDir = new File(dir, FileStreamSink.metadataDir).getCanonicalPath
+val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, sinkLogDir)
+
+val fileStream = createFileStream("text", dir.getCanonicalPath)
+val filtered = fileStream.filter($"value" contains "keep")
+
+def addIntoSinkLog(batch: Int, fileName: String): Boolean = {
+  val unqualifiedDirPath = new Path(new File(dir, fileName).getCanonicalPath)
+  val fs = unqualifiedDirPath.getFileSystem(sparkContext.hadoopConfiguration)
+  val sinkFileStatus = SinkFileStatus(fs.getFileStatus(unqualifiedDirPath))
+  sinkLog.add(batch, Array(sinkFileStatus))
+}
+
+testStream(filtered)(
+  // Create new dir and write to it, should read
+  AddTextFileData("drop1\nkeep2", dir, tmp, Some("file_1")),
+  Assert { addIntoSinkLog(0, "file_1") },
+  CheckAnswer("keep2"),
+
+  // Create "file_2" but DO NOT add it to the log intentionally
+  AddTextFileData("should_not_keep!!!", dir, tmp, Some("file_2")),
+  Assert { new File(dir, "file_2").exists() },
+  AddTextFileData("keep3", dir, tmp, Some("file_3")),
+  Assert { addIntoSinkLog(1, "file_3") },
+  // Should NOT read "file_2"; should read "file_3"
+  CheckAnswer("keep2", "keep3"),
+
+  // Check that things work well when the sink log gets compacted
+  AddTextFileData("keep4", dir, tmp, Some("file_4")),
+  Assert { addIntoSinkLog(2, "file_4") },
+  Assert {
+// compact interval is 3, so file "2.compact" should exist
+new File(sinkLogDir, "2" + CompactibleFileStreamLog.COMPACT_FILE_SUFFIX).exists()
+  },
+  CheckAnswer("keep2", "keep3", "keep4"),
+
+  AddTextFileData("keep5", dir, tmp, Some("file_5")),
+  Assert { addIntoSinkLog(3, "file_5") },
+  CheckAnswer("keep2", "keep3", "keep4", "keep5")
+)
+  }
+}
+  }
+
+  testWithUninterruptibleThread("read partitioned data from outputs of another streaming query") {
--- End diff --

done





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-26 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103104024
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +663,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 }
   }
 
+  testWithUninterruptibleThread("read data from outputs of another streaming query") {
--- End diff --

done; thanks! and good job for https://github.com/apache/spark/pull/16947!





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-26 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103104003
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -243,13 +243,20 @@ case class DataSource(
 val path = caseInsensitiveOptions.getOrElse("path", {
   throw new IllegalArgumentException("'path' is not specified")
 })
+// If we're reading files from outputs of another streaming query, then it does not make
+// sense to glob files since we would get files from the metadata log.
+// Thus we would figure out whether there exists some metadata log only when user gives a
+// non-glob path.
+val sourceHasMetadata =
+  !SparkHadoopUtil.get.isGlobPath(new Path(path)) && hasMetadata(Seq(path))
--- End diff --

Yea `hasMetadata` was the reason! Now it lives in `object FileStreamSink` 
:-D
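
Roughly, the relocated check only needs to look for the `_spark_metadata` directory under a single non-glob path; a sketch of the idea (imports included for context, not the verbatim merged code):

```scala
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object FileStreamSink {
  val metadataDir = "_spark_metadata"

  /** True iff `path` is a single directory that contains a file sink metadata log. */
  def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = path match {
    case Seq(singlePath) =>
      try {
        val hdfsPath = new Path(singlePath)
        val fs = hdfsPath.getFileSystem(hadoopConf)
        fs.isDirectory(new Path(hdfsPath, metadataDir))
      } catch {
        case NonFatal(_) => false
      }
    case _ => false
  }
}
```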





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103014135
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +663,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 }
   }
 
+  testWithUninterruptibleThread("read data from outputs of another streaming query") {
--- End diff --

nit: you can merge the latest master and use `test` directly. No need to 
use `testWithUninterruptibleThread` after #16947





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103013496
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -243,13 +243,20 @@ case class DataSource(
 val path = caseInsensitiveOptions.getOrElse("path", {
   throw new IllegalArgumentException("'path' is not specified")
 })
+// If we're reading files from outputs of another streaming query, then it does not make
+// sense to glob files since we would get files from the metadata log.
+// Thus we would figure out whether there exists some metadata log only when user gives a
+// non-glob path.
+val sourceHasMetadata =
+  !SparkHadoopUtil.get.isGlobPath(new Path(path)) && hasMetadata(Seq(path))
--- End diff --

I guess `sourceHasMetadata` is generated here because of `hasMetadata`. 
Could you move `hasMetadata` to `object FileStreamSink`? Then you can do it 
inside `FileStreamSource`.





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103014293
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +663,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 }
   }
 
+  testWithUninterruptibleThread("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
+val sinkLogDir = new File(dir, FileStreamSink.metadataDir).getCanonicalPath
+val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, sinkLogDir)
+
+val fileStream = createFileStream("text", dir.getCanonicalPath)
+val filtered = fileStream.filter($"value" contains "keep")
+
+def addIntoSinkLog(batch: Int, fileName: String): Boolean = {
+  val unqualifiedDirPath = new Path(new File(dir, fileName).getCanonicalPath)
+  val fs = unqualifiedDirPath.getFileSystem(sparkContext.hadoopConfiguration)
+  val sinkFileStatus = SinkFileStatus(fs.getFileStatus(unqualifiedDirPath))
+  sinkLog.add(batch, Array(sinkFileStatus))
+}
+
+testStream(filtered)(
+  // Create new dir and write to it, should read
+  AddTextFileData("drop1\nkeep2", dir, tmp, Some("file_1")),
+  Assert { addIntoSinkLog(0, "file_1") },
+  CheckAnswer("keep2"),
+
+  // Create "file_2" but DO NOT add it to the log intentionally
+  AddTextFileData("should_not_keep!!!", dir, tmp, Some("file_2")),
+  Assert { new File(dir, "file_2").exists() },
+  AddTextFileData("keep3", dir, tmp, Some("file_3")),
+  Assert { addIntoSinkLog(1, "file_3") },
+  // Should NOT read "file_2"; should read "file_3"
+  CheckAnswer("keep2", "keep3"),
+
+  // Check that things work well when the sink log gets compacted
+  AddTextFileData("keep4", dir, tmp, Some("file_4")),
+  Assert { addIntoSinkLog(2, "file_4") },
+  Assert {
+// compact interval is 3, so file "2.compact" should exist
+new File(sinkLogDir, "2" + CompactibleFileStreamLog.COMPACT_FILE_SUFFIX).exists()
+  },
+  CheckAnswer("keep2", "keep3", "keep4"),
+
+  AddTextFileData("keep5", dir, tmp, Some("file_5")),
+  Assert { addIntoSinkLog(3, "file_5") },
+  CheckAnswer("keep2", "keep3", "keep4", "keep5")
+)
+  }
+}
+  }
+
+  testWithUninterruptibleThread("read partitioned data from outputs of another streaming query") {
--- End diff --

nit: same as above





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-23 Thread lw-lin
GitHub user lw-lin reopened a pull request:

https://github.com/apache/spark/pull/16987

[SPARK-19633][SS] FileSource read from FileSink

## What changes were proposed in this pull request?

Right now the file source always uses `InMemoryFileIndex` to scan files from a 
given path.

But when reading the outputs of another streaming query, the file source 
should use `MetadataLogFileIndex` to list files from the sink log. This patch adds 
this support.

## `MetadataLogFileIndex` or `InMemoryFileIndex`
```scala
spark
  .readStream
  .format(...)
  .load("/some/path") // for a non-glob path:
  //   - use `MetadataLogFileIndex` when `/some/path/_spark_metadata` exists
  //   - fall back to `InMemoryFileIndex` otherwise
```
```scala
spark
  .readStream
  .format(...)
  .load("/some/path/*/*") // for a glob path: always use `InMemoryFileIndex`
```
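
Putting the two together, an end-to-end illustration of the scenario this PR targets (paths, formats and `inputDF` are placeholders):

```scala
// Query 1 writes with the file sink and maintains /data/q1_out/_spark_metadata.
val q1 = inputDF.writeStream
  .format("json")
  .option("checkpointLocation", "/ckpt/q1")
  .start("/data/q1_out")

// Query 2 reads the same directory back as a file source. Because the path is
// non-glob and a sink log exists, only files recorded in that log are picked up.
val q2 = spark.readStream
  .schema(inputDF.schema)
  .format("json")
  .load("/data/q1_out")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/ckpt/q2")
  .start()
```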

## How was this patch tested?

Two newly added tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/spark source-read-from-sink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16987.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16987


commit b66d2ccabcae41973bd8af4ed406567dc071ff67
Author: Liwei Lin 
Date:   2017-02-18T01:20:18Z

File Source reads from File Sink







[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-22 Thread lw-lin
Github user lw-lin closed the pull request at:

https://github.com/apache/spark/pull/16987





[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-19 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r101917807
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -76,12 +76,13 @@ abstract class FileStreamSourceTest
 protected def addData(source: FileStreamSource): Unit
   }
 
-  case class AddTextFileData(content: String, src: File, tmp: File)
-extends AddFileData {
+  case class AddTextFileData (
+  content: String, src: File, tmp: File, finalFileName: Option[String] = None
+) extends AddFileData {
 
 override def addData(source: FileStreamSource): Unit = {
   val tempFile = Utils.tempFileWith(new File(tmp, "text"))
-  val finalFile = new File(src, tempFile.getName)
+  val finalFile = new File(src, finalFileName.getOrElse(tempFile.getName))
--- End diff --

this is to keep track of the file name for later checking
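
For illustration, pinning the final file name lets the test refer to that exact file afterwards, e.g. when registering it in a sink log (the names below are made up):

```scala
// Within a testStream(...) block: write a file under a known name,
// then use that same name when adding it to the sink log.
AddTextFileData("keep6", src = dir, tmp = tmp, finalFileName = Some("file_6")),
Assert { addIntoSinkLog(4, "file_6") }
```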

