This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5150-bf2f92c81bed8520d6421bdb4b820ca61667ca2a in repository https://gitbox.apache.org/repos/asf/texera.git
commit a7d4bc622f639aa3445d2c199ae4bbded28b8d1c Author: Matthew B. <[email protected]> AuthorDate: Fri May 22 23:05:14 2026 -0700 fix: drop withDefaultValue from StatisticsManager so checkpoint state round-trips (#5150) ### What changes were proposed in this PR? `StatisticsManager` declared its input/output stats maps as `mutable.Map.empty.withDefaultValue((0L, 0L))`. The resulting `Map.WithDefault` wrapper does not survive a Kryo round-trip (its inner map deserializes as null), so `chkpt.load(CP_STATE_KEY)` on a default-state `ControllerProcessor` throws `KryoException: NullPointerException`, blocking `Controller.loadFromCheckpoint` from ever rehydrating a checkpointed controller. This PR removes the wrapper and rewrites the two write sites (`increaseInputStatistics` and `increaseOutputStatistics`) with `updateWith(portId)`, which starts a previously unseen port at `(1L, size)` and otherwise increments the existing `(count, totalSize)` pair; the counts and sizes recorded by those two methods are unchanged. ### Any related issues, documentation, or discussions? closes: #4686 ### How was this PR tested? Replaced the two existing `should be serializable` cases in `CheckpointSpec` with full save-then-load round-trips (controller + worker) that assert `restored.actorId == original.actorId`; the new tests reproduce the original NPE on `main` and pass after the fix. Verified locally with `sbt 'WorkflowExecutionService / Test / testOnly org.apache.texera.amber.engine.faulttolerance.CheckpointSpec'` (3/3 pass). ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.7 in compliance with ASF --------- Signed-off-by: Matthew B. 
<[email protected]> --- .../worker/managers/StatisticsManager.scala | 17 +++++++++++------ .../worker/managers/WorkerManagersSpec.scala | 14 +++++++++----- .../amber/engine/faulttolerance/CheckpointSpec.scala | 8 ++++++-- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala index 8ae0419f0a..ab46e17654 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala @@ -31,10 +31,11 @@ import scala.collection.mutable class StatisticsManager { // DataProcessor + // Plain maps (no withDefaultValue) so they survive Kryo round-trip. private val inputStatistics: mutable.Map[PortIdentity, (Long, Long)] = - mutable.Map.empty.withDefaultValue((0L, 0L)) + mutable.Map.empty private val outputStatistics: mutable.Map[PortIdentity, (Long, Long)] = - mutable.Map.empty.withDefaultValue((0L, 0L)) + mutable.Map.empty private var dataProcessingTime: Long = 0L private var totalExecutionTime: Long = 0L private var workerStartTime: Long = 0L @@ -82,8 +83,10 @@ class StatisticsManager { */ def increaseInputStatistics(portId: PortIdentity, size: Long): Unit = { require(size >= 0, "Tuple size must be non-negative") - val (count, totalSize) = inputStatistics(portId) - inputStatistics.update(portId, (count + 1, totalSize + size)) + inputStatistics.updateWith(portId) { + case Some((count, totalSize)) => Some((count + 1, totalSize + size)) + case None => Some((1L, size)) + } } /** @@ -93,8 +96,10 @@ class StatisticsManager { */ def increaseOutputStatistics(portId: PortIdentity, size: Long): Unit = { require(size >= 0, "Tuple size must be non-negative") - val (count, totalSize) = outputStatistics(portId) - 
outputStatistics.update(portId, (count + 1, totalSize + size)) + outputStatistics.updateWith(portId) { + case Some((count, totalSize)) => Some((count + 1, totalSize + size)) + case None => Some((1L, size)) + } } /** diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala index 3fbff39148..1932823f5d 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala @@ -76,11 +76,15 @@ class WorkerManagersSpec extends AnyFlatSpec { val sm = new StatisticsManager() sm.increaseOutputStatistics(PortIdentity(0), 30) sm.increaseOutputStatistics(PortIdentity(0), 70) - assert(sm.getOutputTupleCount == 2L) - val out = sm.getStatistics(nullExec).outputTupleMetrics - assert(out.size == 1) - assert(out.head.tupleMetrics.count == 2L) - assert(out.head.tupleMetrics.size == 100L) + sm.increaseOutputStatistics(PortIdentity(1), 25) + assert(sm.getOutputTupleCount == 3L) + val byPort = sm + .getStatistics(nullExec) + .outputTupleMetrics + .map(m => m.portId -> (m.tupleMetrics.count, m.tupleMetrics.size)) + .toMap + assert(byPort(PortIdentity(0)) == (2L, 100L)) + assert(byPort(PortIdentity(1)) == (1L, 25L)) } it should "reject negative tuple sizes" in { diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala index fbc7e8044d..3d207fd23b 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala @@ -63,7 +63,7 @@ class CheckpointSpec extends AnyFlatSpecLike with BeforeAndAfterAll { 
system.actorOf(Props[SingleNodeListener](), "cluster-info") } - "Default controller state" should "be serializable" in { + "Default controller state" should "round-trip through CheckpointState" in { val cp = new ControllerProcessor( workflow.context, @@ -73,9 +73,11 @@ class CheckpointSpec extends AnyFlatSpecLike with BeforeAndAfterAll { ) val chkpt = new CheckpointState() chkpt.save(CP_STATE_KEY, cp) + val restored: ControllerProcessor = chkpt.load(CP_STATE_KEY) + assert(restored.actorId == cp.actorId) } - "Default worker state" should "be serializable" in { + "Default worker state" should "round-trip through CheckpointState" in { val dp = new DataProcessor( SELF, msg => {}, @@ -83,6 +85,8 @@ class CheckpointSpec extends AnyFlatSpecLike with BeforeAndAfterAll { ) val chkpt = new CheckpointState() chkpt.save(DP_STATE_KEY, dp) + val restored: DataProcessor = chkpt.load(DP_STATE_KEY) + assert(restored.actorId == dp.actorId) } "CheckpointState" should "fail loudly on an unknown key" in {
