lnagel opened a new issue, #55236:
URL: https://github.com/apache/spark/issues/55236
Project: SPARK
Type: Bug
Component: Structured Streaming, Kafka
Affects Version: 4.1.1
Priority: Critical
KafkaMicroBatchStream.metrics() throws a NullPointerException when called
before latestOffset() has populated the latestPartitionOffsets field.
This is a regression introduced in SPARK-54027 (PR #52729), which refactored
the metrics method to wrap latestPartitionOffsets in Option. The original
implementation (from SPARK-34854) passed the value directly and used an
explicit null check, which was safe:
```scala
// Before SPARK-54027 (safe)
if (offset.nonEmpty && latestAvailablePartitionOffsets != null) {
```
After SPARK-54027, the instance method wraps the value in Some():
```scala
// KafkaMicroBatchStream.scala, instance metrics method
} else {
  Some(latestPartitionOffsets) // creates Some(null) when field is uninitialized
}
```
And the companion object method uses .isDefined:
```scala
// KafkaMicroBatchStream.scala, companion object metrics method
if (offset.nonEmpty && latestAvailablePartitionOffsets.isDefined) {
  ...
  val offsetsBehindLatest = latestAvailablePartitionOffsets.get // returns null
    .map(...) // NullPointerException
```
Since latestPartitionOffsets is declared as `private var
latestPartitionOffsets: PartitionOffsetMap = _` (null), wrapping it in Some()
before latestOffset() runs creates Some(null). The .isDefined check passes
(Some(null).isDefined == true), then .get returns null, and calling .map() on
null throws the NPE.
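The mechanism can be reproduced outside Spark. A minimal sketch, using a plain `Map[String, Long]` as a stand-in for Spark's `PartitionOffsetMap` alias and a local `var` mirroring the uninitialized field:

```scala
// Demonstrates why wrapping a possibly-null field in Some(...) is unsafe,
// while Option(...) normalizes null to None. Self-contained illustration;
// the field and map type are stand-ins, not the actual Spark code.
object SomeNullDemo {
  // Mirrors `private var latestPartitionOffsets: PartitionOffsetMap = _`,
  // i.e. null before latestOffset() has ever run.
  var latestPartitionOffsets: Map[String, Long] = null

  def main(args: Array[String]): Unit = {
    val unsafe = Some(latestPartitionOffsets)   // Some(null)
    val safe   = Option(latestPartitionOffsets) // None

    println(unsafe.isDefined) // true  -- the .isDefined guard passes
    println(safe.isDefined)   // false -- the guard correctly short-circuits

    // unsafe.get.map(identity) would throw a NullPointerException here,
    // which is exactly the failure in the companion object's metrics method.
  }
}
```

`Some.apply` never inspects its argument, whereas `Option.apply` is defined to return `None` for `null`, which is why the one-character-class change matters.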
This crashes the streaming query during finishTrigger ->
extractSourceProgress -> metrics, with no way to recover.
Stack trace:
```
Caused by: java.lang.NullPointerException: Cannot invoke "scala.collection.IterableOps.map(scala.Function1)" because the return value of "scala.Option.get()" is null
  at org.apache.spark.sql.kafka010.KafkaMicroBatchStream$.metrics(KafkaMicroBatchStream.scala:520)
  at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.metrics(KafkaMicroBatchStream.scala:363)
  at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.$anonfun$extractSourceProgress$2(ProgressReporter.scala:384)
  at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:496)
  at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.$anonfun$extractSourceProgress$1(ProgressReporter.scala:380)
  at scala.collection.immutable.List.map(List.scala:236)
  at scala.collection.immutable.List.map(List.scala:79)
  at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.extractSourceProgress(ProgressReporter.scala:379)
  at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.constructNewProgress(ProgressReporter.scala:348)
  at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.finishTrigger(ProgressReporter.scala:312)
  at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.finishTrigger(ProgressReporter.scala:429)
  at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:525)
```
Proposed fix: change `Some(latestPartitionOffsets)` to `Option(latestPartitionOffsets)` in the instance metrics method, since `Option(null)` evaluates to `None` rather than `Some(null)`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]