This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 2b763e442d refactor(amber): rename remaining Akka* identifiers to
Pekko* (#4949)
2b763e442d is described below
commit 2b763e442d3b9cd787bfd46f238c3b0bbccf0afc
Author: Yicong Huang <[email protected]>
AuthorDate: Mon May 11 21:32:08 2026 -0700
refactor(amber): rename remaining Akka* identifiers to Pekko* (#4949)
### What changes were proposed in this PR?
The project moved off Akka onto Apache Pekko, but several internal Scala
identifiers still carried the `Akka` prefix even though they wrap Pekko
APIs. This PR is a pure rename across `amber` and `common/config`:
- `AkkaConfig` → `PekkoConfig` (object + file)
- `AkkaActorService` → `PekkoActorService` (class + file)
- `AkkaActorRefMappingService` → `PekkoActorRefMappingService`
- `AkkaMessageTransferService` → `PekkoMessageTransferService`
- `akkaConfig`, `akkaActorService` method/parameter names →
`pekkoConfig`, `pekkoActorService`
No behavior change. No string literals, config keys, or serialization
registrations are touched — `cluster.conf` already uses `pekko.*` keys,
and the Kryo registry doesn't reference these classes by name. The
intentional `"akka"` literal in `DeployStrategiesSpec.scala`, which
contrasts Pekko and Akka address strings, is left unchanged.
### Any related issues, documentation, discussions?
Closes #4948.
### How was this PR tested?
`sbt WorkflowExecutionService/Test/compile` clean, `sbt
WorkflowExecutionService/scalafmtCheckAll` clean.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7 (Claude Code)
---
.../architecture/common/ExecutorDeployment.scala | 2 +-
...rvice.scala => PekkoActorRefMappingService.scala} | 2 +-
...kkaActorService.scala => PekkoActorService.scala} | 2 +-
...rvice.scala => PekkoMessageTransferService.scala} | 6 +++---
.../engine/architecture/common/WorkflowActor.scala | 10 +++++-----
.../controller/ControllerProcessor.scala | 20 ++++++++++----------
.../controller/ControllerTimerService.scala | 6 +++---
.../messaginglayer/WorkerTimerService.scala | 4 ++--
.../scheduling/RegionExecutionCoordinator.scala | 10 +++++-----
.../scheduling/WorkflowExecutionCoordinator.scala | 10 +++++-----
.../texera/amber/engine/common/AmberRuntime.scala | 10 +++++-----
.../engine/e2e/ReconfigurationIntegrationSpec.scala | 2 +-
.../logreplay/LogreplayPrimitivesSpec.scala | 2 +-
.../messaginglayer/CongestionControlSpec.scala | 2 +-
.../scheduling/RegionCoordinatorTestSupport.scala | 10 +++++-----
.../scheduling/RegionExecutionCoordinatorSpec.scala | 6 +++---
.../WorkflowExecutionCoordinatorSpec.scala | 2 +-
.../engine/architecture/worker/WorkerSpec.scala | 2 +-
.../engine/common/CheckpointSubsystemSpec.scala | 2 +-
.../texera/amber/engine/e2e/DataProcessingSpec.scala | 2 +-
.../apache/texera/amber/engine/e2e/PauseSpec.scala | 2 +-
.../amber/engine/e2e/ReconfigurationSpec.scala | 2 +-
.../amber/engine/faulttolerance/CheckpointSpec.scala | 2 +-
.../amber/engine/faulttolerance/LoggingSpec.scala | 2 +-
.../config/{AkkaConfig.scala => PekkoConfig.scala} | 4 ++--
25 files changed, 62 insertions(+), 62 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala
index cf41297c98..fbb5b99ce6 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala
@@ -38,7 +38,7 @@ object ExecutorDeployment {
def createWorkers(
op: PhysicalOp,
- controllerActorService: AkkaActorService,
+ controllerActorService: PekkoActorService,
operatorExecution: OperatorExecution,
operatorConfig: OperatorConfig,
stateRestoreConfig: Option[StateRestoreConfig],
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorRefMappingService.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorRefMappingService.scala
similarity index 98%
rename from
amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorRefMappingService.scala
rename to
amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorRefMappingService.scala
index 6cad314703..323435891e 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorRefMappingService.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorRefMappingService.scala
@@ -33,7 +33,7 @@ import org.apache.texera.amber.util.VirtualIdentityUtils
import scala.collection.mutable
-class AkkaActorRefMappingService(actorService: AkkaActorService) extends
AmberLogging {
+class PekkoActorRefMappingService(actorService: PekkoActorService) extends
AmberLogging {
override def actorId: ActorVirtualIdentity = actorService.id
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorService.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorService.scala
similarity index 96%
rename from
amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorService.scala
rename to
amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorService.scala
index 10a6d7a38c..f5bbd0619c 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorService.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorService.scala
@@ -28,7 +28,7 @@ import org.apache.texera.amber.engine.common.FutureBijection._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.{DurationInt, FiniteDuration}
-class AkkaActorService(val id: ActorVirtualIdentity, actorContext:
ActorContext) {
+class PekkoActorService(val id: ActorVirtualIdentity, actorContext:
ActorContext) {
implicit def ec: ExecutionContext = actorContext.dispatcher
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaMessageTransferService.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoMessageTransferService.scala
similarity index 98%
rename from
amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaMessageTransferService.scala
rename to
amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoMessageTransferService.scala
index 3401e3ff63..cba9b0b2ee 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaMessageTransferService.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoMessageTransferService.scala
@@ -30,9 +30,9 @@ import
org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessage
import scala.collection.mutable
import scala.concurrent.duration.DurationInt
-class AkkaMessageTransferService(
- actorService: AkkaActorService,
- refService: AkkaActorRefMappingService,
+class PekkoMessageTransferService(
+ actorService: PekkoActorService,
+ refService: PekkoActorRefMappingService,
handleBackpressure: Boolean => Unit
) extends AmberLogging {
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/WorkflowActor.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/WorkflowActor.scala
index 5ce64a0a3e..0f744dca26 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/WorkflowActor.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/WorkflowActor.scala
@@ -83,9 +83,9 @@ abstract class WorkflowActor(
with AmberLogging {
//
- // Akka related components:
+ // Pekko related components:
//
- val actorService: AkkaActorService = new AkkaActorService(actorId,
this.context)
+ val actorService: PekkoActorService = new PekkoActorService(actorId,
this.context)
actorService.getAvailableNodeAddressesFunc = () => {
implicit val timeout: Timeout = 5.seconds
Await
@@ -95,12 +95,12 @@ abstract class WorkflowActor(
)
.asInstanceOf[Array[Address]]
}
- val actorRefMappingService: AkkaActorRefMappingService = new
AkkaActorRefMappingService(
+ val actorRefMappingService: PekkoActorRefMappingService = new
PekkoActorRefMappingService(
actorService
)
actorRefMappingService.registerActorRef(actorId, self)
- val transferService: AkkaMessageTransferService =
- new AkkaMessageTransferService(actorService, actorRefMappingService,
handleBackpressure)
+ val transferService: PekkoMessageTransferService =
+ new PekkoMessageTransferService(actorService, actorRefMappingService,
handleBackpressure)
logger.info(s"worker replay log writing conf: $replayLogConfOpt")
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
index 3ff8e7d978..ef33174b6b 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
@@ -22,9 +22,9 @@ package org.apache.texera.amber.engine.architecture.controller
import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
import org.apache.texera.amber.core.workflow.WorkflowContext
import org.apache.texera.amber.engine.architecture.common.{
- AkkaActorRefMappingService,
- AkkaActorService,
- AkkaMessageTransferService,
+ PekkoActorRefMappingService,
+ PekkoActorService,
+ PekkoMessageTransferService,
AmberProcessor
}
import
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
@@ -57,21 +57,21 @@ class ControllerProcessor(
this.controllerTimerService = controllerTimerService
}
- @transient var transferService: AkkaMessageTransferService = _
+ @transient var transferService: PekkoMessageTransferService = _
- def setupTransferService(transferService: AkkaMessageTransferService): Unit
= {
+ def setupTransferService(transferService: PekkoMessageTransferService): Unit
= {
this.transferService = transferService
}
- @transient var actorService: AkkaActorService = _
+ @transient var actorService: PekkoActorService = _
- def setupActorService(akkaActorService: AkkaActorService): Unit = {
- this.actorService = akkaActorService
+ def setupActorService(pekkoActorService: PekkoActorService): Unit = {
+ this.actorService = pekkoActorService
}
- @transient var actorRefService: AkkaActorRefMappingService = _
+ @transient var actorRefService: PekkoActorRefMappingService = _
- def setupActorRefService(actorRefService: AkkaActorRefMappingService): Unit
= {
+ def setupActorRefService(actorRefService: PekkoActorRefMappingService): Unit
= {
this.actorRefService = actorRefService
this.workflowExecutionCoordinator.setupActorRefService(this.actorRefService)
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala
index a778a27c46..a4ad0898c9 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala
@@ -20,7 +20,7 @@
package org.apache.texera.amber.engine.architecture.controller
import org.apache.pekko.actor.Cancellable
-import org.apache.texera.amber.engine.architecture.common.AkkaActorService
+import org.apache.texera.amber.engine.architecture.common.PekkoActorService
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
QueryStatisticsRequest,
@@ -34,7 +34,7 @@ import scala.concurrent.duration.{DurationInt,
FiniteDuration, MILLISECONDS}
class ControllerTimerService(
controllerConfig: ControllerConfig,
- akkaActorService: AkkaActorService
+ pekkoActorService: PekkoActorService
) {
var statusUpdateAskHandle: Option[Cancellable] = None
var runtimeStatisticsAskHandle: Option[Cancellable] = None
@@ -46,7 +46,7 @@ class ControllerTimerService(
): Option[Cancellable] = {
if (intervalMs.nonEmpty && handleOpt.isEmpty) {
Option(
- akkaActorService.sendToSelfWithFixedDelay(
+ pekkoActorService.sendToSelfWithFixedDelay(
0.milliseconds,
FiniteDuration.apply(intervalMs.get, MILLISECONDS),
ControlInvocation(
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerTimerService.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerTimerService.scala
index 3bb87febd9..006c9614fd 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerTimerService.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerTimerService.scala
@@ -21,7 +21,7 @@ package
org.apache.texera.amber.engine.architecture.messaginglayer
import org.apache.pekko.actor.Cancellable
import org.apache.texera.amber.config.ApplicationConfig
-import org.apache.texera.amber.engine.architecture.common.AkkaActorService
+import org.apache.texera.amber.engine.architecture.common.PekkoActorService
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
EmptyRequest
@@ -33,7 +33,7 @@ import
org.apache.texera.amber.engine.common.virtualidentity.util.SELF
import scala.concurrent.duration.{DurationInt, FiniteDuration, MILLISECONDS}
-class WorkerTimerService(actorService: AkkaActorService) {
+class WorkerTimerService(actorService: PekkoActorService) {
private val enabledAdaptiveBatching =
ApplicationConfig.enableAdaptiveNetworkBuffering
private val adaptiveBatchInterval =
ApplicationConfig.adaptiveBufferingTimeoutMs
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 254c16bf34..2971e4c4f4 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -26,8 +26,8 @@ import
org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
import org.apache.texera.amber.core.workflow.{GlobalPortIdentity,
PhysicalLink, PhysicalOp}
import org.apache.texera.amber.engine.architecture.common.{
- AkkaActorRefMappingService,
- AkkaActorService,
+ PekkoActorRefMappingService,
+ PekkoActorService,
ExecutorDeployment
}
import org.apache.texera.amber.engine.architecture.controller.execution.{
@@ -95,8 +95,8 @@ class RegionExecutionCoordinator(
workflowExecution: WorkflowExecution,
asyncRPCClient: AsyncRPCClient,
controllerConfig: ControllerConfig,
- actorService: AkkaActorService,
- actorRefService: AkkaActorRefMappingService
+ actorService: PekkoActorService,
+ actorRefService: PekkoActorRefMappingService
) extends AmberLogging {
initRegionExecution()
@@ -374,7 +374,7 @@ class RegionExecutionCoordinator(
}
private def buildOperator(
- actorService: AkkaActorService,
+ actorService: PekkoActorService,
physicalOp: PhysicalOp,
operatorConfig: OperatorConfig,
operatorExecution: OperatorExecution
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
index df504bf92d..deb753beb3 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
@@ -23,8 +23,8 @@ import com.twitter.util.Future
import com.typesafe.scalalogging.LazyLogging
import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalLink}
import org.apache.texera.amber.engine.architecture.common.{
- AkkaActorRefMappingService,
- AkkaActorService
+ PekkoActorRefMappingService,
+ PekkoActorService
}
import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
import
org.apache.texera.amber.engine.architecture.controller.ExecutionStateUpdate
@@ -49,9 +49,9 @@ class WorkflowExecutionCoordinator(
mutable.HashMap()
private val completionNotified: AtomicBoolean = new AtomicBoolean(false)
- @transient var actorRefService: AkkaActorRefMappingService = _
+ @transient var actorRefService: PekkoActorRefMappingService = _
- def setupActorRefService(actorRefService: AkkaActorRefMappingService): Unit
= {
+ def setupActorRefService(actorRefService: PekkoActorRefMappingService): Unit
= {
this.actorRefService = actorRefService
}
@@ -62,7 +62,7 @@ class WorkflowExecutionCoordinator(
*
* After the syncs, if there are no running region(s), it will start new
regions (if available).
*/
- def coordinateRegionExecutors(actorService: AkkaActorService): Future[Unit]
= {
+ def coordinateRegionExecutors(actorService: PekkoActorService): Future[Unit]
= {
val unfinishedRegionCoordinators =
regionExecutionCoordinators.values.filter(!_.isCompleted).toSeq
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/common/AmberRuntime.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/common/AmberRuntime.scala
index 7078f766a6..03234a277e 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/common/AmberRuntime.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/common/AmberRuntime.scala
@@ -23,7 +23,7 @@ import org.apache.pekko.actor.{ActorSystem, Address,
Cancellable, DeadLetter, Pr
import org.apache.pekko.serialization.{Serialization, SerializationExtension}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.texera.amber.clustering.ClusterListener
-import org.apache.texera.amber.config.AkkaConfig
+import org.apache.texera.amber.config.PekkoConfig
import
org.apache.texera.amber.engine.architecture.messaginglayer.DeadLetterMonitorActor
import java.io.{BufferedReader, InputStreamReader}
@@ -39,7 +39,7 @@ object AmberRuntime {
def serde: Serialization = {
if (_serde == null) {
if (_actorSystem == null) {
- _serde = SerializationExtension(ActorSystem("Amber", akkaConfig))
+ _serde = SerializationExtension(ActorSystem("Amber", pekkoConfig))
} else {
_serde = SerializationExtension(_actorSystem)
}
@@ -83,13 +83,13 @@ object AmberRuntime {
pekko.remote.artery.canonical.hostname = $localIpAddress
pekko.cluster.seed-nodes = [ "pekko://Amber@$localIpAddress:2552" ]
""")
- .withFallback(akkaConfig)
+ .withFallback(pekkoConfig)
.resolve()
AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress)
createAmberSystem(masterConfig)
}
- def akkaConfig: Config = AkkaConfig.akkaConfig
+ def pekkoConfig: Config = PekkoConfig.pekkoConfig
private def createMasterAddress(addr: String): Address = Address("pekko",
"Amber", addr, 2552)
@@ -105,7 +105,7 @@ object AmberRuntime {
pekko.remote.artery.canonical.port = 0
pekko.cluster.seed-nodes = [ "pekko://Amber@$addr:2552" ]
""")
- .withFallback(akkaConfig)
+ .withFallback(pekkoConfig)
.resolve()
AmberConfig.masterNodeAddr = createMasterAddress(addr)
createAmberSystem(workerConfig)
diff --git
a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
index acadb9154c..6f0936da28 100644
---
a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
+++
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
@@ -62,7 +62,7 @@ import scala.concurrent.duration._
*/
@IntegrationTest
class ReconfigurationIntegrationSpec
- extends TestKit(ActorSystem("ReconfigurationIntegrationSpec",
AmberRuntime.akkaConfig))
+ extends TestKit(ActorSystem("ReconfigurationIntegrationSpec",
AmberRuntime.pekkoConfig))
with ImplicitSender
with AnyFlatSpecLike
with BeforeAndAfterAll
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/LogreplayPrimitivesSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/LogreplayPrimitivesSpec.scala
index ddb6440c25..6ec33a6988 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/LogreplayPrimitivesSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/LogreplayPrimitivesSpec.scala
@@ -62,7 +62,7 @@ class LogreplayPrimitivesSpec extends AnyFlatSpec with
BeforeAndAfterAll {
// so no Pekko threads outlive the suite. (Same pattern as
// CheckpointSubsystemSpec.)
private val testSystem: ActorSystem =
- ActorSystem("LogreplayPrimitivesSpec-test", AmberRuntime.akkaConfig)
+ ActorSystem("LogreplayPrimitivesSpec-test", AmberRuntime.pekkoConfig)
private val testSerde: Serialization = SerializationExtension(testSystem)
private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
index 322f96924d..30fa8ec8b4 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
@@ -214,7 +214,7 @@ class CongestionControlSpec extends AnyFlatSpec {
}
it should "return only the messages whose sentTime is older than
resendTimeLimit" in {
- // Cover the AkkaMessageTransferService.checkResend() retransmission path:
+ // Cover the PekkoMessageTransferService.checkResend() retransmission path:
// the in-transit message that has been sitting past the 60s
// resendTimeLimit must surface; the freshly-sent one must not.
val cc = new CongestionControl()
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala
index facba10241..5673c02691 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala
@@ -35,8 +35,8 @@ import org.apache.texera.amber.core.workflow.WorkflowContext.{
DEFAULT_WORKFLOW_ID
}
import org.apache.texera.amber.engine.architecture.common.{
- AkkaActorRefMappingService,
- AkkaActorService,
+ PekkoActorRefMappingService,
+ PekkoActorService,
WorkflowActor
}
import
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
@@ -78,8 +78,8 @@ object RegionCoordinatorTestSupport {
)
case class ControllerHarnessFixture(
- actorService: AkkaActorService,
- actorRefService: AkkaActorRefMappingService
+ actorService: PekkoActorService,
+ actorRefService: PekkoActorRefMappingService
)
/**
@@ -231,7 +231,7 @@ trait RegionCoordinatorTestSupport { self: TestKit =>
}
protected def registerLiveWorker(
- actorRefService: AkkaActorRefMappingService,
+ actorRefService: PekkoActorRefMappingService,
workerId: ActorVirtualIdentity
): ActorRef = {
val workerRef = system.actorOf(Props(new IdleActor),
s"worker-${System.nanoTime()}")
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
index 8fab3b67fc..6efbe5e4ca 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
@@ -24,7 +24,7 @@ import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.testkit.TestKit
import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity,
ChannelIdentity}
import org.apache.texera.amber.core.workflow.PhysicalOp
-import
org.apache.texera.amber.engine.architecture.common.AkkaActorRefMappingService
+import
org.apache.texera.amber.engine.architecture.common.PekkoActorRefMappingService
import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
import
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
import org.apache.texera.amber.engine.architecture.rpc.controlreturns._
@@ -51,7 +51,7 @@ import java.util.concurrent.atomic
* workers terminated, and allow the next region to start.
*/
class RegionExecutionCoordinatorSpec
- extends TestKit(ActorSystem("RegionExecutionCoordinatorSpec",
AmberRuntime.akkaConfig))
+ extends TestKit(ActorSystem("RegionExecutionCoordinatorSpec",
AmberRuntime.pekkoConfig))
with AnyFlatSpecLike
with BeforeAndAfterAll
with RegionCoordinatorTestSupport {
@@ -117,7 +117,7 @@ class RegionExecutionCoordinatorSpec
region: Region,
physicalOp: PhysicalOp,
workerId: ActorVirtualIdentity,
- actorRefService: AkkaActorRefMappingService
+ actorRefService: PekkoActorRefMappingService
)
private def createSingleRegionFixture(
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
index f5fc17f8e0..1a1e4afa31 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
@@ -38,7 +38,7 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpecLike
class WorkflowExecutionCoordinatorSpec
- extends TestKit(ActorSystem("WorkflowExecutionCoordinatorSpec",
AmberRuntime.akkaConfig))
+ extends TestKit(ActorSystem("WorkflowExecutionCoordinatorSpec",
AmberRuntime.pekkoConfig))
with AnyFlatSpecLike
with BeforeAndAfterAll
with RegionCoordinatorTestSupport {
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala
index 890fe97b85..f6f33ebbb0 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala
@@ -64,7 +64,7 @@ class DummyOperatorExecutor extends OperatorExecutor {
}
class WorkerSpec
- extends TestKit(ActorSystem("WorkerSpec", AmberRuntime.akkaConfig))
+ extends TestKit(ActorSystem("WorkerSpec", AmberRuntime.pekkoConfig))
with ImplicitSender
with AnyFlatSpecLike
with BeforeAndAfterAll
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/common/CheckpointSubsystemSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/common/CheckpointSubsystemSpec.scala
index 45b1727afc..6b46030b6b 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/common/CheckpointSubsystemSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/common/CheckpointSubsystemSpec.scala
@@ -35,7 +35,7 @@ class CheckpointSubsystemSpec extends AnyFlatSpec with
BeforeAndAfterAll {
// and AmberRuntime's reference are torn down in afterAll, so no Pekko
// threads outlive the test (matching ControllerSpec/WorkerSpec hygiene).
private val testSystem: ActorSystem =
- ActorSystem("CheckpointSubsystemSpec-test", AmberRuntime.akkaConfig)
+ ActorSystem("CheckpointSubsystemSpec-test", AmberRuntime.pekkoConfig)
private val testSerde: Serialization = SerializationExtension(testSystem)
private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
index f93909f53f..d070fefb27 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
@@ -55,7 +55,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach,
Outcome, Retries}
import scala.concurrent.duration.DurationInt
class DataProcessingSpec
- extends TestKit(ActorSystem("DataProcessingSpec", AmberRuntime.akkaConfig))
+ extends TestKit(ActorSystem("DataProcessingSpec",
AmberRuntime.pekkoConfig))
with ImplicitSender
with AnyFlatSpecLike
with BeforeAndAfterAll
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
index b459533c57..2cc268608f 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
@@ -51,7 +51,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach,
Outcome, Retries}
import scala.concurrent.duration._
class PauseSpec
- extends TestKit(ActorSystem("PauseSpec", AmberRuntime.akkaConfig))
+ extends TestKit(ActorSystem("PauseSpec", AmberRuntime.pekkoConfig))
with ImplicitSender
with AnyFlatSpecLike
with BeforeAndAfterAll
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index 8cabf8684a..2cd3559736 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -42,7 +42,7 @@ import org.scalatest.flatspec.AnyFlatSpecLike
import scala.concurrent.duration._
class ReconfigurationSpec
- extends TestKit(ActorSystem("ReconfigurationSpec",
AmberRuntime.akkaConfig))
+ extends TestKit(ActorSystem("ReconfigurationSpec",
AmberRuntime.pekkoConfig))
with ImplicitSender
with AnyFlatSpecLike
with BeforeAndAfterAll
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala
index 73d4601a92..fbc7e8044d 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala
@@ -59,7 +59,7 @@ class CheckpointSpec extends AnyFlatSpecLike with
BeforeAndAfterAll {
)
override def beforeAll(): Unit = {
- system = ActorSystem("CheckpointSpec", AmberRuntime.akkaConfig)
+ system = ActorSystem("CheckpointSpec", AmberRuntime.pekkoConfig)
system.actorOf(Props[SingleNodeListener](), "cluster-info")
}
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/LoggingSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/LoggingSpec.scala
index 87e3ca148e..9a38811915 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/LoggingSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/LoggingSpec.scala
@@ -61,7 +61,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import java.net.URI
class LoggingSpec
- extends TestKit(ActorSystem("LoggingSpec", AmberRuntime.akkaConfig))
+ extends TestKit(ActorSystem("LoggingSpec", AmberRuntime.pekkoConfig))
with ImplicitSender
with AnyFlatSpecLike
with BeforeAndAfterAll
diff --git
a/common/config/src/main/scala/org/apache/texera/amber/config/AkkaConfig.scala
b/common/config/src/main/scala/org/apache/texera/amber/config/PekkoConfig.scala
similarity index 90%
rename from
common/config/src/main/scala/org/apache/texera/amber/config/AkkaConfig.scala
rename to
common/config/src/main/scala/org/apache/texera/amber/config/PekkoConfig.scala
index 7f2097063a..33ba24d247 100644
---
a/common/config/src/main/scala/org/apache/texera/amber/config/AkkaConfig.scala
+++
b/common/config/src/main/scala/org/apache/texera/amber/config/PekkoConfig.scala
@@ -20,11 +20,11 @@ package org.apache.texera.amber.config
import com.typesafe.config.{Config, ConfigFactory}
-object AkkaConfig {
+object PekkoConfig {
// Load configuration
private val conf: Config =
ConfigFactory.parseResources("cluster.conf").resolve()
// Return the complete Pekko configuration with fallback to default
application config
- def akkaConfig: Config =
conf.withFallback(ConfigFactory.defaultApplication()).resolve()
+ def pekkoConfig: Config =
conf.withFallback(ConfigFactory.defaultApplication()).resolve()
}