This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e6e15ca633 [SPARK-45080][SS] Explicitly call out support for columnar 
in DSv2 streaming data sources
0e6e15ca633 is described below

commit 0e6e15ca6331d37a6c38c970556903c6df5d5dfb
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Thu Sep 7 19:42:58 2023 +0900

    [SPARK-45080][SS] Explicitly call out support for columnar in DSv2 
streaming data sources
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to override `Scan.columnarSupportMode` for DSv2 streaming 
data sources. None of them supports columnar reads, so each one now explicitly 
reports `UNSUPPORTED`. This applies 
[SPARK-44505](https://issues.apache.org/jira/browse/SPARK-44505) to the DSv2 
streaming data sources.
    
    The rationale is explained in the next section.
    
    ### Why are the changes needed?
    
    The default value for `Scan.columnarSupportMode` is `PARTITION_DEFINED`, 
which requires `inputPartitions` to be called/evaluated. That could be 
referenced multiple times during planning.
    
    In `MicroBatchScanExec`, we define `inputPartitions` as a lazy val so that 
inputPartitions, which calls `MicroBatchStream.planInputPartitions`, is not 
evaluated multiple times. But we missed that there is no 
guarantee that the instance will be initialized only once (although the actual 
execution will happen once) - for example, executedPlan clones the plan 
(internally we call constructor to make a deep copy of the node), explain 
(internally called to build a SQL execution start event [...]
    
    I observed that `MicroBatchStream.planInputPartitions` is called 4 times 
per microbatch, which is concerning when the overhead of planInputPartitions 
is non-trivial, particularly for Kafka.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UTs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42823 from HeartSaVioR/SPARK-45080.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala     | 3 +++
 .../main/scala/org/apache/spark/sql/execution/streaming/memory.scala  | 3 +++
 .../sql/execution/streaming/sources/RatePerMicroBatchProvider.scala   | 3 +++
 .../spark/sql/execution/streaming/sources/RateStreamProvider.scala    | 3 +++
 .../sql/execution/streaming/sources/TextSocketSourceProvider.scala    | 3 +++
 .../scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala    | 4 ++--
 6 files changed, 17 insertions(+), 2 deletions(-)

diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index de78992533b..d9e3a1256ea 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -525,6 +525,9 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
     override def supportedCustomMetrics(): Array[CustomMetric] = {
       Array(new OffsetOutOfRangeMetric, new DataLossMetric)
     }
+
+    override def columnarSupportMode(): Scan.ColumnarSupportMode =
+      Scan.ColumnarSupportMode.UNSUPPORTED
   }
 }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 34076f26fe8..732eaa8d783 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -139,6 +139,9 @@ class MemoryStreamScanBuilder(stream: MemoryStreamBase[_]) 
extends ScanBuilder w
   override def toContinuousStream(checkpointLocation: String): 
ContinuousStream = {
     stream.asInstanceOf[ContinuousStream]
   }
+
+  override def columnarSupportMode(): Scan.ColumnarSupportMode =
+    Scan.ColumnarSupportMode.UNSUPPORTED
 }
 
 /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
index 41878a6a549..17cc1860fbd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
@@ -111,6 +111,9 @@ class RatePerMicroBatchTable(
     override def toContinuousStream(checkpointLocation: String): 
ContinuousStream = {
       throw new UnsupportedOperationException("continuous mode is not 
supported!")
     }
+
+    override def columnarSupportMode(): Scan.ColumnarSupportMode =
+      Scan.ColumnarSupportMode.UNSUPPORTED
   }
 }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
index bf2cc770d79..24e283f4ad6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
@@ -100,6 +100,9 @@ class RateStreamTable(
 
     override def toContinuousStream(checkpointLocation: String): 
ContinuousStream =
       new RateStreamContinuousStream(rowsPerSecond, numPartitions)
+
+    override def columnarSupportMode(): Scan.ColumnarSupportMode =
+      Scan.ColumnarSupportMode.UNSUPPORTED
   }
 }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index 1ab88cd41d8..e4251cc7d39 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -98,6 +98,9 @@ class TextSocketTable(host: String, port: Int, numPartitions: 
Int, includeTimest
     override def toContinuousStream(checkpointLocation: String): 
ContinuousStream = {
       new TextSocketContinuousStream(host, port, numPartitions, options)
     }
+
+    override def columnarSupportMode(): Scan.ColumnarSupportMode =
+      Scan.ColumnarSupportMode.UNSUPPORTED
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 4a6325eb060..c3729d50ed0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -329,10 +329,10 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
         assert(progress.processedRowsPerSecond === 4.0)
 
         assert(progress.durationMs.get("latestOffset") === 50)
-        assert(progress.durationMs.get("queryPlanning") === 100)
+        assert(progress.durationMs.get("queryPlanning") === 0)
         assert(progress.durationMs.get("walCommit") === 0)
         assert(progress.durationMs.get("commitOffsets") === 0)
-        assert(progress.durationMs.get("addBatch") === 350)
+        assert(progress.durationMs.get("addBatch") === 450)
         assert(progress.durationMs.get("triggerExecution") === 500)
 
         assert(progress.sources.length === 1)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to