Repository: spark
Updated Branches:
  refs/heads/branch-2.4 4c1428fa2 -> 15d2e9d7d
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index aeef4c8..52b833a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -17,74 +17,73 @@ package org.apache.spark.sql.streaming.sources
 
+import java.util.Optional
+
 import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, StreamingQueryWrapper}
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
-import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, ScanConfig, ScanConfigBuilder}
-import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.reader.InputPartition
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
-case class FakeReadSupport() extends MicroBatchReadSupport with ContinuousReadSupport {
-  override def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
-  override def commit(end: Offset): Unit = {}
-  override def stop(): Unit = {}
-  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
-  override def fullSchema(): StructType = StructType(Seq())
-  override def newScanConfigBuilder(start: Offset, end: Offset): ScanConfigBuilder = null
-  override def initialOffset(): Offset = RateStreamOffset(Map())
-  override def latestOffset(): Offset = RateStreamOffset(Map())
-  override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = null
-  override def createReaderFactory(config: ScanConfig): PartitionReaderFactory = {
-    throw new IllegalStateException("fake source - cannot actually read")
-  }
-  override def createContinuousReaderFactory(
-      config: ScanConfig): ContinuousPartitionReaderFactory = {
-    throw new IllegalStateException("fake source - cannot actually read")
-  }
-  override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
+case class FakeReader() extends MicroBatchReader with ContinuousReader {
+  def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {}
+  def getStartOffset: Offset = RateStreamOffset(Map())
+  def getEndOffset: Offset = RateStreamOffset(Map())
+  def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
+  def commit(end: Offset): Unit = {}
+  def readSchema(): StructType = StructType(Seq())
+  def stop(): Unit = {}
+  def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
+  def setStartOffset(start: Optional[Offset]): Unit = {}
+
+  def planInputPartitions(): java.util.ArrayList[InputPartition[InternalRow]] = {
     throw new IllegalStateException("fake source - cannot actually read")
   }
 }
 
-trait FakeMicroBatchReadSupportProvider extends MicroBatchReadSupportProvider {
-  override def createMicroBatchReadSupport(
+trait FakeMicroBatchReadSupport extends MicroBatchReadSupport {
+  override def createMicroBatchReader(
+      schema: Optional[StructType],
       checkpointLocation: String,
-      options: DataSourceOptions): MicroBatchReadSupport = FakeReadSupport()
+      options: DataSourceOptions): MicroBatchReader = FakeReader()
 }
 
-trait FakeContinuousReadSupportProvider extends ContinuousReadSupportProvider {
-  override def createContinuousReadSupport(
+trait FakeContinuousReadSupport extends ContinuousReadSupport {
+  override def createContinuousReader(
+      schema: Optional[StructType],
       checkpointLocation: String,
-      options: DataSourceOptions): ContinuousReadSupport = FakeReadSupport()
+      options: DataSourceOptions): ContinuousReader = FakeReader()
 }
 
-trait FakeStreamingWriteSupportProvider extends StreamingWriteSupportProvider {
-  override def createStreamingWriteSupport(
+trait FakeStreamWriteSupport extends StreamWriteSupport {
+  override def createStreamWriter(
       queryId: String,
       schema: StructType,
       mode: OutputMode,
-      options: DataSourceOptions): StreamingWriteSupport = {
+      options: DataSourceOptions): StreamWriter = {
     throw new IllegalStateException("fake sink - cannot actually write")
   }
 }
 
-class FakeReadMicroBatchOnly extends DataSourceRegister with FakeMicroBatchReadSupportProvider {
+class FakeReadMicroBatchOnly extends DataSourceRegister with FakeMicroBatchReadSupport {
   override def shortName(): String = "fake-read-microbatch-only"
 }
 
-class FakeReadContinuousOnly extends DataSourceRegister with FakeContinuousReadSupportProvider {
+class FakeReadContinuousOnly extends DataSourceRegister with FakeContinuousReadSupport {
   override def shortName(): String = "fake-read-continuous-only"
 }
 
 class FakeReadBothModes extends DataSourceRegister
-    with FakeMicroBatchReadSupportProvider with FakeContinuousReadSupportProvider {
+    with FakeMicroBatchReadSupport with FakeContinuousReadSupport {
   override def shortName(): String = "fake-read-microbatch-continuous"
 }
 
@@ -92,7 +91,7 @@ class FakeReadNeitherMode extends DataSourceRegister {
   override def shortName(): String = "fake-read-neither-mode"
 }
 
-class FakeWriteSupportProvider extends DataSourceRegister with FakeStreamingWriteSupportProvider {
+class FakeWrite extends DataSourceRegister with FakeStreamWriteSupport {
   override def shortName(): String = "fake-write-microbatch-continuous"
 }
 
@@ -107,8 +106,8 @@ class FakeSink extends Sink {
   override def addBatch(batchId: Long, data: DataFrame): Unit = {}
 }
 
-class FakeWriteSupportProviderV1Fallback extends DataSourceRegister
-    with FakeStreamingWriteSupportProvider with StreamSinkProvider {
+class FakeWriteV1Fallback extends DataSourceRegister
+    with FakeStreamWriteSupport with StreamSinkProvider {
 
   override def createSink(
       sqlContext: SQLContext,
@@ -191,11 +190,11 @@ class StreamingDataSourceV2Suite extends StreamTest {
     val v2Query = testPositiveCase(
       "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once())
     assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
-      .isInstanceOf[FakeWriteSupportProviderV1Fallback])
+      .isInstanceOf[FakeWriteV1Fallback])
 
     // Ensure we create a V1 sink with the config. Note the config is a comma separated
     // list, including other fake entries.
-    val fullSinkName = classOf[FakeWriteSupportProviderV1Fallback].getName
+    val fullSinkName = "org.apache.spark.sql.streaming.sources.FakeWriteV1Fallback"
     withSQLConf(SQLConf.DISABLED_V2_STREAMING_WRITERS.key -> s"a,b,c,test,$fullSinkName,d,e") {
       val v1Query = testPositiveCase(
         "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once())
@@ -219,37 +218,35 @@ class StreamingDataSourceV2Suite extends StreamTest {
         val writeSource = DataSource.lookupDataSource(write, spark.sqlContext.conf).newInstance()
         (readSource, writeSource, trigger) match {
           // Valid microbatch queries.
-          case (_: MicroBatchReadSupportProvider, _: StreamingWriteSupportProvider, t)
+          case (_: MicroBatchReadSupport, _: StreamWriteSupport, t)
             if !t.isInstanceOf[ContinuousTrigger] =>
             testPositiveCase(read, write, trigger)
 
           // Valid continuous queries.
-          case (_: ContinuousReadSupportProvider, _: StreamingWriteSupportProvider,
-              _: ContinuousTrigger) =>
+          case (_: ContinuousReadSupport, _: StreamWriteSupport, _: ContinuousTrigger) =>
             testPositiveCase(read, write, trigger)
 
           // Invalid - can't read at all
           case (r, _, _)
-              if !r.isInstanceOf[MicroBatchReadSupportProvider]
-                && !r.isInstanceOf[ContinuousReadSupportProvider] =>
+              if !r.isInstanceOf[MicroBatchReadSupport]
+                && !r.isInstanceOf[ContinuousReadSupport] =>
             testNegativeCase(read, write, trigger,
               s"Data source $read does not support streamed reading")
 
           // Invalid - can't write
-          case (_, w, _) if !w.isInstanceOf[StreamingWriteSupportProvider] =>
+          case (_, w, _) if !w.isInstanceOf[StreamWriteSupport] =>
            testNegativeCase(read, write, trigger,
              s"Data source $write does not support streamed writing")
 
           // Invalid - trigger is continuous but reader is not
-          case (r, _: StreamingWriteSupportProvider, _: ContinuousTrigger)
-              if !r.isInstanceOf[ContinuousReadSupportProvider] =>
+          case (r, _: StreamWriteSupport, _: ContinuousTrigger)
+              if !r.isInstanceOf[ContinuousReadSupport] =>
            testNegativeCase(read, write, trigger,
              s"Data source $read does not support continuous processing")
 
           // Invalid - trigger is microbatch but reader is not
           case (r, _, t)
-              if !r.isInstanceOf[MicroBatchReadSupportProvider] &&
-                !t.isInstanceOf[ContinuousTrigger] =>
+              if !r.isInstanceOf[MicroBatchReadSupport] && !t.isInstanceOf[ContinuousTrigger] =>
            testPostCreationNegativeCase(read, write, trigger,
              s"Data source $read does not support microbatch processing")
         }
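
As a minimal sketch of the dispatch rule that the rewritten match expression above encodes — assuming only the branch-2.4 traits and fake sources visible in this diff (MicroBatchReadSupport, ContinuousReadSupport, StreamWriteSupport, ContinuousTrigger); the expectedOutcome helper is a hypothetical name introduced purely for illustration, not part of the suite:

import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, MicroBatchReadSupport, StreamWriteSupport}
import org.apache.spark.sql.streaming.Trigger

// Classifies a (reader instance, writer instance, trigger) triple the same way the
// suite's match expression does, returning a description instead of running a query.
def expectedOutcome(readSource: AnyRef, writeSource: AnyRef, trigger: Trigger): String = {
  val continuous = trigger.isInstanceOf[ContinuousTrigger]
  (readSource, writeSource) match {
    case (r, _) if !r.isInstanceOf[MicroBatchReadSupport] && !r.isInstanceOf[ContinuousReadSupport] =>
      "fails at query creation: source does not support streamed reading"
    case (_, w) if !w.isInstanceOf[StreamWriteSupport] =>
      "fails at query creation: sink does not support streamed writing"
    case (r, _) if continuous && !r.isInstanceOf[ContinuousReadSupport] =>
      "fails at query creation: source does not support continuous processing"
    case (r, _) if !continuous && !r.isInstanceOf[MicroBatchReadSupport] =>
      "fails after query creation: source does not support microbatch processing"
    case _ =>
      "runs in the requested mode"
  }
}

// For example, a continuous-only reader paired with the v2 sink under a one-time
// (micro-batch) trigger falls into the post-creation failure branch:
//   expectedOutcome(new FakeReadContinuousOnly, new FakeWrite, Trigger.Once())
//   => "fails after query creation: source does not support microbatch processing"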