This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new a7d4bc622f fix: drop withDefaultValue from StatisticsManager so
checkpoint state round-trips (#5150)
a7d4bc622f is described below
commit a7d4bc622f639aa3445d2c199ae4bbded28b8d1c
Author: Matthew B. <[email protected]>
AuthorDate: Fri May 22 23:05:14 2026 -0700
fix: drop withDefaultValue from StatisticsManager so checkpoint state
round-trips (#5150)
### What changes were proposed in this PR?
`StatisticsManager` declared its input/output stats maps as
`mutable.Map.empty.withDefaultValue((0L, 0L))`. The resulting
`Map.WithDefault` wrapper does not survive a Kryo round-trip (its inner
map deserializes as null), so
`chkpt.load(CP_STATE_KEY)` on a default-state `ControllerProcessor`
throws `KryoException: NullPointerException`, blocking
`Controller.loadFromCheckpoint` from ever rehydrating a checkpointed
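controller. This PR removes the wrapper and rewrites the two write sites
with `updateWith(portId)`, which inserts `(1L, size)` for a port seen for
the first time and increments the existing `(count, totalSize)` entry
otherwise; behavior is unchanged.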
### Any related issues, documentation, or discussions?
closes: #4686
### How was this PR tested?
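Replaced the two existing `should be serializable` cases in
`CheckpointSpec` with full save-then-load round-trips (controller +
worker) that assert `restored.actorId == original.actorId`; the new
tests reproduce the original NPE on `main` and pass after the fix.
The output-statistics case in `WorkerManagersSpec` now also records a
second port and asserts the per-port `(count, size)` totals.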
Verified locally with `sbt 'WorkflowExecutionService / Test / testOnly
org.apache.texera.amber.engine.faulttolerance.CheckpointSpec'` (3/3
pass).
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.7 in compliance with the ASF generative
tooling guidance.
---------
Signed-off-by: Matthew B. <[email protected]>
---
.../worker/managers/StatisticsManager.scala | 17 +++++++++++------
.../worker/managers/WorkerManagersSpec.scala | 14 +++++++++-----
.../amber/engine/faulttolerance/CheckpointSpec.scala | 8 ++++++--
3 files changed, 26 insertions(+), 13 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala
index 8ae0419f0a..ab46e17654 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala
@@ -31,10 +31,11 @@ import scala.collection.mutable
class StatisticsManager {
// DataProcessor
+ // Plain maps (no withDefaultValue) so they survive Kryo round-trip.
private val inputStatistics: mutable.Map[PortIdentity, (Long, Long)] =
- mutable.Map.empty.withDefaultValue((0L, 0L))
+ mutable.Map.empty
private val outputStatistics: mutable.Map[PortIdentity, (Long, Long)] =
- mutable.Map.empty.withDefaultValue((0L, 0L))
+ mutable.Map.empty
private var dataProcessingTime: Long = 0L
private var totalExecutionTime: Long = 0L
private var workerStartTime: Long = 0L
@@ -82,8 +83,10 @@ class StatisticsManager {
*/
def increaseInputStatistics(portId: PortIdentity, size: Long): Unit = {
require(size >= 0, "Tuple size must be non-negative")
- val (count, totalSize) = inputStatistics(portId)
- inputStatistics.update(portId, (count + 1, totalSize + size))
+ inputStatistics.updateWith(portId) {
+ case Some((count, totalSize)) => Some((count + 1, totalSize + size))
+ case None => Some((1L, size))
+ }
}
/**
@@ -93,8 +96,10 @@ class StatisticsManager {
*/
def increaseOutputStatistics(portId: PortIdentity, size: Long): Unit = {
require(size >= 0, "Tuple size must be non-negative")
- val (count, totalSize) = outputStatistics(portId)
- outputStatistics.update(portId, (count + 1, totalSize + size))
+ outputStatistics.updateWith(portId) {
+ case Some((count, totalSize)) => Some((count + 1, totalSize + size))
+ case None => Some((1L, size))
+ }
}
/**
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala
index 3fbff39148..1932823f5d 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala
@@ -76,11 +76,15 @@ class WorkerManagersSpec extends AnyFlatSpec {
val sm = new StatisticsManager()
sm.increaseOutputStatistics(PortIdentity(0), 30)
sm.increaseOutputStatistics(PortIdentity(0), 70)
- assert(sm.getOutputTupleCount == 2L)
- val out = sm.getStatistics(nullExec).outputTupleMetrics
- assert(out.size == 1)
- assert(out.head.tupleMetrics.count == 2L)
- assert(out.head.tupleMetrics.size == 100L)
+ sm.increaseOutputStatistics(PortIdentity(1), 25)
+ assert(sm.getOutputTupleCount == 3L)
+ val byPort = sm
+ .getStatistics(nullExec)
+ .outputTupleMetrics
+ .map(m => m.portId -> (m.tupleMetrics.count, m.tupleMetrics.size))
+ .toMap
+ assert(byPort(PortIdentity(0)) == (2L, 100L))
+ assert(byPort(PortIdentity(1)) == (1L, 25L))
}
it should "reject negative tuple sizes" in {
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala
index fbc7e8044d..3d207fd23b 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala
@@ -63,7 +63,7 @@ class CheckpointSpec extends AnyFlatSpecLike with
BeforeAndAfterAll {
system.actorOf(Props[SingleNodeListener](), "cluster-info")
}
- "Default controller state" should "be serializable" in {
+ "Default controller state" should "round-trip through CheckpointState" in {
val cp =
new ControllerProcessor(
workflow.context,
@@ -73,9 +73,11 @@ class CheckpointSpec extends AnyFlatSpecLike with
BeforeAndAfterAll {
)
val chkpt = new CheckpointState()
chkpt.save(CP_STATE_KEY, cp)
+ val restored: ControllerProcessor = chkpt.load(CP_STATE_KEY)
+ assert(restored.actorId == cp.actorId)
}
- "Default worker state" should "be serializable" in {
+ "Default worker state" should "round-trip through CheckpointState" in {
val dp = new DataProcessor(
SELF,
msg => {},
@@ -83,6 +85,8 @@ class CheckpointSpec extends AnyFlatSpecLike with
BeforeAndAfterAll {
)
val chkpt = new CheckpointState()
chkpt.save(DP_STATE_KEY, dp)
+ val restored: DataProcessor = chkpt.load(DP_STATE_KEY)
+ assert(restored.actorId == dp.actorId)
}
"CheckpointState" should "fail loudly on an unknown key" in {