lnagel opened a new issue, #55236:
URL: https://github.com/apache/spark/issues/55236

   Project: SPARK
   Type: Bug
   Component: Structured Streaming, Kafka
   Affects Version: 4.1.1
   Priority: Critical
   
   KafkaMicroBatchStream.metrics() throws a NullPointerException when called 
before latestOffset() has populated the latestPartitionOffsets field.
   
   This is a regression introduced in SPARK-54027 (PR #52729), which refactored 
the metrics method to wrap latestPartitionOffsets in Option. The original 
implementation (from SPARK-34854) passed the value directly and used an 
explicit null check, which was safe:
   
   ```scala
   // Before SPARK-54027 (safe)
   if (offset.nonEmpty && latestAvailablePartitionOffsets != null) {
   ```
   
   After SPARK-54027, the instance method wraps the value in Some():
   
   ```scala
   // KafkaMicroBatchStream.scala, instance metrics method
   } else {
     Some(latestPartitionOffsets)  // creates Some(null) when field is uninitialized
   }
   ```
   
   And the companion object method uses .isDefined:
   
   ```scala
   // KafkaMicroBatchStream.scala, companion object metrics method
   if (offset.nonEmpty && latestAvailablePartitionOffsets.isDefined) {
     ...
     val offsetsBehindLatest = latestAvailablePartitionOffsets.get  // returns null
       .map(...)  // NullPointerException
   ```
   
   Since latestPartitionOffsets is declared as
`private var latestPartitionOffsets: PartitionOffsetMap = _` (null), wrapping it
in Some() before latestOffset() runs creates Some(null). The .isDefined check
passes (Some(null).isDefined == true), then .get returns null, and calling
.map() on null throws the NPE.
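
The Some(null) behaviour described above can be reproduced outside Spark. The following is a minimal standalone sketch, not Spark code: `SomeNullDemo`, `wrapWithSome`, `wrapWithOption`, and `derefThrowsNpe` are hypothetical names, and `PartitionOffsetMap` is simplified here to `Map[String, Long]` as an assumption for illustration.

```scala
// Standalone illustration of Some(null) vs Option(null); not Spark source.
object SomeNullDemo {
  // Simplified stand-in for the real PartitionOffsetMap alias (assumption).
  type PartitionOffsetMap = Map[String, Long]

  // Some(...) never inspects its argument, so a null is wrapped as Some(null).
  def wrapWithSome(m: PartitionOffsetMap): Option[PartitionOffsetMap] = Some(m)

  // Option(...) converts a null argument to None.
  def wrapWithOption(m: PartitionOffsetMap): Option[PartitionOffsetMap] = Option(m)

  // Mirrors the isDefined-then-get pattern from the report.
  // Returns true if dereferencing the wrapped value throws an NPE.
  def derefThrowsNpe(wrapped: Option[PartitionOffsetMap]): Boolean =
    try {
      if (wrapped.isDefined) {
        wrapped.get.map { case (partition, offset) => partition -> (offset + 1) }
      }
      false
    } catch {
      case _: NullPointerException => true
    }

  def main(args: Array[String]): Unit = {
    // Models the field before latestOffset() has assigned it.
    val uninitialized: PartitionOffsetMap = null

    println(wrapWithSome(uninitialized).isDefined)
    println(wrapWithOption(uninitialized).isDefined)
    println(derefThrowsNpe(wrapWithSome(uninitialized)))
    println(derefThrowsNpe(wrapWithOption(uninitialized)))
  }
}
```

With a null input, only the `Some`-wrapped value passes the `.isDefined` guard and then fails on dereference; the `Option`-wrapped value is `None` and the guarded branch is skipped.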
   
   This crashes the streaming query during finishTrigger -> 
extractSourceProgress -> metrics, with no way to recover.
   
   Stack trace:
   
   ```
   Caused by: java.lang.NullPointerException: Cannot invoke "scala.collection.IterableOps.map(scala.Function1)" because the return value of "scala.Option.get()" is null
           at org.apache.spark.sql.kafka010.KafkaMicroBatchStream$.metrics(KafkaMicroBatchStream.scala:520)
           at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.metrics(KafkaMicroBatchStream.scala:363)
           at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.$anonfun$extractSourceProgress$2(ProgressReporter.scala:384)
           at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:496)
           at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.$anonfun$extractSourceProgress$1(ProgressReporter.scala:380)
           at scala.collection.immutable.List.map(List.scala:236)
           at scala.collection.immutable.List.map(List.scala:79)
           at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.extractSourceProgress(ProgressReporter.scala:379)
           at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.constructNewProgress(ProgressReporter.scala:348)
           at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.finishTrigger(ProgressReporter.scala:312)
           at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.finishTrigger(ProgressReporter.scala:429)
           at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:525)
   ```
   
   Fix: change `Some(latestPartitionOffsets)` to 
`Option(latestPartitionOffsets)` in the instance metrics method, which converts 
null to None instead of Some(null).
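
To show the effect of the proposed change in context, here is a rough sketch of the instance/companion split using a heavily simplified stand-in. It is not the Spark implementation: `MetricsFixSketch` and `FakeStream` are hypothetical, `PartitionOffsetMap` is reduced to `Map[String, Long]`, the lag computation is a simplification, and the metric key is used only for illustration.

```scala
// Hypothetical, simplified model of the fix; not the Spark source.
object MetricsFixSketch {
  // Simplified stand-in for the real PartitionOffsetMap alias (assumption).
  type PartitionOffsetMap = Map[String, Long]

  // Stand-in for the companion-object method: reports lag only when the
  // latest offsets are actually available.
  def metrics(
      consumed: PartitionOffsetMap,
      latest: Option[PartitionOffsetMap]): Map[String, String] =
    latest match {
      case Some(latestOffsets) if consumed.nonEmpty && latestOffsets.nonEmpty =>
        val lags = latestOffsets.map { case (partition, end) =>
          end - consumed.getOrElse(partition, 0L)
        }
        Map("maxOffsetsBehindLatest" -> lags.max.toString)
      case _ =>
        Map.empty[String, String]
    }

  final class FakeStream {
    // Null until latestOffset(...) runs; models `... = _` from the report.
    private var latestPartitionOffsets: PartitionOffsetMap = null

    def latestOffset(offsets: PartitionOffsetMap): Unit = {
      latestPartitionOffsets = offsets
    }

    // The proposed fix: Option(...) maps the uninitialized null to None.
    def metrics(consumed: PartitionOffsetMap): Map[String, String] =
      MetricsFixSketch.metrics(consumed, Option(latestPartitionOffsets))
  }

  def main(args: Array[String]): Unit = {
    val stream = new FakeStream
    // Called before latestOffset: yields an empty map instead of an NPE.
    println(stream.metrics(Map("topic-0" -> 5L)))
    stream.latestOffset(Map("topic-0" -> 12L))
    // Called after latestOffset: lag is computed normally.
    println(stream.metrics(Map("topic-0" -> 5L)))
  }
}
```

In this sketch, a call to `metrics` before `latestOffset` simply reports no metrics, which is the behaviour the original explicit null check provided.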


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
