This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new c58c881d Check executor id consistency when receive stop executor
request (#335)
c58c881d is described below
commit c58c881de14eef6fb8475a2135768e96a73ac2a0
Author: yahoNanJing <[email protected]>
AuthorDate: Mon Oct 10 21:13:33 2022 +0800
Check executor id consistency when receive stop executor request (#335)
Co-authored-by: yangzhong <[email protected]>
---
ballista/rust/core/proto/ballista.proto | 5 +++--
ballista/rust/executor/src/executor_server.rs | 7 +++++++
ballista/rust/scheduler/src/scheduler_server/mod.rs | 1 +
3 files changed, 11 insertions(+), 2 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto
b/ballista/rust/core/proto/ballista.proto
index 53f93877..9cb8cca4 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -845,10 +845,11 @@ message HeartBeatResult {
}
message StopExecutorParams {
+ string executor_id = 1;
// stop reason
- string reason = 1;
+ string reason = 2;
// force to stop the executor immediately
- bool force = 2;
+ bool force = 3;
}
message StopExecutorResult {
diff --git a/ballista/rust/executor/src/executor_server.rs
b/ballista/rust/executor/src/executor_server.rs
index bf036d6f..e850c895 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -665,6 +665,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorGrpc
request: Request<StopExecutorParams>,
) -> Result<Response<StopExecutorResult>, Status> {
let stop_request = request.into_inner();
+ if stop_request.executor_id != self.executor.metadata.id {
+ warn!(
+ "The executor id {} in request is different from {}. The stop
request will be ignored",
+ stop_request.executor_id, self.executor.metadata.id
+ );
+ return Ok(Response::new(StopExecutorResult {}));
+ }
let stop_reason = stop_request.reason;
let force = stop_request.force;
info!(
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs
b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index 6cfbf070..6d517bf9 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -229,6 +229,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
tokio::task::spawn(async move {
match client
.stop_executor(StopExecutorParams {
+ executor_id,
reason: stop_reason,
force: true,
})