Repository: spark
Updated Branches:
  refs/heads/branch-2.4 4c1428fa2 -> 15d2e9d7d


http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index aeef4c8..52b833a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -17,74 +17,73 @@
 
 package org.apache.spark.sql.streaming.sources
 
+import java.util.Optional
+
 import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, StreamingQueryWrapper}
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
-import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, ScanConfig, ScanConfigBuilder}
-import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.reader.InputPartition
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
-case class FakeReadSupport() extends MicroBatchReadSupport with ContinuousReadSupport {
-  override def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
-  override def commit(end: Offset): Unit = {}
-  override def stop(): Unit = {}
-  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
-  override def fullSchema(): StructType = StructType(Seq())
-  override def newScanConfigBuilder(start: Offset, end: Offset): ScanConfigBuilder = null
-  override def initialOffset(): Offset = RateStreamOffset(Map())
-  override def latestOffset(): Offset = RateStreamOffset(Map())
-  override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = null
-  override def createReaderFactory(config: ScanConfig): PartitionReaderFactory = {
-    throw new IllegalStateException("fake source - cannot actually read")
-  }
-  override def createContinuousReaderFactory(
-      config: ScanConfig): ContinuousPartitionReaderFactory = {
-    throw new IllegalStateException("fake source - cannot actually read")
-  }
-  override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
+case class FakeReader() extends MicroBatchReader with ContinuousReader {
+  def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {}
+  def getStartOffset: Offset = RateStreamOffset(Map())
+  def getEndOffset: Offset = RateStreamOffset(Map())
+  def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
+  def commit(end: Offset): Unit = {}
+  def readSchema(): StructType = StructType(Seq())
+  def stop(): Unit = {}
+  def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
+  def setStartOffset(start: Optional[Offset]): Unit = {}
+
+  def planInputPartitions(): java.util.ArrayList[InputPartition[InternalRow]] = {
     throw new IllegalStateException("fake source - cannot actually read")
   }
 }
 
-trait FakeMicroBatchReadSupportProvider extends MicroBatchReadSupportProvider {
-  override def createMicroBatchReadSupport(
+trait FakeMicroBatchReadSupport extends MicroBatchReadSupport {
+  override def createMicroBatchReader(
+      schema: Optional[StructType],
       checkpointLocation: String,
-      options: DataSourceOptions): MicroBatchReadSupport = FakeReadSupport()
+      options: DataSourceOptions): MicroBatchReader = FakeReader()
 }
 
-trait FakeContinuousReadSupportProvider extends ContinuousReadSupportProvider {
-  override def createContinuousReadSupport(
+trait FakeContinuousReadSupport extends ContinuousReadSupport {
+  override def createContinuousReader(
+      schema: Optional[StructType],
       checkpointLocation: String,
-      options: DataSourceOptions): ContinuousReadSupport = FakeReadSupport()
+      options: DataSourceOptions): ContinuousReader = FakeReader()
 }
 
-trait FakeStreamingWriteSupportProvider extends StreamingWriteSupportProvider {
-  override def createStreamingWriteSupport(
+trait FakeStreamWriteSupport extends StreamWriteSupport {
+  override def createStreamWriter(
       queryId: String,
       schema: StructType,
       mode: OutputMode,
-      options: DataSourceOptions): StreamingWriteSupport = {
+      options: DataSourceOptions): StreamWriter = {
     throw new IllegalStateException("fake sink - cannot actually write")
   }
 }
 
-class FakeReadMicroBatchOnly extends DataSourceRegister with FakeMicroBatchReadSupportProvider {
+class FakeReadMicroBatchOnly extends DataSourceRegister with FakeMicroBatchReadSupport {
   override def shortName(): String = "fake-read-microbatch-only"
 }
 
-class FakeReadContinuousOnly extends DataSourceRegister with FakeContinuousReadSupportProvider {
+class FakeReadContinuousOnly extends DataSourceRegister with FakeContinuousReadSupport {
   override def shortName(): String = "fake-read-continuous-only"
 }
 
 class FakeReadBothModes extends DataSourceRegister
-    with FakeMicroBatchReadSupportProvider with FakeContinuousReadSupportProvider {
+    with FakeMicroBatchReadSupport with FakeContinuousReadSupport {
   override def shortName(): String = "fake-read-microbatch-continuous"
 }
 
@@ -92,7 +91,7 @@ class FakeReadNeitherMode extends DataSourceRegister {
   override def shortName(): String = "fake-read-neither-mode"
 }
 
-class FakeWriteSupportProvider extends DataSourceRegister with FakeStreamingWriteSupportProvider {
+class FakeWrite extends DataSourceRegister with FakeStreamWriteSupport {
   override def shortName(): String = "fake-write-microbatch-continuous"
 }
 
@@ -107,8 +106,8 @@ class FakeSink extends Sink {
   override def addBatch(batchId: Long, data: DataFrame): Unit = {}
 }
 
-class FakeWriteSupportProviderV1Fallback extends DataSourceRegister
-  with FakeStreamingWriteSupportProvider with StreamSinkProvider {
+class FakeWriteV1Fallback extends DataSourceRegister
+  with FakeStreamWriteSupport with StreamSinkProvider {
 
   override def createSink(
     sqlContext: SQLContext,
@@ -191,11 +190,11 @@ class StreamingDataSourceV2Suite extends StreamTest {
     val v2Query = testPositiveCase(
       "fake-read-microbatch-continuous", "fake-write-v1-fallback", 
Trigger.Once())
     assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
-      .isInstanceOf[FakeWriteSupportProviderV1Fallback])
+      .isInstanceOf[FakeWriteV1Fallback])
 
    // Ensure we create a V1 sink with the config. Note the config is a comma separated
     // list, including other fake entries.
-    val fullSinkName = classOf[FakeWriteSupportProviderV1Fallback].getName
+    val fullSinkName = "org.apache.spark.sql.streaming.sources.FakeWriteV1Fallback"
    withSQLConf(SQLConf.DISABLED_V2_STREAMING_WRITERS.key -> s"a,b,c,test,$fullSinkName,d,e") {
       val v1Query = testPositiveCase(
         "fake-read-microbatch-continuous", "fake-write-v1-fallback", 
Trigger.Once())
@@ -219,37 +218,35 @@ class StreamingDataSourceV2Suite extends StreamTest {
      val writeSource = DataSource.lookupDataSource(write, spark.sqlContext.conf).newInstance()
       (readSource, writeSource, trigger) match {
         // Valid microbatch queries.
-        case (_: MicroBatchReadSupportProvider, _: StreamingWriteSupportProvider, t)
+        case (_: MicroBatchReadSupport, _: StreamWriteSupport, t)
           if !t.isInstanceOf[ContinuousTrigger] =>
           testPositiveCase(read, write, trigger)
 
         // Valid continuous queries.
-        case (_: ContinuousReadSupportProvider, _: StreamingWriteSupportProvider,
-              _: ContinuousTrigger) =>
+        case (_: ContinuousReadSupport, _: StreamWriteSupport, _: ContinuousTrigger) =>
           testPositiveCase(read, write, trigger)
 
         // Invalid - can't read at all
         case (r, _, _)
-            if !r.isInstanceOf[MicroBatchReadSupportProvider]
-              && !r.isInstanceOf[ContinuousReadSupportProvider] =>
+            if !r.isInstanceOf[MicroBatchReadSupport]
+              && !r.isInstanceOf[ContinuousReadSupport] =>
           testNegativeCase(read, write, trigger,
             s"Data source $read does not support streamed reading")
 
         // Invalid - can't write
-        case (_, w, _) if !w.isInstanceOf[StreamingWriteSupportProvider] =>
+        case (_, w, _) if !w.isInstanceOf[StreamWriteSupport] =>
           testNegativeCase(read, write, trigger,
             s"Data source $write does not support streamed writing")
 
         // Invalid - trigger is continuous but reader is not
-        case (r, _: StreamingWriteSupportProvider, _: ContinuousTrigger)
-            if !r.isInstanceOf[ContinuousReadSupportProvider] =>
+        case (r, _: StreamWriteSupport, _: ContinuousTrigger)
+            if !r.isInstanceOf[ContinuousReadSupport] =>
           testNegativeCase(read, write, trigger,
             s"Data source $read does not support continuous processing")
 
         // Invalid - trigger is microbatch but reader is not
         case (r, _, t)
-           if !r.isInstanceOf[MicroBatchReadSupportProvider] &&
-             !t.isInstanceOf[ContinuousTrigger] =>
+           if !r.isInstanceOf[MicroBatchReadSupport] && !t.isInstanceOf[ContinuousTrigger] =>
           testPostCreationNegativeCase(read, write, trigger,
             s"Data source $read does not support microbatch processing")
       }
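
For readers of this change: the fake classes above register themselves through DataSourceRegister short names, so the suite's helpers refer to them purely by name. The sketch below is not part of the patch; it shows roughly how such a source/sink pairing could be driven with Trigger.Once(), assuming a test SparkSession named spark and a temporary checkpoint path checkpointDir. Because the fake reader and writer throw IllegalStateException as soon as Spark actually reads or writes, the snippet illustrates name resolution rather than a full end-to-end run.

  import org.apache.spark.sql.streaming.Trigger

  // Resolve the fake source and sink by their DataSourceRegister short names.
  val query = spark.readStream
    .format("fake-read-microbatch-continuous")    // FakeReadBothModes
    .load()
    .writeStream
    .format("fake-write-microbatch-continuous")   // FakeWrite
    .option("checkpointLocation", checkpointDir)  // assumed temp directory
    .trigger(Trigger.Once())
    .start()

  query.stop()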

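The V1-fallback test in this diff keys off SQLConf.DISABLED_V2_STREAMING_WRITERS, a comma-separated list of fully qualified writer class names for which Spark skips the V2 write path and falls back to the V1 sink. Outside of withSQLConf, the same behaviour could be toggled as sketched below; the class name matches the renamed FakeWriteV1Fallback above, and the snippet is an illustrative assumption rather than code from the patch.

  import org.apache.spark.sql.internal.SQLConf

  // Any V2 streaming writer listed here is forced onto its V1
  // StreamSinkProvider fallback when the query starts.
  spark.conf.set(
    SQLConf.DISABLED_V2_STREAMING_WRITERS.key,
    "org.apache.spark.sql.streaming.sources.FakeWriteV1Fallback")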
