This is an automated email from the ASF dual-hosted git repository. aglinxinyuan pushed a commit to branch xinyuan-scheduler-jump in repository https://gitbox.apache.org/repos/asf/texera.git
commit 555e55df5483a103e4282b503e33ee3b0dcc1bb2 Author: Xinyuan Lin <[email protected]> AuthorDate: Tue Apr 21 01:20:16 2026 -0700 Add scheduler jump-to-operator support --- .../engine/architecture/rpc/controlcommands.proto | 7 +++- .../architecture/rpc/controllerservice.proto | 3 +- .../amber/engine/architecture/rpc/__init__.py | 43 ++++++++++++++++++++++ .../ControllerAsyncRPCHandlerInitializer.scala | 1 + .../controller/ControllerProcessor.scala | 2 +- .../controller/WorkflowScheduler.scala | 3 ++ .../promisehandlers/JumpToOperatorHandler.scala} | 27 +++++++++----- .../engine/architecture/scheduling/Schedule.scala | 11 ++++++ .../scheduling/WorkflowExecutionCoordinator.scala | 11 ++++-- 9 files changed, 92 insertions(+), 16 deletions(-) diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto index d714f64a15..d8d8b512d0 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto @@ -46,6 +46,7 @@ message ControlRequest { PortCompletedRequest portCompletedRequest = 9; WorkerStateUpdatedRequest workerStateUpdatedRequest = 10; LinkWorkersRequest linkWorkersRequest = 11; + JumpToOperatorRequest jumpToOperatorRequest = 12; // request for worker AddInputChannelRequest addInputChannelRequest = 50; @@ -278,4 +279,8 @@ enum StatisticsUpdateTarget { message QueryStatisticsRequest{ repeated core.ActorVirtualIdentity filterByWorkers = 1; StatisticsUpdateTarget updateTarget = 2; -} \ No newline at end of file +} + +message JumpToOperatorRequest{ + core.OperatorIdentity targetOperatorId = 1 [(scalapb.field).no_box = true]; +} diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto index 70d189a341..25c90e3e93 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto @@ -42,7 +42,8 @@ service ControllerService { rpc PauseWorkflow(EmptyRequest) returns (EmptyReturn); rpc WorkerStateUpdated(WorkerStateUpdatedRequest) returns (EmptyReturn); rpc WorkerExecutionCompleted(EmptyRequest) returns (EmptyReturn); + rpc JumpToOperator(JumpToOperatorRequest) returns (EmptyReturn); rpc LinkWorkers(LinkWorkersRequest) returns (EmptyReturn); rpc ControllerInitiateQueryStatistics(QueryStatisticsRequest) returns (EmptyReturn); rpc RetryWorkflow(RetryWorkflowRequest) returns (EmptyReturn); -} \ No newline at end of file +} diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py index b7522a696a..f946e7123e 100644 --- a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py +++ b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py @@ -102,6 +102,9 @@ class ControlRequest(betterproto.Message): link_workers_request: "LinkWorkersRequest" = betterproto.message_field( 11, group="sealed_value" ) + jump_to_operator_request: "JumpToOperatorRequest" = betterproto.message_field( + 12, group="sealed_value" + ) add_input_channel_request: "AddInputChannelRequest" = betterproto.message_field( 50, group="sealed_value" ) @@ -394,6 +397,11 @@ class QueryStatisticsRequest(betterproto.Message): update_target: "StatisticsUpdateTarget" = betterproto.enum_field(2) +@dataclass(eq=False, repr=False) +class JumpToOperatorRequest(betterproto.Message): + target_operator_id: "___core__.OperatorIdentity" = betterproto.message_field(1) + + 
@dataclass(eq=False, repr=False) class ControlReturn(betterproto.Message): """The generic return message""" @@ -1243,6 +1251,23 @@ class ControllerServiceStub(betterproto.ServiceStub): metadata=metadata, ) + async def jump_to_operator( + self, + jump_to_operator_request: "JumpToOperatorRequest", + *, + timeout: Optional[float] = None, + deadline: Optional["Deadline"] = None, + metadata: Optional["MetadataLike"] = None + ) -> "EmptyReturn": + return await self._unary_unary( + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/JumpToOperator", + jump_to_operator_request, + EmptyReturn, + timeout=timeout, + deadline=deadline, + metadata=metadata, + ) + async def link_workers( self, link_workers_request: "LinkWorkersRequest", @@ -1880,6 +1905,11 @@ class ControllerServiceBase(ServiceBase): ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + async def jump_to_operator( + self, jump_to_operator_request: "JumpToOperatorRequest" + ) -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + async def link_workers( self, link_workers_request: "LinkWorkersRequest" ) -> "EmptyReturn": @@ -1984,6 +2014,13 @@ class ControllerServiceBase(ServiceBase): response = await self.worker_execution_completed(request) await stream.send_message(response) + async def __rpc_jump_to_operator( + self, stream: "grpclib.server.Stream[JumpToOperatorRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.jump_to_operator(request) + await stream.send_message(response) + async def __rpc_link_workers( self, stream: "grpclib.server.Stream[LinkWorkersRequest, EmptyReturn]" ) -> None: @@ -2079,6 +2116,12 @@ class ControllerServiceBase(ServiceBase): EmptyRequest, EmptyReturn, ), + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/JumpToOperator": grpclib.const.Handler( + self.__rpc_jump_to_operator, + grpclib.const.Cardinality.UNARY_UNARY, + JumpToOperatorRequest, + 
EmptyReturn, + ), "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers": grpclib.const.Handler( self.__rpc_link_workers, grpclib.const.Cardinality.UNARY_UNARY, diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala index 4d9a36bab4..03a4d47968 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala @@ -34,6 +34,7 @@ class ControllerAsyncRPCHandlerInitializer( with AmberLogging with LinkWorkersHandler with WorkerExecutionCompletedHandler + with JumpToOperatorHandler with WorkerStateUpdatedHandler with PauseHandler with QueryWorkerStatisticsHandler diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala index 7a8e94cf3a..3461619cb3 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala @@ -44,7 +44,7 @@ class ControllerProcessor( val workflowScheduler: WorkflowScheduler = new WorkflowScheduler(workflowContext, actorId) val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new WorkflowExecutionCoordinator( - () => this.workflowScheduler.getNextRegions, + workflowScheduler, workflowExecution, controllerConfig, asyncRPCClient diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala index 9dcf3ad4bf..c8a107e045 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala @@ -20,6 +20,7 @@ package org.apache.texera.amber.engine.architecture.controller import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity +import org.apache.texera.amber.core.virtualidentity.OperatorIdentity import org.apache.texera.amber.core.workflow.{PhysicalPlan, WorkflowContext} import org.apache.texera.amber.engine.architecture.scheduling.{ CostBasedScheduleGenerator, @@ -54,4 +55,6 @@ class WorkflowScheduler( def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.next() + def jumpToOperator(opId: OperatorIdentity): Unit = schedule.jumpToOperator(opId) + } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorHandler.scala similarity index 50% copy from amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala copy to amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorHandler.scala index 6f34c9ed1e..aad72f08e9 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorHandler.scala @@ -17,18 +17,25 @@ * under the License. 
*/ -package org.apache.texera.amber.engine.architecture.scheduling +package org.apache.texera.amber.engine.architecture.controller.promisehandlers -case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] { - private var currentLevel = levelSets.keys.minOption.getOrElse(0) - - def getRegions: List[Region] = levelSets.values.flatten.toList +import com.twitter.util.Future +import org.apache.texera.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ + AsyncRPCContext, + JumpToOperatorRequest +} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn - override def hasNext: Boolean = levelSets.isDefinedAt(currentLevel) +/** Requests the scheduler to continue from the region containing the target operator. */ +trait JumpToOperatorHandler { + this: ControllerAsyncRPCHandlerInitializer => - override def next(): Set[Region] = { - val regions = levelSets(currentLevel) - currentLevel += 1 - regions + override def jumpToOperator( + msg: JumpToOperatorRequest, + ctx: AsyncRPCContext + ): Future[EmptyReturn] = { + cp.workflowExecutionCoordinator.jumpToOperator(msg.targetOperatorId) + EmptyReturn() } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala index 6f34c9ed1e..fa8121cad5 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala @@ -19,6 +19,8 @@ package org.apache.texera.amber.engine.architecture.scheduling +import org.apache.texera.amber.core.virtualidentity.OperatorIdentity + case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] { private var currentLevel = 
levelSets.keys.minOption.getOrElse(0) @@ -31,4 +33,13 @@ case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterat currentLevel += 1 regions } + + def jumpToOperator(opId: OperatorIdentity): Unit = + levelSets + .collect { + case (level, regions) + if regions.exists(_.getOperators.exists(_.id.logicalOpId == opId)) => + level + } + .minOption.foreach(currentLevel = _) // earliest matching level; Map iteration order is unspecified } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala index 05585f88d8..2b8e3ce145 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala @@ -21,19 +21,20 @@ package org.apache.texera.amber.engine.architecture.scheduling import com.twitter.util.Future import com.typesafe.scalalogging.LazyLogging +import org.apache.texera.amber.core.virtualidentity.OperatorIdentity import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalLink} import org.apache.texera.amber.engine.architecture.common.{ AkkaActorRefMappingService, AkkaActorService } -import org.apache.texera.amber.engine.architecture.controller.ControllerConfig +import org.apache.texera.amber.engine.architecture.controller.{ControllerConfig, WorkflowScheduler} import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient import scala.collection.mutable class WorkflowExecutionCoordinator( - getNextRegions: () => Set[Region], + workflowScheduler: WorkflowScheduler, workflowExecution: WorkflowExecution, controllerConfig: ControllerConfig, asyncRPCClient: AsyncRPCClient @@ -81,7 +82,7 @@ class WorkflowExecutionCoordinator( // All existing regions are completed.
Start the next region (if any). Future .collect({ - val nextRegions = getNextRegions() + val nextRegions = workflowScheduler.getNextRegions executedRegions.append(nextRegions) nextRegions .map(region => { @@ -116,4 +117,8 @@ class WorkflowExecutionCoordinator( .toSet } + def jumpToOperator(opId: OperatorIdentity): Unit = { + workflowScheduler.jumpToOperator(opId) + } + }
