This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 02533d71806e [SPARK-46777][SS] Refactor 
`StreamingDataSourceV2Relation` catalyst structure to be more on-par with the 
batch version
02533d71806e is described below

commit 02533d71806ec0be97ec793d680189093c9a0ecb
Author: jackierwzhang <ruowang.zh...@databricks.com>
AuthorDate: Mon Jan 22 18:58:55 2024 +0900

    [SPARK-46777][SS] Refactor `StreamingDataSourceV2Relation` catalyst 
structure to be more on-par with the batch version
    
    ### What changes were proposed in this pull request?
    This PR refactors `StreamingDataSourceV2Relation` into 
`StreamingDataSourceV2Relation` and `StreamingDataSourceV2ScanRelation` to 
achieve better parity with the batch version. This prepares the codebase so that 
certain V2 optimization rules (e.g. `V2ScanRelationPushDown`) can be extended to 
streaming in the future.
    
    ### Why are the changes needed?
    As described above, we would like to start reusing certain V2 batch 
optimization rules for streaming relations.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    This is a pure refactoring, so existing tests should be sufficient.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44818 from jackierwzhang/spark-46777.
    
    Authored-by: jackierwzhang <ruowang.zh...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |  7 +-
 .../catalyst/streaming/StreamingRelationV2.scala   |  4 +-
 .../datasources/v2/DataSourceV2Relation.scala      | 83 ++++++++++++++++------
 .../datasources/v2/DataSourceV2Strategy.scala      |  4 +-
 .../execution/streaming/MicroBatchExecution.scala  | 12 ++--
 .../sql/execution/streaming/ProgressReporter.scala |  4 +-
 .../streaming/continuous/ContinuousExecution.scala | 14 ++--
 .../sources/RateStreamProviderSuite.scala          |  4 +-
 .../streaming/sources/TextSocketStreamSuite.scala  |  4 +-
 .../apache/spark/sql/streaming/StreamSuite.scala   |  8 +--
 .../apache/spark/sql/streaming/StreamTest.scala    |  4 +-
 .../sql/streaming/StreamingQueryManagerSuite.scala |  4 +-
 .../streaming/test/DataStreamTableAPISuite.scala   |  2 +-
 13 files changed, 99 insertions(+), 55 deletions(-)

diff --git 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index cee0d9a3dd72..fb5e71a1e7b8 100644
--- 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.TestUtils
 import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
-import 
org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import 
org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.streaming._
 import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED}
@@ -125,7 +125,8 @@ abstract class KafkaSourceTest extends StreamTest with 
SharedSparkSession with K
       val sources: Seq[SparkDataStream] = {
         query.get.logicalPlan.collect {
           case StreamingExecutionRelation(source: KafkaSource, _, _) => source
-          case r: StreamingDataSourceV2Relation if 
r.stream.isInstanceOf[KafkaMicroBatchStream] ||
+          case r: StreamingDataSourceV2ScanRelation
+            if r.stream.isInstanceOf[KafkaMicroBatchStream] ||
               r.stream.isInstanceOf[KafkaContinuousStream] =>
             r.stream
         }
@@ -1654,7 +1655,7 @@ class KafkaMicroBatchV2SourceSuite extends 
KafkaMicroBatchSourceSuiteBase {
       makeSureGetOffsetCalled,
       AssertOnQuery { query =>
         query.logicalPlan.exists {
-          case r: StreamingDataSourceV2Relation => 
r.stream.isInstanceOf[KafkaMicroBatchStream]
+          case r: StreamingDataSourceV2ScanRelation => 
r.stream.isInstanceOf[KafkaMicroBatchStream]
           case _ => false
         }
       }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
index ab0352b606e5..c1d7daa6cfcf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.streaming
 
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, 
LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, 
SupportsMetadataColumns, Table, TableProvider}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits
@@ -36,7 +36,7 @@ case class StreamingRelationV2(
     sourceName: String,
     table: Table,
     extraOptions: CaseInsensitiveStringMap,
-    output: Seq[Attribute],
+    output: Seq[AttributeReference],
     catalog: Option[CatalogPlugin],
     identifier: Option[Identifier],
     v1Relation: Option[LogicalPlan])
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 9c7d776edc65..556283243f63 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -32,21 +32,21 @@ import org.apache.spark.util.Utils
 /**
  * A logical plan representing a data source v2 table.
  *
- * @param table   The table that this relation represents.
- * @param output the output attributes of this relation.
+ * @param table  The table that this relation represents.
+ * @param output The output attributes of this relation.
  * @param catalog catalogPlugin for the table. None if no catalog is specified.
- * @param identifier the identifier for the table. None if no identifier is 
defined.
+ * @param identifier The identifier for the table. None if no identifier is 
defined.
  * @param options The options for this table operation. It's used to create 
fresh
  *                [[org.apache.spark.sql.connector.read.ScanBuilder]] and
  *                [[org.apache.spark.sql.connector.write.WriteBuilder]].
  */
-case class DataSourceV2Relation(
+abstract class DataSourceV2RelationBase(
     table: Table,
     output: Seq[AttributeReference],
     catalog: Option[CatalogPlugin],
     identifier: Option[Identifier],
     options: CaseInsensitiveStringMap)
-  extends LeafNode with MultiInstanceRelation with NamedRelation with 
ExposesMetadataColumns {
+  extends LeafNode with MultiInstanceRelation with NamedRelation {
 
   import DataSourceV2Implicits._
 
@@ -54,14 +54,6 @@ case class DataSourceV2Relation(
     case c: FunctionCatalog => c
   }
 
-  override lazy val metadataOutput: Seq[AttributeReference] = table match {
-    case hasMeta: SupportsMetadataColumns =>
-      metadataOutputWithOutConflicts(
-        hasMeta.metadataColumns.toAttributes, 
hasMeta.canRenameConflictingMetadataColumns)
-    case _ =>
-      Nil
-  }
-
   override def name: String = {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
     (catalog, identifier) match {
@@ -98,11 +90,34 @@ case class DataSourceV2Relation(
       }
     }
   }
+}
+
+/**
+ * A specialization of [[DataSourceV2RelationBase]] that supports batch scan.
+ */
+case class DataSourceV2Relation(
+    table: Table,
+    override val output: Seq[AttributeReference],
+    catalog: Option[CatalogPlugin],
+    identifier: Option[Identifier],
+    options: CaseInsensitiveStringMap)
+  extends DataSourceV2RelationBase(table, output, catalog, identifier, options)
+  with ExposesMetadataColumns {
+
+  import DataSourceV2Implicits._
 
   override def newInstance(): DataSourceV2Relation = {
     copy(output = output.map(_.newInstance()))
   }
 
+  override lazy val metadataOutput: Seq[AttributeReference] = table match {
+    case hasMeta: SupportsMetadataColumns =>
+      metadataOutputWithOutConflicts(
+        hasMeta.metadataColumns.toAttributes, 
hasMeta.canRenameConflictingMetadataColumns)
+    case _ =>
+      Nil
+  }
+
   def withMetadataColumns(): DataSourceV2Relation = {
     val newMetadata = metadataOutput.filterNot(outputSet.contains)
     if (newMetadata.nonEmpty) {
@@ -152,21 +167,45 @@ case class DataSourceV2ScanRelation(
 }
 
 /**
- * A specialization of [[DataSourceV2Relation]] with the streaming bit set to 
true.
- *
- * Note that, this plan has a mutable reader, so Spark won't apply operator 
push-down for this plan,
- * to avoid making the plan mutable. We should consolidate this plan and 
[[DataSourceV2Relation]]
- * after we figure out how to apply operator push-down for streaming data 
sources.
+ * A specialization of [[DataSourceV2RelationBase]] that supports streaming 
scan.
+ * It will be transformed to [[StreamingDataSourceV2ScanRelation]] during the 
planning phase of
+ * [[MicroBatchExecution]].
  */
 case class StreamingDataSourceV2Relation(
-    output: Seq[Attribute],
-    scan: Scan,
-    stream: SparkDataStream,
+    table: Table,
+    override val output: Seq[AttributeReference],
     catalog: Option[CatalogPlugin],
     identifier: Option[Identifier],
+    options: CaseInsensitiveStringMap,
+    metadataPath: String)
+  extends DataSourceV2RelationBase(table, output, catalog, identifier, 
options) {
+
+  override def isStreaming: Boolean = true
+
+  override def newInstance(): StreamingDataSourceV2Relation = {
+    copy(output = output.map(_.newInstance()))
+  }
+}
+/**
+ * A specialization of [[DataSourceV2ScanRelation]] with the streaming bit set 
to true, as well
+ * as start and end offsets for Microbatch processing.
+ */
+case class StreamingDataSourceV2ScanRelation(
+    relation: StreamingDataSourceV2Relation,
+    scan: Scan,
+    output: Seq[AttributeReference],
+    stream: SparkDataStream,
     startOffset: Option[Offset] = None,
     endOffset: Option[Offset] = None)
-  extends LeafNode with MultiInstanceRelation {
+  extends LeafNode with MultiInstanceRelation with NamedRelation  {
+
+  val (catalog, identifier) = (relation.catalog, relation.identifier)
+
+  override def name: String = relation.table.name()
+
+  override def simpleString(maxFields: Int): String = {
+    s"StreamingDataSourceV2ScanRelation${truncatedString(output, "[", ", ", 
"]", maxFields)} $name"
+  }
 
   override def isStreaming: Boolean = true
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index fe3140c8030a..3cf311017e5e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -147,7 +147,7 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
         StoragePartitionJoinParams(relation.keyGroupedPartitioning))
       withProjectAndFilter(project, postScanFilters, batchExec, 
!batchExec.supportsColumnar) :: Nil
 
-    case PhysicalOperation(p, f, r: StreamingDataSourceV2Relation)
+    case PhysicalOperation(p, f, r: StreamingDataSourceV2ScanRelation)
       if r.startOffset.isDefined && r.endOffset.isDefined =>
 
       val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
@@ -157,7 +157,7 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
       // Add a Project here to make sure we produce unsafe rows.
       withProjectAndFilter(p, f, scanExec, !scanExec.supportsColumnar) :: Nil
 
-    case PhysicalOperation(p, f, r: StreamingDataSourceV2Relation)
+    case PhysicalOperation(p, f, r: StreamingDataSourceV2ScanRelation)
       if r.startOffset.isDefined && r.endOffset.isEmpty =>
 
       val continuousStream = r.stream.asInstanceOf[ContinuousStream]
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 1bd59e818be5..8c98ad5c47dd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -32,7 +32,7 @@ import 
org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset =
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
StreamingDataSourceV2Relation, StreamWriterCommitProgress, 
WriteToDataSourceV2Exec}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, 
StreamWriterCommitProgress, WriteToDataSourceV2Exec}
 import 
org.apache.spark.sql.execution.streaming.sources.{WriteToMicroBatchDataSource, 
WriteToMicroBatchDataSourceV1}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.Trigger
@@ -103,7 +103,7 @@ class MicroBatchExecution(
     var nextSourceId = 0L
     val toExecutionRelationMap = MutableMap[StreamingRelation, 
StreamingExecutionRelation]()
     val v2ToExecutionRelationMap = MutableMap[StreamingRelationV2, 
StreamingExecutionRelation]()
-    val v2ToRelationMap = MutableMap[StreamingRelationV2, 
StreamingDataSourceV2Relation]()
+    val v2ToRelationMap = MutableMap[StreamingRelationV2, 
StreamingDataSourceV2ScanRelation]()
     // We transform each distinct streaming relation into a 
StreamingExecutionRelation, keeping a
     // map as we go to ensure each identical relation gets the same 
StreamingExecutionRelation
     // object. For each microbatch, the StreamingExecutionRelation will be 
replaced with a logical
@@ -140,7 +140,9 @@ class MicroBatchExecution(
             // TODO: operator pushdown.
             val scan = table.newScanBuilder(options).build()
             val stream = scan.toMicroBatchStream(metadataPath)
-            StreamingDataSourceV2Relation(output, scan, stream, catalog, 
identifier)
+            val relation = StreamingDataSourceV2Relation(
+              table, output, catalog, identifier, options, metadataPath)
+            StreamingDataSourceV2ScanRelation(relation, scan, output, stream)
           })
         } else if (v1.isEmpty) {
           throw QueryExecutionErrors.microBatchUnsupportedByDataSourceError(
@@ -163,7 +165,7 @@ class MicroBatchExecution(
       // v1 source
       case s: StreamingExecutionRelation => s.source
       // v2 source
-      case r: StreamingDataSourceV2Relation => r.stream
+      case r: StreamingDataSourceV2ScanRelation => r.stream
     }
 
     // Initializing TriggerExecutor relies on `sources`, hence calling this 
after initializing
@@ -706,7 +708,7 @@ class MicroBatchExecution(
         }
 
       // For v2 sources.
-      case r: StreamingDataSourceV2Relation =>
+      case r: StreamingDataSourceV2ScanRelation =>
         mutableNewData.get(r.stream).map {
           case OffsetHolder(start, end) =>
             r.copy(startOffset = Some(start), endOffset = Some(end))
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index c01f156e3d70..ccbbf9a4d874 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, 
ReportsSinkMetrics, ReportsSourceMetrics, SparkDataStream}
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, 
StreamingDataSourceV2Relation, StreamWriterCommitProgress}
+import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, 
StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress}
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryIdleEvent, 
QueryProgressEvent}
 import org.apache.spark.util.Clock
@@ -329,7 +329,7 @@ trait ProgressReporter extends Logging {
     val onlyDataSourceV2Sources = {
       // Check whether the streaming query's logical plan has only V2 
micro-batch data sources
       val allStreamingLeaves = logicalPlan.collect {
-        case s: StreamingDataSourceV2Relation => 
s.stream.isInstanceOf[MicroBatchStream]
+        case s: StreamingDataSourceV2ScanRelation => 
s.stream.isInstanceOf[MicroBatchStream]
         case _: StreamingExecutionRelation => false
       }
       allStreamingLeaves.forall(_ == true)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index a2d9a6705f97..1de05931faf5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -36,7 +36,7 @@ import 
org.apache.spark.sql.connector.read.streaming.{ContinuousStream, Partitio
 import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, 
Write}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.execution.SQLExecution
-import 
org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import 
org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, 
StreamingDataSourceV2ScanRelation}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.util.ArrayImplicits._
@@ -61,7 +61,7 @@ class ContinuousExecution(
   private val failure: AtomicReference[Throwable] = new 
AtomicReference[Throwable](null)
 
   override val logicalPlan: WriteToContinuousDataSource = {
-    val v2ToRelationMap = MutableMap[StreamingRelationV2, 
StreamingDataSourceV2Relation]()
+    val v2ToRelationMap = MutableMap[StreamingRelationV2, 
StreamingDataSourceV2ScanRelation]()
     var nextSourceId = 0
     import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
@@ -79,12 +79,14 @@ class ContinuousExecution(
           // TODO: operator pushdown.
           val scan = table.newScanBuilder(options).build()
           val stream = scan.toContinuousStream(metadataPath)
-          StreamingDataSourceV2Relation(output, scan, stream, catalog, 
identifier)
+          val relation = StreamingDataSourceV2Relation(
+              table, output, catalog, identifier, options, metadataPath)
+          StreamingDataSourceV2ScanRelation(relation, scan, output, stream)
         })
     }
 
     sources = _logicalPlan.collect {
-      case r: StreamingDataSourceV2Relation => 
r.stream.asInstanceOf[ContinuousStream]
+      case r: StreamingDataSourceV2ScanRelation => 
r.stream.asInstanceOf[ContinuousStream]
     }
     uniqueSources = sources.distinct.map(s => s -> 
ReadLimit.allAvailable()).toMap
 
@@ -197,7 +199,7 @@ class ContinuousExecution(
     }
 
     val withNewSources: LogicalPlan = logicalPlan transform {
-      case relation: StreamingDataSourceV2Relation =>
+      case relation: StreamingDataSourceV2ScanRelation =>
         val loggedOffset = offsets.offsets(0)
         val realOffset = loggedOffset.map(off => 
relation.stream.deserializeOffset(off.json))
         val startOffset = realOffset.getOrElse(relation.stream.initialOffset)
@@ -227,7 +229,7 @@ class ContinuousExecution(
     }
 
     val stream = withNewSources.collect {
-      case relation: StreamingDataSourceV2Relation =>
+      case relation: StreamingDataSourceV2ScanRelation =>
         relation.stream.asInstanceOf[ContinuousStream]
     }.head
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index 1f5f3d76d104..69dc8c291c0b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
 import org.apache.spark.sql.execution.datasources.DataSource
-import 
org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import 
org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.functions._
@@ -43,7 +43,7 @@ class RateStreamProviderSuite extends StreamTest {
     override def addData(query: Option[StreamExecution]): (SparkDataStream, 
Offset) = {
       assert(query.nonEmpty)
       val rateSource = query.get.logicalPlan.collect {
-        case r: StreamingDataSourceV2Relation
+        case r: StreamingDataSourceV2ScanRelation
             if r.stream.isInstanceOf[RateStreamMicroBatchStream] =>
           r.stream.asInstanceOf[RateStreamMicroBatchStream]
       }.head
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index 15aa0a80c207..bfeca5851102 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
 import org.apache.spark.sql.execution.datasources.DataSource
-import 
org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import 
org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
@@ -59,7 +59,7 @@ class TextSocketStreamSuite extends StreamTest with 
SharedSparkSession {
         "Cannot add data when there is no query for finding the active socket 
source")
 
       val sources = query.get.logicalPlan.collect {
-        case r: StreamingDataSourceV2Relation
+        case r: StreamingDataSourceV2ScanRelation
             if r.stream.isInstanceOf[TextSocketMicroBatchStream] =>
           r.stream.asInstanceOf[TextSocketMicroBatchStream]
       }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index b0e54737d104..1b0b53357e5b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -511,7 +511,7 @@ class StreamSuite extends StreamTest {
 
       val explainWithoutExtended = q.explainInternal(false)
       // `extended = false` only displays the physical plan.
-      assert("StreamingDataSourceV2Relation".r
+      assert("StreamingDataSourceV2ScanRelation".r
         .findAllMatchIn(explainWithoutExtended).size === 0)
       assert("BatchScan".r
         .findAllMatchIn(explainWithoutExtended).size === 1)
@@ -521,7 +521,7 @@ class StreamSuite extends StreamTest {
       val explainWithExtended = q.explainInternal(true)
       // `extended = true` displays 3 logical plans 
(Parsed/Optimized/Optimized) and 1 physical
       // plan.
-      assert("StreamingDataSourceV2Relation".r
+      assert("StreamingDataSourceV2ScanRelation".r
         .findAllMatchIn(explainWithExtended).size === 3)
       assert("BatchScan".r
         .findAllMatchIn(explainWithExtended).size === 1)
@@ -566,7 +566,7 @@ class StreamSuite extends StreamTest {
       val explainWithoutExtended = q.explainInternal(false)
 
       // `extended = false` only displays the physical plan.
-      assert("StreamingDataSourceV2Relation".r
+      assert("StreamingDataSourceV2ScanRelation".r
         .findAllMatchIn(explainWithoutExtended).size === 0)
       assert("ContinuousScan".r
         .findAllMatchIn(explainWithoutExtended).size === 1)
@@ -574,7 +574,7 @@ class StreamSuite extends StreamTest {
       val explainWithExtended = q.explainInternal(true)
       // `extended = true` displays 3 logical plans 
(Parsed/Optimized/Optimized) and 1 physical
       // plan.
-      assert("StreamingDataSourceV2Relation".r
+      assert("StreamingDataSourceV2ScanRelation".r
         .findAllMatchIn(explainWithExtended).size === 3)
       assert("ContinuousScan".r
         .findAllMatchIn(explainWithExtended).size === 1)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 7ee66d18d481..f4816b04bbb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.physical.AllTuples
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, 
SparkDataStream}
-import 
org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import 
org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
 import org.apache.spark.sql.execution.streaming._
 import 
org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, 
EpochCoordinatorRef, IncrementAndGetEpoch}
 import org.apache.spark.sql.execution.streaming.sources.MemorySink
@@ -702,7 +702,7 @@ trait StreamTest extends QueryTest with SharedSparkSession 
with TimeLimits with
                   // v1 source
                   case r: StreamingExecutionRelation => r.source
                   // v2 source
-                  case r: StreamingDataSourceV2Relation => r.stream
+                  case r: StreamingDataSourceV2ScanRelation => r.stream
                   // We can add data to memory stream before starting it. Then 
the input plan has
                   // not been processed by the streaming engine and contains 
`StreamingRelationV2`.
                   case r: StreamingRelationV2 if r.sourceName == "memory" =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 754f55202254..53cbbe6e786f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{Dataset, Encoders}
-import 
org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import 
org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.BlockingSource
@@ -467,7 +467,7 @@ class StreamingQueryManagerSuite extends StreamTest {
       if (withError) {
         logDebug(s"Terminating query ${queryToStop.name} with error")
         
queryToStop.asInstanceOf[StreamingQueryWrapper].streamingQuery.logicalPlan.collect
 {
-          case r: StreamingDataSourceV2Relation =>
+          case r: StreamingDataSourceV2ScanRelation =>
             r.stream.asInstanceOf[MemoryStream[Int]].addData(0)
         }
       } else {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 5a4f386f1d1d..eecc9468649d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -433,7 +433,7 @@ class DataStreamTableAPISuite extends StreamTest with 
BeforeAndAfter {
           val explainWithExtended = sq.explainInternal(true)
           // `extended = true` displays 3 logical plans 
(Parsed/Analyzed/Optimized) and 1 physical
           // plan.
-          assert("StreamingDataSourceV2Relation".r
+          assert("StreamingDataSourceV2ScanRelation".r
             .findAllMatchIn(explainWithExtended).size === 3)
           // WriteToMicroBatchDataSource is used for both parsed and analyzed 
logical plan
           assert("WriteToMicroBatchDataSource".r


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to