Repository: spark
Updated Branches:
  refs/heads/master 9341c951e -> 62d01391f


http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
index 048d078..80eeffd 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
@@ -24,7 +24,7 @@ import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.types.StructType;
 
 public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWithSchema {
@@ -42,7 +42,7 @@ public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWi
     }
 
     @Override
-    public List<DataReaderFactory<Row>> createDataReaderFactories() {
+    public List<InputPartition<Row>> planInputPartitions() {
       return java.util.Collections.emptyList();
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
index 96f55b8..8522a63 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.reader.DataReader;
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.types.StructType;
 
@@ -41,25 +41,25 @@ public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<DataReaderFactory<Row>> createDataReaderFactories() {
+    public List<InputPartition<Row>> planInputPartitions() {
       return java.util.Arrays.asList(
-        new JavaSimpleDataReaderFactory(0, 5),
-        new JavaSimpleDataReaderFactory(5, 10));
+        new JavaSimpleInputPartition(0, 5),
+        new JavaSimpleInputPartition(5, 10));
     }
   }
 
-  static class JavaSimpleDataReaderFactory implements DataReaderFactory<Row>, DataReader<Row> {
+  static class JavaSimpleInputPartition implements InputPartition<Row>, InputPartitionReader<Row> {
     private int start;
     private int end;
 
-    JavaSimpleDataReaderFactory(int start, int end) {
+    JavaSimpleInputPartition(int start, int end) {
       this.start = start;
       this.end = end;
     }
 
     @Override
-    public DataReader<Row> createDataReader() {
-      return new JavaSimpleDataReaderFactory(start - 1, end);
+    public InputPartitionReader<Row> createPartitionReader() {
+      return new JavaSimpleInputPartition(start - 1, end);
     }
 
     @Override

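For readers following the API change: the renamed read path that these Java tests now target looks like this in Scala. This is a minimal sketch based only on the interfaces visible in this patch (DataSourceV2, ReadSupport, DataSourceReader, InputPartition, InputPartitionReader); the SimpleRangeSource and SimpleRangePartition names are illustrative and not part of the change.

import java.util.{Arrays => JArrays, List => JList}

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.StructType

// Illustrative source: two partitions covering [0, 5) and [5, 10).
class SimpleRangeSource extends DataSourceV2 with ReadSupport {

  class Reader extends DataSourceReader {
    override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")

    // planInputPartitions() replaces the old createDataReaderFactories().
    override def planInputPartitions(): JList[InputPartition[Row]] =
      JArrays.asList(new SimpleRangePartition(0, 5), new SimpleRangePartition(5, 10))
  }

  override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
}

// One class serves as both the partition descriptor and its reader, as in the test above.
class SimpleRangePartition(start: Int, end: Int)
  extends InputPartition[Row] with InputPartitionReader[Row] {

  private var current = start - 1

  // createPartitionReader() replaces the old createDataReader().
  override def createPartitionReader(): InputPartitionReader[Row] =
    new SimpleRangePartition(start, end)

  override def next(): Boolean = { current += 1; current < end }
  override def get(): Row = Row(current, -current)
  override def close(): Unit = {}
}

As in the Java test above, one class plays both roles: it is the serializable InputPartition that Spark ships to executors and the InputPartitionReader that actually iterates the rows.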
http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
index c3916e0..3ad8e7a 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
@@ -38,20 +38,20 @@ public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories() {
+    public List<InputPartition<UnsafeRow>> planUnsafeInputPartitions() {
       return java.util.Arrays.asList(
-        new JavaUnsafeRowDataReaderFactory(0, 5),
-        new JavaUnsafeRowDataReaderFactory(5, 10));
+        new JavaUnsafeRowInputPartition(0, 5),
+        new JavaUnsafeRowInputPartition(5, 10));
     }
   }
 
-  static class JavaUnsafeRowDataReaderFactory
-      implements DataReaderFactory<UnsafeRow>, DataReader<UnsafeRow> {
+  static class JavaUnsafeRowInputPartition
+      implements InputPartition<UnsafeRow>, InputPartitionReader<UnsafeRow> {
     private int start;
     private int end;
     private UnsafeRow row;
 
-    JavaUnsafeRowDataReaderFactory(int start, int end) {
+    JavaUnsafeRowInputPartition(int start, int end) {
       this.start = start;
       this.end = end;
       this.row = new UnsafeRow(2);
@@ -59,8 +59,8 @@ public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public DataReader<UnsafeRow> createDataReader() {
-      return new JavaUnsafeRowDataReaderFactory(start - 1, end);
+    public InputPartitionReader<UnsafeRow> createPartitionReader() {
+      return new JavaUnsafeRowInputPartition(start - 1, end);
     }
 
     @Override

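The UnsafeRow variant follows the same pattern through SupportsScanUnsafeRow. A hedged Scala sketch of the renamed entry points; the names are illustrative, and the setInt calls are assumed from the unchanged body of this test, which the hunk above does not show.

import java.util.{Arrays => JArrays, List => JList}

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader, SupportsScanUnsafeRow}
import org.apache.spark.sql.types.StructType

class UnsafeRangeSource extends DataSourceV2 with ReadSupport {

  class Reader extends DataSourceReader with SupportsScanUnsafeRow {
    override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")

    // planUnsafeInputPartitions() replaces the old createUnsafeRowReaderFactories().
    override def planUnsafeInputPartitions(): JList[InputPartition[UnsafeRow]] =
      JArrays.asList(new UnsafeRangePartition(0, 5), new UnsafeRangePartition(5, 10))
  }

  override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
}

class UnsafeRangePartition(start: Int, end: Int)
  extends InputPartition[UnsafeRow] with InputPartitionReader[UnsafeRow] {

  // Two int columns backed by a fixed-size buffer, as in the test sources in this patch.
  private val row = new UnsafeRow(2)
  row.pointTo(new Array[Byte](8 * 3), 8 * 3)

  private var current = start - 1

  override def createPartitionReader(): InputPartitionReader[UnsafeRow] =
    new UnsafeRangePartition(start, end)

  override def next(): Boolean = { current += 1; current < end }

  override def get(): UnsafeRow = {
    row.setInt(0, current)   // assumed: mirrors the unchanged get() body of the original test
    row.setInt(1, -current)
    row
  }

  override def close(): Unit = {}
}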
http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index ff14ec3..39a010f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -142,9 +142,9 @@ class RateSourceSuite extends StreamTest {
     val startOffset = LongOffset(0L)
     val endOffset = LongOffset(1L)
     reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
-    val tasks = reader.createDataReaderFactories()
+    val tasks = reader.planInputPartitions()
     assert(tasks.size == 1)
-    val dataReader = tasks.get(0).createDataReader()
+    val dataReader = tasks.get(0).createPartitionReader()
     val data = ArrayBuffer[Row]()
     while (dataReader.next()) {
       data.append(dataReader.get())
@@ -159,11 +159,11 @@ class RateSourceSuite extends StreamTest {
     val startOffset = LongOffset(0L)
     val endOffset = LongOffset(1L)
     reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
-    val tasks = reader.createDataReaderFactories()
+    val tasks = reader.planInputPartitions()
     assert(tasks.size == 11)
 
     val readData = tasks.asScala
-      .map(_.createDataReader())
+      .map(_.createPartitionReader())
       .flatMap { reader =>
         val buf = scala.collection.mutable.ListBuffer[Row]()
         while (reader.next()) buf.append(reader.get())
@@ -304,7 +304,7 @@ class RateSourceSuite extends StreamTest {
     val reader = new RateStreamContinuousReader(
       new DataSourceOptions(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
     reader.setStartOffset(Optional.empty())
-    val tasks = reader.createDataReaderFactories()
+    val tasks = reader.planInputPartitions()
     assert(tasks.size == 2)
 
     val data = scala.collection.mutable.ListBuffer[Row]()
@@ -314,7 +314,7 @@ class RateSourceSuite extends StreamTest {
           .asInstanceOf[RateStreamOffset]
           .partitionToValueAndRunTimeMs(t.partitionIndex)
           .runTimeMs
-        val r = t.createDataReader().asInstanceOf[RateStreamContinuousDataReader]
+        val r = t.createPartitionReader().asInstanceOf[RateStreamContinuousInputPartitionReader]
         for (rowIndex <- 0 to 9) {
           r.next()
           data.append(r.get())

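The consumption pattern these tests use is unchanged apart from the method names: plan the partitions, create a reader per partition, then iterate with next()/get() and close() at the end. A small hedged helper in Scala showing that shape (ReadHelpers and collectRows are illustrative, not part of the suite):

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.DataSourceReader

object ReadHelpers {
  // Drain every partition of a Row-based DataSourceReader through the renamed API.
  def collectRows(reader: DataSourceReader): Seq[Row] = {
    reader.planInputPartitions().asScala.flatMap { partition =>
      val partitionReader = partition.createPartitionReader()
      val rows = ListBuffer[Row]()
      try {
        while (partitionReader.next()) rows += partitionReader.get()
      } finally {
        partitionReader.close()
      }
      rows
    }
  }
}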
http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index e0a5327..505a3f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -346,8 +346,8 @@ class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceReader {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
-      java.util.Arrays.asList(new SimpleDataReaderFactory(0, 5))
+    override def planInputPartitions(): JList[InputPartition[Row]] = {
+      java.util.Arrays.asList(new SimpleInputPartition(0, 5))
     }
   }
 
@@ -359,20 +359,21 @@ class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceReader {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
-      java.util.Arrays.asList(new SimpleDataReaderFactory(0, 5), new SimpleDataReaderFactory(5, 10))
+    override def planInputPartitions(): JList[InputPartition[Row]] = {
+      java.util.Arrays.asList(new SimpleInputPartition(0, 5), new SimpleInputPartition(5, 10))
     }
   }
 
   override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
-class SimpleDataReaderFactory(start: Int, end: Int)
-  extends DataReaderFactory[Row]
-  with DataReader[Row] {
+class SimpleInputPartition(start: Int, end: Int)
+  extends InputPartition[Row]
+  with InputPartitionReader[Row] {
   private var current = start - 1
 
-  override def createDataReader(): DataReader[Row] = new SimpleDataReaderFactory(start, end)
+  override def createPartitionReader(): InputPartitionReader[Row] =
+    new SimpleInputPartition(start, end)
 
   override def next(): Boolean = {
     current += 1
@@ -413,21 +414,21 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
       requiredSchema
     }
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
+    override def planInputPartitions(): JList[InputPartition[Row]] = {
       val lowerBound = filters.collect {
         case GreaterThan("i", v: Int) => v
       }.headOption
 
-      val res = new ArrayList[DataReaderFactory[Row]]
+      val res = new ArrayList[InputPartition[Row]]
 
       if (lowerBound.isEmpty) {
-        res.add(new AdvancedDataReaderFactory(0, 5, requiredSchema))
-        res.add(new AdvancedDataReaderFactory(5, 10, requiredSchema))
+        res.add(new AdvancedInputPartition(0, 5, requiredSchema))
+        res.add(new AdvancedInputPartition(5, 10, requiredSchema))
       } else if (lowerBound.get < 4) {
-        res.add(new AdvancedDataReaderFactory(lowerBound.get + 1, 5, requiredSchema))
-        res.add(new AdvancedDataReaderFactory(5, 10, requiredSchema))
+        res.add(new AdvancedInputPartition(lowerBound.get + 1, 5, requiredSchema))
+        res.add(new AdvancedInputPartition(5, 10, requiredSchema))
       } else if (lowerBound.get < 9) {
-        res.add(new AdvancedDataReaderFactory(lowerBound.get + 1, 10, requiredSchema))
+        res.add(new AdvancedInputPartition(lowerBound.get + 1, 10, requiredSchema))
       }
 
       res
@@ -437,13 +438,13 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
   override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
-class AdvancedDataReaderFactory(start: Int, end: Int, requiredSchema: StructType)
-  extends DataReaderFactory[Row] with DataReader[Row] {
+class AdvancedInputPartition(start: Int, end: Int, requiredSchema: StructType)
+  extends InputPartition[Row] with InputPartitionReader[Row] {
 
   private var current = start - 1
 
-  override def createDataReader(): DataReader[Row] = {
-    new AdvancedDataReaderFactory(start, end, requiredSchema)
+  override def createPartitionReader(): InputPartitionReader[Row] = {
+    new AdvancedInputPartition(start, end, requiredSchema)
   }
 
   override def close(): Unit = {}
@@ -468,24 +469,24 @@ class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceReader with SupportsScanUnsafeRow {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def createUnsafeRowReaderFactories(): JList[DataReaderFactory[UnsafeRow]] = {
-      java.util.Arrays.asList(new UnsafeRowDataReaderFactory(0, 5),
-        new UnsafeRowDataReaderFactory(5, 10))
+    override def planUnsafeInputPartitions(): JList[InputPartition[UnsafeRow]] = {
+      java.util.Arrays.asList(new UnsafeRowInputPartitionReader(0, 5),
+        new UnsafeRowInputPartitionReader(5, 10))
     }
   }
 
   override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
-class UnsafeRowDataReaderFactory(start: Int, end: Int)
-  extends DataReaderFactory[UnsafeRow] with DataReader[UnsafeRow] {
+class UnsafeRowInputPartitionReader(start: Int, end: Int)
+  extends InputPartition[UnsafeRow] with InputPartitionReader[UnsafeRow] {
 
   private val row = new UnsafeRow(2)
   row.pointTo(new Array[Byte](8 * 3), 8 * 3)
 
   private var current = start - 1
 
-  override def createDataReader(): DataReader[UnsafeRow] = this
+  override def createPartitionReader(): InputPartitionReader[UnsafeRow] = this
 
   override def next(): Boolean = {
     current += 1
@@ -503,7 +504,7 @@ class UnsafeRowDataReaderFactory(start: Int, end: Int)
 class SchemaRequiredDataSource extends DataSourceV2 with ReadSupportWithSchema {
 
   class Reader(val readSchema: StructType) extends DataSourceReader {
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] =
+    override def planInputPartitions(): JList[InputPartition[Row]] =
       java.util.Collections.emptyList()
   }
 
@@ -516,16 +517,17 @@ class BatchDataSourceV2 extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceReader with SupportsScanColumnarBatch {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def createBatchDataReaderFactories(): JList[DataReaderFactory[ColumnarBatch]] = {
-      java.util.Arrays.asList(new BatchDataReaderFactory(0, 50), new BatchDataReaderFactory(50, 90))
+    override def planBatchInputPartitions(): JList[InputPartition[ColumnarBatch]] = {
+      java.util.Arrays.asList(
+        new BatchInputPartitionReader(0, 50), new BatchInputPartitionReader(50, 90))
     }
   }
 
   override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
-class BatchDataReaderFactory(start: Int, end: Int)
-  extends DataReaderFactory[ColumnarBatch] with DataReader[ColumnarBatch] {
+class BatchInputPartitionReader(start: Int, end: Int)
+  extends InputPartition[ColumnarBatch] with InputPartitionReader[ColumnarBatch] {
 
   private final val BATCH_SIZE = 20
   private lazy val i = new OnHeapColumnVector(BATCH_SIZE, IntegerType)
@@ -534,7 +536,7 @@ class BatchDataReaderFactory(start: Int, end: Int)
 
   private var current = start
 
-  override def createDataReader(): DataReader[ColumnarBatch] = this
+  override def createPartitionReader(): InputPartitionReader[ColumnarBatch] = this
 
   override def next(): Boolean = {
     i.reset()
@@ -568,11 +570,11 @@ class PartitionAwareDataSource extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceReader with SupportsReportPartitioning {
     override def readSchema(): StructType = new StructType().add("a", "int").add("b", "int")
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
+    override def planInputPartitions(): JList[InputPartition[Row]] = {
       // Note that we don't have same value of column `a` across partitions.
       java.util.Arrays.asList(
-        new SpecificDataReaderFactory(Array(1, 1, 3), Array(4, 4, 6)),
-        new SpecificDataReaderFactory(Array(2, 4, 4), Array(6, 2, 2)))
+        new SpecificInputPartitionReader(Array(1, 1, 3), Array(4, 4, 6)),
+        new SpecificInputPartitionReader(Array(2, 4, 4), Array(6, 2, 2)))
     }
 
     override def outputPartitioning(): Partitioning = new MyPartitioning
@@ -590,14 +592,14 @@ class PartitionAwareDataSource extends DataSourceV2 with ReadSupport {
   override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
-class SpecificDataReaderFactory(i: Array[Int], j: Array[Int])
-  extends DataReaderFactory[Row]
-  with DataReader[Row] {
+class SpecificInputPartitionReader(i: Array[Int], j: Array[Int])
+  extends InputPartition[Row]
+  with InputPartitionReader[Row] {
   assert(i.length == j.length)
 
   private var current = -1
 
-  override def createDataReader(): DataReader[Row] = this
+  override def createPartitionReader(): InputPartitionReader[Row] = this
 
   override def next(): Boolean = {
     current += 1

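Reading any of these sources from the DataFrame API is unchanged; Spark resolves the reader and walks the renamed planInputPartitions()/createPartitionReader() path internally. A brief usage sketch, assuming a SparkSession named spark as in the suite:

// The V2 source is addressed by its class name; collect() pulls the rows
// produced by the InputPartitionReaders on the executors.
val df = spark.read.format(classOf[SimpleDataSourceV2].getName).load()
df.collect()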
http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index a5007fa..694bb3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceReader}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -45,7 +45,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
   class Reader(path: String, conf: Configuration) extends DataSourceReader {
     override def readSchema(): StructType = schema
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
+    override def planInputPartitions(): JList[InputPartition[Row]] = {
       val dataPath = new Path(path)
       val fs = dataPath.getFileSystem(conf)
       if (fs.exists(dataPath)) {
@@ -54,9 +54,9 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
           name.startsWith("_") || name.startsWith(".")
         }.map { f =>
           val serializableConf = new SerializableConfiguration(conf)
-          new SimpleCSVDataReaderFactory(
+          new SimpleCSVInputPartitionReader(
             f.getPath.toUri.toString,
-            serializableConf): DataReaderFactory[Row]
+            serializableConf): InputPartition[Row]
         }.toList.asJava
       } else {
         Collections.emptyList()
@@ -156,14 +156,14 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
   }
 }
 
-class SimpleCSVDataReaderFactory(path: String, conf: SerializableConfiguration)
-  extends DataReaderFactory[Row] with DataReader[Row] {
+class SimpleCSVInputPartitionReader(path: String, conf: SerializableConfiguration)
+  extends InputPartition[Row] with InputPartitionReader[Row] {
 
   @transient private var lines: Iterator[String] = _
   @transient private var currentLine: String = _
   @transient private var inputStream: FSDataInputStream = _
 
-  override def createDataReader(): DataReader[Row] = {
+  override def createPartitionReader(): InputPartitionReader[Row] = {
     val filePath = new Path(path)
     val fs = filePath.getFileSystem(conf.value)
     inputStream = fs.open(filePath)

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 5798699..dcf6cb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.TestForeachWriter
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
+import org.apache.spark.sql.sources.v2.reader.InputPartition
 import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
 import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock}
 import org.apache.spark.sql.types.StructType
@@ -227,10 +227,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       }
 
       // getBatch should take 100 ms the first time it is called
-      override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+      override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
         synchronized {
           clock.waitTillTime(1350)
-          super.createUnsafeRowReaderFactories()
+          super.planUnsafeInputPartitions()
         }
       }
     }
@@ -290,13 +290,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
       AdvanceManualClock(100), // time = 1150 to unblock getEndOffset
       AssertClockTime(1150),
-      AssertStreamExecThreadIsWaitingForTime(1350), // will block on createReadTasks that needs 1350
+      // will block on planInputPartitions that needs 1350
+      AssertStreamExecThreadIsWaitingForTime(1350),
       AssertOnQuery(_.status.isDataAvailable === true),
       AssertOnQuery(_.status.isTriggerActive === true),
       AssertOnQuery(_.status.message === "Processing new data"),
       AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
 
-      AdvanceManualClock(200), // time = 1350 to unblock createReadTasks
+      AdvanceManualClock(200), // time = 1350 to unblock planInputPartitions
       AssertClockTime(1350),
       AssertStreamExecThreadIsWaitingForTime(1500), // will block on map task that needs 1500
       AssertOnQuery(_.status.isDataAvailable === true),

http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
index e755625..f47d3ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
@@ -27,8 +27,8 @@ import org.apache.spark.{SparkEnv, SparkFunSuite, TaskContext}
 import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.types.{DataType, IntegerType}
@@ -72,8 +72,8 @@ class ContinuousQueuedDataReaderSuite extends StreamTest with MockitoSugar {
    */
   private def setup(): (BlockingQueue[UnsafeRow], ContinuousQueuedDataReader) = {
     val queue = new ArrayBlockingQueue[UnsafeRow](1024)
-    val factory = new DataReaderFactory[UnsafeRow] {
-      override def createDataReader() = new ContinuousDataReader[UnsafeRow] {
+    val factory = new InputPartition[UnsafeRow] {
+      override def createPartitionReader() = new ContinuousInputPartitionReader[UnsafeRow] {
         var index = -1
         var curr: UnsafeRow = _
 

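On the continuous side, the renamed pairing is an InputPartition whose reader implements ContinuousInputPartitionReader, which adds getOffset() on top of next()/get()/close(). A hedged sketch of the shape this suite builds inline; IndexOffset and ContinuousSketch are illustrative names, not part of the patch.

import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, PartitionOffset}
import org.apache.spark.sql.types.{DataType, IntegerType}

object ContinuousSketch {
  // Illustrative offset type; any serializable PartitionOffset implementation works here.
  case class IndexOffset(index: Long) extends PartitionOffset

  // An InputPartition whose reader emits single-int UnsafeRows indefinitely and
  // reports its position through getOffset(), as continuous execution expects.
  val partition: InputPartition[UnsafeRow] = new InputPartition[UnsafeRow] {
    override def createPartitionReader(): InputPartitionReader[UnsafeRow] =
      new ContinuousInputPartitionReader[UnsafeRow] {
        private val toUnsafe = UnsafeProjection.create(Array[DataType](IntegerType))
        private var index = -1

        override def next(): Boolean = { index += 1; true }
        override def get(): UnsafeRow = toUnsafe(new GenericInternalRow(Array[Any](index))).copy()
        override def getOffset(): PartitionOffset = IndexOffset(index)
        override def close(): Unit = {}
      }
  }
}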
http://git-wip-us.apache.org/repos/asf/spark/blob/62d01391/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index af4618b..c1a28b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
+import org.apache.spark.sql.sources.v2.reader.InputPartition
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
@@ -44,7 +44,7 @@ case class FakeReader() extends MicroBatchReader with ContinuousReader {
   def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
   def setStartOffset(start: Optional[Offset]): Unit = {}
 
-  def createDataReaderFactories(): java.util.ArrayList[DataReaderFactory[Row]] = {
+  def planInputPartitions(): java.util.ArrayList[InputPartition[Row]] = {
     throw new IllegalStateException("fake source - cannot actually read")
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
