This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bcc29db25406 [SPARK-55601][SS][FOLLOWUP] Cache offsetLog.getLatest() 
to avoid redundant ListStatus during MicroBatchExecution init
bcc29db25406 is described below

commit bcc29db2540682e42b94fb0b393cee5a45d1d8f0
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue May 26 10:37:33 2026 +0800

    [SPARK-55601][SS][FOLLOWUP] Cache offsetLog.getLatest() to avoid redundant 
ListStatus during MicroBatchExecution init
    
    ### What changes were proposed in this pull request?
    
    Followup to https://github.com/apache/spark/pull/54373.
    
    SPARK-55601 added a new `offsetLog.getLatest()` call inside `logicalPlan`'s 
computation to derive `enforceNamed` from the last written offset log entry. 
`initializeExecution` already calls `offsetLog.getLatest()` on its first line. 
Both calls happen on the query thread during stream startup, with no offset log 
writes in between, so the two reads always return the same value. The second 
one is wasted work: each `getLatest()` triggers `listBatches` → 
`HDFSMetadataLog.list` → a filesyste [...]
    
    This PR caches the first read in a `private lazy val 
initialLatestOffsetSeq` on `MicroBatchExecution` and routes both call sites 
through it:
    
    - `enforceNamed` derivation in `logicalPlan` lazy val.
    - `var latestStartedBatch` initialization in `initializeExecution`.
    
    Subsequent reads inside `initializeExecution` (after a `purgeAfter`) and in 
`populateStartOffsets` are unchanged — those legitimately need fresh 
`getLatest()` results.
    
    ### Why are the changes needed?
    
    Avoids one redundant `ListStatus` on `<checkpoint>/offsets/` per stream 
startup. The cost is small but unnecessary, and downstream consumers that track 
per-checkpoint filesystem operations (for tracing, auditing, or test 
invariants) currently see one extra op against the offsets directory because of 
this duplication.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. Same behavior, fewer filesystem calls.
    
    ### How was this patch tested?
    
    Existing `MicroBatchExecutionSuite` and downstream streaming-startup tests 
cover both call sites. The change is a pure caching refactor; the cached value 
is identical to what a second `getLatest()` would return because nothing else 
writes the offset log between construction and `initializeExecution` on the 
query thread.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code
    
    Closes #56054 from cloud-fan/SPARK-55601-followup.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../execution/streaming/runtime/MicroBatchExecution.scala  | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index b499e676a84a..726586ac72e6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -183,6 +183,16 @@ class MicroBatchExecution(
   // into every subsequent batch's query plan.
   private val stateSchemaMetadatas = MutableMap[Long, StateSchemaBroadcast]()
 
+  /**
+   * Cached result of the first `offsetLog.getLatest()` call. Reused by both
+   * `logicalPlan` (to determine `enforceNamed`) and `initializeExecution` (to 
seed
+   * `latestStartedBatch`). This avoids a redundant `ListStatus` on the 
checkpoint's
+   * `offsets/` directory during stream startup. Safe to cache: between 
construction
+   * and `initializeExecution`, nothing else writes the offset log on the 
query thread.
+   */
+  private lazy val initialLatestOffsetSeq: Option[(Long, OffsetSeqBase)] =
+    offsetLog.getLatest()
+
   override lazy val logicalPlan: LogicalPlan = {
     assert(queryExecutionThread eq Thread.currentThread,
       "logicalPlan must be initialized in QueryExecutionThread " +
@@ -204,7 +214,7 @@ class MicroBatchExecution(
 
     // Read the source evolution enforcement from the last written offset log 
entry. If no entries
     // are found, use the session config value.
-    val enforceNamed = offsetLog.getLatest().flatMap { case (_, offsetSeq) =>
+    val enforceNamed = initialLatestOffsetSeq.flatMap { case (_, offsetSeq) =>
       offsetSeq.metadataOpt.flatMap { metadata =>
         OffsetSeqMetadata.readValueOpt(metadata, 
SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION)
           .map(_.toBoolean)
@@ -451,7 +461,7 @@ class MicroBatchExecution(
 
   private def initializeExecution(
       sparkSessionForStream: SparkSession): MicroBatchExecutionContext = {
-    var latestStartedBatch = offsetLog.getLatest()
+    var latestStartedBatch = initialLatestOffsetSeq
     val latestCommittedBatch = commitLog.getLatest()
 
     val lastCommittedBatchId = latestCommittedBatch match {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to