Repository: spark
Updated Branches:
  refs/heads/master ba5e0b87a -> 285385965


[SPARK-14833][SQL][STREAMING][TEST] Refactor StreamTests to test for source 
fault-tolerance correctly.

## What changes were proposed in this pull request?

The current StreamTest allows testing of a streaming Dataset that explicitly 
wraps a source. This is different from the actual production code path where 
the source object is dynamically created through a DataSource object every time 
a query is started. So all the fault-tolerance testing in FileSourceSuite and 
FileSourceStressSuite is not really testing the actual code path as they are 
just reusing the FileStreamSource object.

This PR fixes StreamTest and the FileSource***Suite to test this correctly. 
Instead of maintaining a mapping of source --> expected offset in StreamTest 
(which requires reuse of source object), it now maintains a mapping of source 
index --> offset, so that it is independent of the source object.

Summary of changes
- StreamTest refactored to keep track of offset by source index instead of 
source
- AddData, AddTextData and AddParquetData updated to find the FileStreamSource 
object from an active query, so that it can work with sources generated when 
query is started.
- Refactored unit tests in FileSource***Suite to test using DataFrame/Dataset 
generated with public APIs, rather than reusing the same FileStreamSource. This 
correctly tests fault tolerance.

The refactoring changed a lot of indents in FileSourceSuite, so it's recommended 
to hide whitespace changes when reviewing, using this link: 
https://github.com/apache/spark/pull/12592/files?w=1

## How was this patch tested?

Refactored unit tests.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #12592 from tdas/SPARK-14833.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28538596
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28538596
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28538596

Branch: refs/heads/master
Commit: 28538596558b7f69f9d22eb0902d0e609d98be88
Parents: ba5e0b8
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Sat Apr 23 21:53:05 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Sat Apr 23 21:53:05 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/StreamTest.scala |  71 ++--
 .../sql/streaming/FileStreamSourceSuite.scala   | 359 ++++++++++---------
 2 files changed, 233 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/28538596/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
index 242ea9c..c014f61 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -67,12 +67,6 @@ import org.apache.spark.util.Utils
  */
 trait StreamTest extends QueryTest with Timeouts {
 
-  implicit class RichSource(s: Source) {
-    def toDF(): DataFrame = Dataset.ofRows(sqlContext, 
StreamingExecutionRelation(s))
-
-    def toDS[A: Encoder](): Dataset[A] = Dataset(sqlContext, 
StreamingExecutionRelation(s))
-  }
-
   /** How long to wait for an active stream to catch up when checking a 
result. */
   val streamingTimeout = 10.seconds
 
@@ -93,22 +87,21 @@ trait StreamTest extends QueryTest with Timeouts {
       AddDataMemory(source, data)
   }
 
-  /** A trait that can be extended when testing other sources. */
+  /** A trait that can be extended when testing a source. */
   trait AddData extends StreamAction {
-    def source: Source
-
     /**
-     * Called to trigger adding the data.  Should return the offset that will 
denote when this
-     * new data has been processed.
+     * Called to adding the data to a source. It should find the source to add 
data to from
+     * the active query, and then return the source object the data was added, 
as well as the
+     * offset of added data.
      */
-    def addData(): Offset
+    def addData(query: Option[StreamExecution]): (Source, Offset)
   }
 
   case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends 
AddData {
     override def toString: String = s"AddData to $source: 
${data.mkString(",")}"
 
-    override def addData(): Offset = {
-      source.addData(data)
+    override def addData(query: Option[StreamExecution]): (Source, Offset) = {
+      (source, source.addData(data))
     }
   }
 
@@ -199,7 +192,7 @@ trait StreamTest extends QueryTest with Timeouts {
     var currentPlan: LogicalPlan = stream.logicalPlan
     var currentStream: StreamExecution = null
     var lastStream: StreamExecution = null
-    val awaiting = new mutable.HashMap[Source, Offset]()
+    val awaiting = new mutable.HashMap[Int, Offset]() // source index -> 
offset to wait for
     val sink = new MemorySink(stream.schema)
 
     @volatile
@@ -372,15 +365,53 @@ trait StreamTest extends QueryTest with Timeouts {
             verify({ a.run(); true }, s"Assert failed: ${a.message}")
 
           case a: AddData =>
-            awaiting.put(a.source, a.addData())
+            try {
+              // Add data and get the source where it was added, and the 
expected offset of the
+              // added data.
+              val queryToUse = Option(currentStream).orElse(Option(lastStream))
+              val (source, offset) = a.addData(queryToUse)
+
+              def findSourceIndex(plan: LogicalPlan): Option[Int] = {
+                plan
+                  .collect { case StreamingExecutionRelation(s, _) => s }
+                  .zipWithIndex
+                  .find(_._1 == source)
+                  .map(_._2)
+              }
+
+              // Try to find the index of the source to which data was added. 
Either get the index
+              // from the current active query or the original input logical 
plan.
+              val sourceIndex =
+                queryToUse.flatMap { query =>
+                  findSourceIndex(query.logicalPlan)
+                }.orElse {
+                  findSourceIndex(stream.logicalPlan)
+                }.getOrElse {
+                  throw new IllegalArgumentException(
+                    "Could find index of the source to which data was added")
+                }
+
+              // Store the expected offset of added data to wait for it later
+              awaiting.put(sourceIndex, offset)
+            } catch {
+              case NonFatal(e) =>
+                failTest("Error adding data", e)
+            }
 
           case CheckAnswerRows(expectedAnswer, lastOnly) =>
             verify(currentStream != null, "stream not running")
-
-            // Block until all data added has been processed
-            awaiting.foreach { case (source, offset) =>
+            // Get the map of source index to the current source objects
+            val indexToSource = currentStream
+              .logicalPlan
+              .collect { case StreamingExecutionRelation(s, _) => s }
+              .zipWithIndex
+              .map(_.swap)
+              .toMap
+
+            // Block until all data added has been processed for all the source
+            awaiting.foreach { case (sourceIndex, offset) =>
               failAfter(streamingTimeout) {
-                currentStream.awaitOffset(source, offset)
+                currentStream.awaitOffset(indexToSource(sourceIndex), offset)
               }
             }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/28538596/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 45dca2f..8ff82a3 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -30,38 +30,59 @@ class FileStreamSourceTest extends StreamTest with 
SharedSQLContext {
 
   import testImplicits._
 
-  case class AddTextFileData(source: FileStreamSource, content: String, src: 
File, tmp: File)
-    extends AddData {
-
-    override def addData(): Offset = {
-      source.withBatchingLocked {
-        val file = Utils.tempFileWith(new File(tmp, "text"))
-        stringToFile(file, content).renameTo(new File(src, file.getName))
-        source.currentOffset
-      } + 1
+  /**
+   * A subclass [[AddData]] for adding data to files. This is meant to use the
+   * [[FileStreamSource]] actually being used in the execution.
+   */
+  abstract class AddFileData extends AddData {
+    override def addData(query: Option[StreamExecution]): (Source, Offset) = {
+      require(
+        query.nonEmpty,
+        "Cannot add data when there is no query for finding the active file 
stream source")
+
+      val sources = query.get.logicalPlan.collect {
+        case StreamingExecutionRelation(source, _) if 
source.isInstanceOf[FileStreamSource] =>
+          source.asInstanceOf[FileStreamSource]
+      }
+      if (sources.isEmpty) {
+        throw new Exception(
+          "Could not find file source in the StreamExecution logical plan to 
add data to")
+      } else if (sources.size > 1) {
+        throw new Exception(
+          "Could not select the file source in the StreamExecution logical 
plan as there" +
+            "are multiple file sources:\n\t" + sources.mkString("\n\t"))
+      }
+      val source = sources.head
+      val newOffset = source.withBatchingLocked {
+        addData(source)
+        source.currentOffset + 1
+      }
+      logInfo(s"Added data to $source at offset $newOffset")
+      (source, newOffset)
     }
+
+    protected def addData(source: FileStreamSource): Unit
   }
 
-  case class AddParquetFileData(
-      source: FileStreamSource,
-      df: DataFrame,
-      src: File,
-      tmp: File) extends AddData {
-
-    override def addData(): Offset = {
-      source.withBatchingLocked {
-        AddParquetFileData.writeToFile(df, src, tmp)
-        source.currentOffset
-      } + 1
+  case class AddTextFileData(content: String, src: File, tmp: File)
+    extends AddFileData {
+
+    override def addData(source: FileStreamSource): Unit = {
+      val file = Utils.tempFileWith(new File(tmp, "text"))
+      stringToFile(file, content).renameTo(new File(src, file.getName))
+    }
+  }
+
+  case class AddParquetFileData(data: DataFrame, src: File, tmp: File) extends 
AddFileData {
+    override def addData(source: FileStreamSource): Unit = {
+      AddParquetFileData.writeToFile(data, src, tmp)
     }
   }
 
   object AddParquetFileData {
-    def apply(
-      source: FileStreamSource,
-      seq: Seq[String],
-      src: File,
-      tmp: File): AddParquetFileData = new AddParquetFileData(source, 
seq.toDS().toDF(), src, tmp)
+    def apply(seq: Seq[String], src: File, tmp: File): AddParquetFileData = {
+      AddParquetFileData(seq.toDS().toDF(), src, tmp)
+    }
 
     def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
       val file = Utils.tempFileWith(new File(tmp, "parquet"))
@@ -71,11 +92,11 @@ class FileStreamSourceTest extends StreamTest with 
SharedSQLContext {
   }
 
   /** Use `format` and `path` to create FileStreamSource via DataFrameReader */
-  def createFileStreamSource(
+  def createFileStream(
       format: String,
       path: String,
-      schema: Option[StructType] = None): FileStreamSource = {
-    val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+      schema: Option[StructType] = None): DataFrame = {
+
     val reader =
       if (schema.isDefined) {
         sqlContext.read.format(format).schema(schema.get)
@@ -83,14 +104,18 @@ class FileStreamSourceTest extends StreamTest with 
SharedSQLContext {
         sqlContext.read.format(format)
       }
     reader.stream(path)
-      .queryExecution.analyzed
+  }
+
+  protected def getSourceFromFileStream(df: DataFrame): FileStreamSource = {
+    val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+    df.queryExecution.analyzed
       .collect { case StreamingRelation(dataSource, _, _) =>
         // There is only one source in our tests so just set sourceId to 0
         
dataSource.createSource(s"$checkpointLocation/sources/0").asInstanceOf[FileStreamSource]
       }.head
   }
 
-  def withTempDirs(body: (File, File) => Unit) {
+  protected def withTempDirs(body: (File, File) => Unit) {
     val src = Utils.createTempDir(namePrefix = "streaming.src")
     val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
     try {
@@ -108,6 +133,14 @@ class FileStreamSourceSuite extends FileStreamSourceTest 
with SharedSQLContext {
 
   import testImplicits._
 
+  /** Use `format` and `path` to create FileStreamSource via DataFrameReader */
+  private def createFileStreamSource(
+      format: String,
+      path: String,
+      schema: Option[StructType] = None): FileStreamSource = {
+    getSourceFromFileStream(createFileStream(format, path, schema))
+  }
+
   private def createFileStreamSourceAndGetSchema(
       format: Option[String],
       path: Option[String],
@@ -226,104 +259,88 @@ class FileStreamSourceSuite extends FileStreamSourceTest 
with SharedSQLContext {
   }
 
   test("read from text files") {
-    val src = Utils.createTempDir(namePrefix = "streaming.src")
-    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
-
-    val textSource = createFileStreamSource("text", src.getCanonicalPath)
-    val filtered = textSource.toDF().filter($"value" contains "keep")
-
-    testStream(filtered)(
-      AddTextFileData(textSource, "drop1\nkeep2\nkeep3", src, tmp),
-      CheckAnswer("keep2", "keep3"),
-      StopStream,
-      AddTextFileData(textSource, "drop4\nkeep5\nkeep6", src, tmp),
-      StartStream,
-      CheckAnswer("keep2", "keep3", "keep5", "keep6"),
-      AddTextFileData(textSource, "drop7\nkeep8\nkeep9", src, tmp),
-      CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
-    )
-
-    Utils.deleteRecursively(src)
-    Utils.deleteRecursively(tmp)
+    withTempDirs { case (src, tmp) =>
+      val textStream = createFileStream("text", src.getCanonicalPath)
+      val filtered = textStream.filter($"value" contains "keep")
+
+      testStream(filtered)(
+        AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+        CheckAnswer("keep2", "keep3"),
+        StopStream,
+        AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+        StartStream,
+        CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+        AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+      )
+    }
   }
 
   test("read from json files") {
-    val src = Utils.createTempDir(namePrefix = "streaming.src")
-    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
-
-    val textSource = createFileStreamSource("json", src.getCanonicalPath, 
Some(valueSchema))
-    val filtered = textSource.toDF().filter($"value" contains "keep")
-
-    testStream(filtered)(
-      AddTextFileData(
-        textSource,
-        "{'value': 'drop1'}\n{'value': 'keep2'}\n{'value': 'keep3'}",
-        src,
-        tmp),
-      CheckAnswer("keep2", "keep3"),
-      StopStream,
-      AddTextFileData(
-        textSource,
-        "{'value': 'drop4'}\n{'value': 'keep5'}\n{'value': 'keep6'}",
-        src,
-        tmp),
-      StartStream,
-      CheckAnswer("keep2", "keep3", "keep5", "keep6"),
-      AddTextFileData(
-        textSource,
-        "{'value': 'drop7'}\n{'value': 'keep8'}\n{'value': 'keep9'}",
-        src,
-        tmp),
-      CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
-    )
-
-    Utils.deleteRecursively(src)
-    Utils.deleteRecursively(tmp)
+    withTempDirs { case (src, tmp) =>
+      val fileStream = createFileStream("json", src.getCanonicalPath, 
Some(valueSchema))
+      val filtered = fileStream.filter($"value" contains "keep")
+
+      testStream(filtered)(
+        AddTextFileData(
+          "{'value': 'drop1'}\n{'value': 'keep2'}\n{'value': 'keep3'}",
+          src,
+          tmp),
+        CheckAnswer("keep2", "keep3"),
+        StopStream,
+        AddTextFileData(
+          "{'value': 'drop4'}\n{'value': 'keep5'}\n{'value': 'keep6'}",
+          src,
+          tmp),
+        StartStream,
+        CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+        AddTextFileData(
+          "{'value': 'drop7'}\n{'value': 'keep8'}\n{'value': 'keep9'}",
+          src,
+          tmp),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+      )
+    }
   }
 
   test("read from json files with inferring schema") {
-    val src = Utils.createTempDir(namePrefix = "streaming.src")
-    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
-
-    // Add a file so that we can infer its schema
-    stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 
'keep2'}\n{'c': 'keep3'}")
+    withTempDirs { case (src, tmp) =>
 
-    val textSource = createFileStreamSource("json", src.getCanonicalPath)
+      // Add a file so that we can infer its schema
+      stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 
'keep2'}\n{'c': 'keep3'}")
 
-    // FileStreamSource should infer the column "c"
-    val filtered = textSource.toDF().filter($"c" contains "keep")
+      val fileStream = createFileStream("json", src.getCanonicalPath)
+      assert(fileStream.schema === StructType(Seq(StructField("c", 
StringType))))
 
-    testStream(filtered)(
-      AddTextFileData(textSource, "{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 
'keep6'}", src, tmp),
-      CheckAnswer("keep2", "keep3", "keep5", "keep6")
-    )
+      // FileStreamSource should infer the column "c"
+      val filtered = fileStream.filter($"c" contains "keep")
 
-    Utils.deleteRecursively(src)
-    Utils.deleteRecursively(tmp)
+      testStream(filtered)(
+        AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, 
tmp),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6")
+      )
+    }
   }
 
 
   test("reading from json files inside partitioned directory") {
-    val src = {
-      val base = Utils.createTempDir(namePrefix = "streaming.src")
-      new File(base, "type=X")
-    }
-    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
-    src.mkdirs()
+    withTempDirs { case (baseSrc, tmp) =>
+      val src = new File(baseSrc, "type=X")
+      src.mkdirs()
 
+      // Add a file so that we can infer its schema
+      stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 
'keep2'}\n{'c': 'keep3'}")
 
-    // Add a file so that we can infer its schema
-    stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 
'keep2'}\n{'c': 'keep3'}")
-
-    val textSource = createFileStreamSource("json", src.getCanonicalPath)
+      val fileStream = createFileStream("json", src.getCanonicalPath)
 
-    // FileStreamSource should infer the column "c"
-    val filtered = textSource.toDF().filter($"c" contains "keep")
+      // FileStreamSource should infer the column "c"
+      val filtered = fileStream.filter($"c" contains "keep")
 
-    testStream(filtered)(
-      AddTextFileData(textSource, "{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 
'keep6'}", src, tmp),
-      CheckAnswer("keep2", "keep3", "keep5", "keep6")
-    )
+      testStream(filtered)(
+        AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, 
tmp),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6")
+      )
+    }
   }
 
 
@@ -333,52 +350,47 @@ class FileStreamSourceSuite extends FileStreamSourceTest 
with SharedSQLContext {
       // Add a file so that we can infer its schema
       stringToFile(new File(src, "existing"), "{'k': 'value0'}")
 
-      val textSource = createFileStreamSource("json", src.getCanonicalPath)
+      val fileStream = createFileStream("json", src.getCanonicalPath)
 
       // FileStreamSource should infer the column "k"
-      val text = textSource.toDF()
-      assert(text.schema === StructType(Seq(StructField("k", StringType))))
+      assert(fileStream.schema === StructType(Seq(StructField("k", 
StringType))))
 
       // After creating DF and before starting stream, add data with different 
schema
       // Should not affect the inferred schema any more
       stringToFile(new File(src, "existing2"), "{'k': 'value1', 'v': 'new'}")
 
-      testStream(text)(
+      testStream(fileStream)(
 
         // Should not pick up column v in the file added before start
-        AddTextFileData(textSource, "{'k': 'value2'}", src, tmp),
+        AddTextFileData("{'k': 'value2'}", src, tmp),
         CheckAnswer("value0", "value1", "value2"),
 
         // Should read data in column k, and ignore v
-        AddTextFileData(textSource, "{'k': 'value3', 'v': 'new'}", src, tmp),
+        AddTextFileData("{'k': 'value3', 'v': 'new'}", src, tmp),
         CheckAnswer("value0", "value1", "value2", "value3"),
 
         // Should ignore rows that do not have the necessary k column
-        AddTextFileData(textSource, "{'v': 'value4'}", src, tmp),
+        AddTextFileData("{'v': 'value4'}", src, tmp),
         CheckAnswer("value0", "value1", "value2", "value3", null))
     }
   }
 
   test("read from parquet files") {
-    val src = Utils.createTempDir(namePrefix = "streaming.src")
-    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
-
-    val fileSource = createFileStreamSource("parquet", src.getCanonicalPath, 
Some(valueSchema))
-    val filtered = fileSource.toDF().filter($"value" contains "keep")
-
-    testStream(filtered)(
-      AddParquetFileData(fileSource, Seq("drop1", "keep2", "keep3"), src, tmp),
-      CheckAnswer("keep2", "keep3"),
-      StopStream,
-      AddParquetFileData(fileSource, Seq("drop4", "keep5", "keep6"), src, tmp),
-      StartStream,
-      CheckAnswer("keep2", "keep3", "keep5", "keep6"),
-      AddParquetFileData(fileSource, Seq("drop7", "keep8", "keep9"), src, tmp),
-      CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
-    )
-
-    Utils.deleteRecursively(src)
-    Utils.deleteRecursively(tmp)
+    withTempDirs { case (src, tmp) =>
+      val fileStream = createFileStream("parquet", src.getCanonicalPath, 
Some(valueSchema))
+      val filtered = fileStream.filter($"value" contains "keep")
+
+      testStream(filtered)(
+        AddParquetFileData(Seq("drop1", "keep2", "keep3"), src, tmp),
+        CheckAnswer("keep2", "keep3"),
+        StopStream,
+        AddParquetFileData(Seq("drop4", "keep5", "keep6"), src, tmp),
+        StartStream,
+        CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+        AddParquetFileData(Seq("drop7", "keep8", "keep9"), src, tmp),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+      )
+    }
   }
 
   test("read from parquet files with changing schema") {
@@ -387,69 +399,62 @@ class FileStreamSourceSuite extends FileStreamSourceTest 
with SharedSQLContext {
       // Add a file so that we can infer its schema
       AddParquetFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)
 
-      val fileSource = createFileStreamSource("parquet", src.getCanonicalPath)
-      val parquetData = fileSource.toDF()
+      val fileStream = createFileStream("parquet", src.getCanonicalPath)
 
       // FileStreamSource should infer the column "k"
-      assert(parquetData.schema === StructType(Seq(StructField("k", 
StringType))))
+      assert(fileStream.schema === StructType(Seq(StructField("k", 
StringType))))
 
       // After creating DF and before starting stream, add data with different 
schema
       // Should not affect the inferred schema any more
       AddParquetFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, 
tmp)
 
-      testStream(parquetData)(
+      testStream(fileStream)(
         // Should not pick up column v in the file added before start
-        AddParquetFileData(fileSource, Seq("value2").toDF("k"), src, tmp),
+        AddParquetFileData(Seq("value2").toDF("k"), src, tmp),
         CheckAnswer("value0", "value1", "value2"),
 
         // Should read data in column k, and ignore v
-        AddParquetFileData(fileSource, Seq(("value3", 1)).toDF("k", "v"), src, 
tmp),
+        AddParquetFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
         CheckAnswer("value0", "value1", "value2", "value3"),
 
         // Should ignore rows that do not have the necessary k column
-        AddParquetFileData(fileSource, Seq("value5").toDF("v"), src, tmp),
+        AddParquetFileData(Seq("value5").toDF("v"), src, tmp),
         CheckAnswer("value0", "value1", "value2", "value3", null)
       )
     }
   }
 
   test("file stream source without schema") {
-    val src = Utils.createTempDir(namePrefix = "streaming.src")
-
-    // Only "text" doesn't need a schema
-    createFileStreamSource("text", src.getCanonicalPath)
+    withTempDir { src =>
+      // Only "text" doesn't need a schema
+      createFileStream("text", src.getCanonicalPath)
 
-    // Both "json" and "parquet" require a schema if no existing file to infer
-    intercept[AnalysisException] {
-      createFileStreamSource("json", src.getCanonicalPath)
-    }
-    intercept[AnalysisException] {
-      createFileStreamSource("parquet", src.getCanonicalPath)
+      // Both "json" and "parquet" require a schema if no existing file to 
infer
+      intercept[AnalysisException] {
+        createFileStream("json", src.getCanonicalPath)
+      }
+      intercept[AnalysisException] {
+        createFileStream("parquet", src.getCanonicalPath)
+      }
     }
-
-    Utils.deleteRecursively(src)
   }
 
   test("fault tolerance") {
-    val src = Utils.createTempDir(namePrefix = "streaming.src")
-    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
-
-    val textSource = createFileStreamSource("text", src.getCanonicalPath)
-    val filtered = textSource.toDF().filter($"value" contains "keep")
-
-    testStream(filtered)(
-      AddTextFileData(textSource, "drop1\nkeep2\nkeep3", src, tmp),
-      CheckAnswer("keep2", "keep3"),
-      StopStream,
-      AddTextFileData(textSource, "drop4\nkeep5\nkeep6", src, tmp),
-      StartStream,
-      CheckAnswer("keep2", "keep3", "keep5", "keep6"),
-      AddTextFileData(textSource, "drop7\nkeep8\nkeep9", src, tmp),
-      CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
-    )
-
-    Utils.deleteRecursively(src)
-    Utils.deleteRecursively(tmp)
+    withTempDirs { case (src, tmp) =>
+      val fileStream = createFileStream("text", src.getCanonicalPath)
+      val filtered = fileStream.filter($"value" contains "keep")
+
+      testStream(filtered)(
+        AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+        CheckAnswer("keep2", "keep3"),
+        StopStream,
+        AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+        StartStream,
+        CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+        AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+      )
+    }
   }
 }
 
@@ -461,10 +466,10 @@ class FileStreamSourceStressTestSuite extends 
FileStreamSourceTest with SharedSQ
     val src = Utils.createTempDir(namePrefix = "streaming.src")
     val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
 
-    val textSource = createFileStreamSource("text", src.getCanonicalPath)
-    val ds = textSource.toDS[String]().map(_.toInt + 1)
+    val fileStream = createFileStream("text", src.getCanonicalPath)
+    val ds = fileStream.as[String].map(_.toInt + 1)
     runStressTest(ds, data => {
-      AddTextFileData(textSource, data.mkString("\n"), src, tmp)
+      AddTextFileData(data.mkString("\n"), src, tmp)
     })
 
     Utils.deleteRecursively(src)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to