This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 2fb66747ab chore(amber): remove dead tuple-level breakpoint test and
exception class (#4504)
2fb66747ab is described below
commit 2fb66747abd97589e8ee9c68bb02f6fcd84417bf
Author: Yicong Huang <[email protected]>
AuthorDate: Sat Apr 25 18:10:33 2026 -0700
chore(amber): remove dead tuple-level breakpoint test and exception class
(#4504)
### What changes were proposed in this PR?
Two related deletions, all leftovers from the old Scala tuple-level
breakpoint mechanism:
1.
**`amber/src/test/scala/.../breakpoint/ExceptionBreakpointSpec.scala`**
— all 7 test cases commented out since #941 (2020-12-21). Subsequent
commits only renamed packages.
2.
**`amber/src/main/scala/.../common/amberexception/BreakpointException.scala`**
— sole class in its package, zero references anywhere in `amber/` or
`common/`.
The old breakpoint system's other classes (`LocalBreakpoint`,
`FaultedTuple`, `SkipTuple` (engine class, not `SkipTupleRequest`),
`ModifyTuple`, `ExceptionBreakpoint`, `ReportGlobalBreakpointTriggered`)
had already been removed from the codebase — `grep` confirmed zero
remaining references.
This is unrelated to the modern Python UDF debugger (Udon), which is
covered by frontend specs.
### Any related issues, documentation, discussions?
Closes #4503
### How was this PR tested?
Pure deletion. The test file had no active test cases, and
`BreakpointException` had no callers (verified via `grep` across
`amber/src` and `common/`). No behavior changes.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7)
---
.../amberexception/BreakpointException.scala | 24 --
.../breakpoint/ExceptionBreakpointSpec.scala | 298 ---------------------
2 files changed, 322 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/common/amberexception/BreakpointException.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/common/amberexception/BreakpointException.scala
deleted file mode 100644
index 647a5248ec..0000000000
---
a/amber/src/main/scala/org/apache/texera/amber/engine/common/amberexception/BreakpointException.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.amber.engine.common.amberexception
-
-import org.apache.texera.amber.core.WorkflowRuntimeException
-
-class BreakpointException extends WorkflowRuntimeException("breakpoint
triggered") {}
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/breakpoint/ExceptionBreakpointSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/breakpoint/ExceptionBreakpointSpec.scala
deleted file mode 100644
index 45f0d1bcb8..0000000000
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/breakpoint/ExceptionBreakpointSpec.scala
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.amber.engine.architecture.breakpoint
-
-import org.apache.pekko.actor.ActorSystem
-import org.apache.pekko.event.LoggingAdapter
-import org.apache.pekko.testkit.{ImplicitSender, TestKit}
-import org.apache.pekko.util.Timeout
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.flatspec.AnyFlatSpecLike
-
-import scala.concurrent.ExecutionContextExecutor
-import scala.concurrent.duration._
-
-class ExceptionBreakpointSpec
- extends TestKit(ActorSystem("PrincipalSpec"))
- with ImplicitSender
- with AnyFlatSpecLike
- with BeforeAndAfterAll {
-
- implicit val timeout: Timeout = Timeout(5.seconds)
- implicit val executionContext: ExecutionContextExecutor = system.dispatcher
- implicit val log: LoggingAdapter = system.log
-
- // private val logicalPlan1 =
- // """{
- // |"operators":[
- //
|{"tableName":"D:\\small_input.csv","operatorId":"Scan","operatorType":"LocalScanSource","delimiter":","},
- //
|{"attributeName":0,"keyword":"asia","operatorId":"KeywordSearch1","operatorType":"KeywordMatcher"},
- // |{"operatorId":"Sink","operatorType":"Sink"}],
- // |"links":[
- // |{"origin":"Scan","destination":"KeywordSearch1"},
- // |{"origin":"KeywordSearch1","destination":"Sink"}]
- // |}""".stripMargin
- //
- // private val logicalPlan2 =
- // """{
- // |"operators":[
- //
|{"limit":10000,"delay":0,"operatorId":"Gen","operatorType":"Generate"},
- // |{"operatorId":"Count","operatorType":"Aggregation"},
- // |{"operatorId":"Sink","operatorType":"Sink"}],
- // |"links":[
- // |{"origin":"Gen","destination":"Count"},
- // |{"origin":"Count","destination":"Sink"}]
- // |}""".stripMargin
- //
- // val workflowTag = WorkflowTag("sample")
- // var index = 0
- // val opTag: () => OperatorIdentifier = () => {
- // index += 1; OperatorIdentifier(workflowTag, index.toString)
- // }
- // val layerTag: () => LayerTag = () => { index += 1; LayerTag(opTag(),
index.toString) }
- // val workerTag: () => WorkerTag = () => { index += 1;
WorkerTag(layerTag(), index) }
- // val linkTag: () => LinkTag = () => { LinkTag(layerTag(), layerTag(), 0) }
- //
- // def resultValidation(expectedTupleCount: Int, idleTime: Duration =
2.seconds): Unit = {
- // var counter = 0
- // var receivedEnd = false
- // receiveWhile(5.minutes, idleTime) {
- // case DataMessage(seq, payload) => counter += payload.length
- // case EndSending(seq) => receivedEnd = true
- // case msg =>
- // }
- // assert(counter == expectedTupleCount)
- // assert(receivedEnd)
- // }
- //
- // override def beforeAll: Unit = {
- // system.actorOf(Props[SingleNodeListener], "cluster-info")
- // }
- //
- // override def afterAll: Unit = {
- // TestKit.shutdownActorSystem(system)
- // }
- //
- // "A workflow" should "be able to detect faulted tuples and trigger
exception breakpoint in the workflow1, then skip them" in {
- // val parent = TestProbe()
- // val controller = parent.childActorOf(CONTROLLER.props(logicalPlan1))
- // controller ! AckedControllerInitialization
- // parent.expectMsg(30.seconds, ReportState(ControllerState.Ready))
- // controller ! Start
- // parent.expectMsg(ReportState(ControllerState.Running))
- // var isCompleted = false
- // parent.receiveWhile(30.seconds, 10.seconds) {
- // case ReportGlobalBreakpointTriggered(bp, opID) =>
- // for (i <- bp) {
- // log.info(
- // (if (i._1._2.isInput) "[IN]" else "[OUT]") + i._1._2.tuple + "
ERRORS: [" + i._2
- // .mkString(",") + "]"
- // )
- // AdvancedMessageSending.blockingAskWithRetry(i._1._1,
SkipTuple(i._1._2), 5)
- // }
- // controller ! Resume
- // case ReportState(ControllerState.Paused) =>
- // case ReportState(ControllerState.Completed) =>
- // isCompleted = true
- // case _ =>
- // }
- // assert(isCompleted)
- // parent.ref ! PoisonPill
- // }
- //
- // "A workflow" should "be able to detect faulted tuples and trigger
exception breakpoint in the workflow1, then modify them" in {
- // val parent = TestProbe()
- // val controller = parent.childActorOf(CONTROLLER.props(logicalPlan1))
- // controller ! AckedControllerInitialization
- // parent.expectMsg(30.seconds, ReportState(ControllerState.Ready))
- // controller ! Start
- // parent.expectMsg(ReportState(ControllerState.Running))
- // var isCompleted = false
- // parent.receiveWhile(30.seconds, 10.seconds) {
- // case ReportGlobalBreakpointTriggered(bp, opID) =>
- // for (i <- bp) {
- // log.info(
- // (if (i._1._2.isInput) "[IN]" else "[OUT]") + i._1._2.tuple + "
ERRORS: [" + i._2
- // .mkString(",") + "]"
- // )
- // val fixed = new FaultedTuple(
- // ITuple("Asia", "Rwanda", "1", "0", "0", "0", "0", "0", "0",
"12", "12", "120", "12"),
- // i._1._2.id,
- // i._1._2.isInput
- // )
- // AdvancedMessageSending.blockingAskWithRetry(i._1._1,
ModifyTuple(fixed), 5)
- // }
- // controller ! Resume
- // case ReportState(ControllerState.Paused) =>
- // case ReportState(ControllerState.Completed) =>
- // isCompleted = true
- // case _ =>
- // }
- // assert(isCompleted)
- // parent.ref ! PoisonPill
- // }
- //
- // "A workflow" should "be able to trigger conditional breakpoint in the
workflow2, then resume them" in {
- // val parent = TestProbe()
- // val controller = parent.childActorOf(CONTROLLER.props(logicalPlan2))
- // controller ! AckedControllerInitialization
- // parent.expectMsg(30.seconds, ReportState(ControllerState.Ready))
- // controller ! PassBreakpointTo(
- // "Gen",
- // new ConditionalGlobalBreakpoint("ConditionalBreakpoint", x =>
x.getInt(0) % 1000 == 0)
- // )
- // controller ! Start
- // parent.expectMsg(ReportState(ControllerState.Running))
- // var isCompleted = false
- // parent.receiveWhile(30.seconds, 10.seconds) {
- // case ReportGlobalBreakpointTriggered(bp, opID) =>
- // for (i <- bp) {
- // log.info(
- // (if (i._1._2.isInput) "[IN]" else "[OUT]") + i._1._2.tuple + "
ERRORS: [" + i._2
- // .mkString(",") + "]"
- // )
- // AdvancedMessageSending.blockingAskWithRetry(i._1._1,
ResumeTuple(i._1._2), 5)
- // }
- // controller ! Resume
- // case ReportState(ControllerState.Paused) =>
- // case ReportState(ControllerState.Completed) =>
- // isCompleted = true
- // case _ =>
- // }
- // assert(isCompleted)
- // parent.ref ! PoisonPill
- // }
- //
- // "A workflow" should "be able to trigger conditional breakpoint in the
workflow2, then skip them" in {
- // val parent = TestProbe()
- // val controller = parent.childActorOf(CONTROLLER.props(logicalPlan2))
- // controller ! AckedControllerInitialization
- // parent.expectMsg(30.seconds, ReportState(ControllerState.Ready))
- // controller ! PassBreakpointTo(
- // "Gen",
- // new ConditionalGlobalBreakpoint("ConditionalBreakpoint", x =>
x.getInt(0) % 1000 == 0)
- // )
- // controller ! Start
- // parent.expectMsg(ReportState(ControllerState.Running))
- // var isCompleted = false
- // parent.receiveWhile(30.seconds, 10.seconds) {
- // case ReportGlobalBreakpointTriggered(bp, opID) =>
- // for (i <- bp) {
- // log.info(
- // (if (i._1._2.isInput) "[IN]" else "[OUT]") + i._1._2.tuple + "
ERRORS: [" + i._2
- // .mkString(",") + "]"
- // )
- // AdvancedMessageSending.blockingAskWithRetry(i._1._1,
SkipTuple(i._1._2), 5)
- // }
- // controller ! Resume
- // case ReportState(ControllerState.Paused) =>
- // case ReportState(ControllerState.Completed) =>
- // isCompleted = true
- // case _ =>
- // }
- // assert(isCompleted)
- // parent.ref ! PoisonPill
- // }
- //
- // "A workflow" should "be able to trigger count breakpoint in the
workflow2, then resume it" in {
- // val parent = TestProbe()
- // val controller = parent.childActorOf(CONTROLLER.props(logicalPlan2))
- // controller ! AckedControllerInitialization
- // parent.expectMsg(30.seconds, ReportState(ControllerState.Ready))
- // controller ! PassBreakpointTo("Gen", new
CountGlobalBreakpoint("CountBreakpoint", 500))
- // controller ! Start
- // parent.expectMsg(ReportState(ControllerState.Running))
- // var isCompleted = false
- // parent.receiveWhile(30.seconds, 10.seconds) {
- // case ReportGlobalBreakpointTriggered(bp, opID) =>
- // for (i <- bp) {
- // log.info(
- // (if (i._1._2.isInput) "[IN]" else "[OUT]") + i._1._2.tuple + "
ERRORS: [" + i._2
- // .mkString(",") + "]"
- // )
- // }
- // controller ! Resume
- // case ReportState(ControllerState.Paused) =>
- // case ReportState(ControllerState.Completed) =>
- // isCompleted = true
- // case _ =>
- // }
- // assert(isCompleted)
- // parent.ref ! PoisonPill
- // }
- //
- // "A workflow" should "be able to trigger conditional breakpoint in the
workflow2, then resume it" in {
- // val parent = TestProbe()
- // val controller = parent.childActorOf(CONTROLLER.props(logicalPlan2))
- // controller ! AckedControllerInitialization
- // parent.expectMsg(30.seconds, ReportState(ControllerState.Ready))
- // controller ! PassBreakpointTo(
- // "Gen",
- // new ConditionalGlobalBreakpoint("ConditionalBreakpoint", x =>
x.getInt(0) % 1000 == 0)
- // )
- // controller ! Start
- // parent.expectMsg(ReportState(ControllerState.Running))
- // var isCompleted = false
- // parent.receiveWhile(30.seconds, 10.seconds) {
- // case ReportGlobalBreakpointTriggered(bp, opID) =>
- // for (i <- bp) {
- // log.info(
- // (if (i._1._2.isInput) "[IN]" else "[OUT]") + i._1._2.tuple + "
ERRORS: [" + i._2
- // .mkString(",") + "]"
- // )
- // }
- // controller ! Resume
- // case ReportState(ControllerState.Paused) =>
- // case ReportState(ControllerState.Completed) =>
- // isCompleted = true
- // case _ =>
- // }
- // assert(isCompleted)
- // parent.ref ! PoisonPill
- // }
- //
- // "A workflow" should "be able to trigger count breakpoint in the
workflow1, then resume it" in {
- // val parent = TestProbe()
- // val controller = parent.childActorOf(CONTROLLER.props(logicalPlan1))
- // controller ! AckedControllerInitialization
- // parent.expectMsg(30.seconds, ReportState(ControllerState.Ready))
- // controller ! PassBreakpointTo("KeywordSearch1", new
CountGlobalBreakpoint("CountBreakpoint", 3))
- // controller ! Start
- // parent.expectMsg(ReportState(ControllerState.Running))
- // var isCompleted = false
- // parent.receiveWhile(3000.seconds, 1000.seconds) {
- // case ReportGlobalBreakpointTriggered(bp, opID) =>
- // for (i <- bp) {
- // log.info(
- // (if (i._1._2.isInput) "[IN]" else "[OUT]") + i._1._2.tuple + "
ERRORS: [" + i._2
- // .mkString(",") + "]"
- // )
- // }
- // controller ! Resume
- // case ReportState(ControllerState.Paused) =>
- // case ReportState(ControllerState.Completed) =>
- // isCompleted = true
- // case _ =>
- // }
- // assert(isCompleted)
- // parent.ref ! PoisonPill
- // }
-
-}