This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new de9f3944a [coordinator] Log the reason for failures in bucket or replica state changes (#2145)
de9f3944a is described below
commit de9f3944acf58ffa3f3aa145bab14fdbd90df6cf
Author: Yang Wang <[email protected]>
AuthorDate: Sat Dec 13 22:46:17 2025 +0800
[coordinator] Log the reason for failures in bucket or replica state changes (#2145)
---
.../statemachine/ReplicaStateMachine.java | 24 ++++++----
.../statemachine/TableBucketStateMachine.java | 55 +++++++++++++++-------
2 files changed, 54 insertions(+), 25 deletions(-)
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
index 235f4151d..7494ab203 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
@@ -221,8 +221,8 @@ public class ReplicaStateMachine {
try {
partitionName =
getPartitionName(tableBucket);
} catch (PartitionNotExistException e) {
- LOG.error(e.getMessage());
- logFailedSateChange(replica, currentState,
targetState);
+ logFailedSateChange(
+ replica, currentState,
targetState, e.getMessage());
return;
}
@@ -282,11 +282,11 @@ public class ReplicaStateMachine {
try {
partitionName = getPartitionName(tableBucket);
} catch (PartitionNotExistException e) {
- LOG.error(e.getMessage());
logFailedSateChange(
tableBucketReplica,
coordinatorContext.getReplicaState(tableBucketReplica),
- targetState);
+ targetState,
+ e.getMessage());
continue;
}
// send leader request to the replica server
@@ -343,7 +343,11 @@ public class ReplicaStateMachine {
return true;
} else {
logInvalidTransition(replica, curState,
targetState);
- logFailedSateChange(replica, curState,
targetState);
+ logFailedSateChange(
+ replica,
+ curState,
+ targetState,
+ "Invalid Replica State Transition.");
return false;
}
})
@@ -376,12 +380,16 @@ public class ReplicaStateMachine {
}
private void logFailedSateChange(
- TableBucketReplica replica, ReplicaState currState, ReplicaState
targetState) {
+ TableBucketReplica replica,
+ ReplicaState currState,
+ ReplicaState targetState,
+ String reason) {
LOG.error(
- "Fail to change state for table bucket replica {} from {} to
{}.",
+ "Fail to change state for table bucket replica {} from {} to
{}, reason: {}.",
stringifyReplica(replica),
currState,
- targetState);
+ targetState,
+ reason);
}
private void logSuccessfulStateChange(
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java
index 33e278f44..224ba5db8 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java
@@ -209,10 +209,13 @@ public class TableBucketStateMachine {
partitionName =
coordinatorContext.getPartitionName(tableBucket.getPartitionId());
if (partitionName == null) {
- LOG.error(
- "Can't find partition name for partition: {}.",
- tableBucket.getBucket());
- logFailedStateChange(tableBucket, currentState,
targetState);
+ logFailedStateChange(
+ tableBucket,
+ currentState,
+ targetState,
+ String.format(
+ "Can't find partition name for
partition: %s.",
+ tableBucket.getBucket()));
return;
}
}
@@ -222,7 +225,8 @@ public class TableBucketStateMachine {
Optional<ElectionResult> optionalElectionResult =
initLeaderForTableBuckets(tableBucket,
assignedServers);
if (!optionalElectionResult.isPresent()) {
- logFailedStateChange(tableBucket, currentState,
targetState);
+ logFailedStateChange(
+ tableBucket, currentState, targetState, "Elect
Result is empty.");
} else {
// transmit state
doStateChange(tableBucket, targetState);
@@ -244,7 +248,8 @@ public class TableBucketStateMachine {
electNewLeaderForTableBuckets(
tableBucket,
replicaLeaderElectionStrategy);
if (!optionalElectionResult.isPresent()) {
- logFailedStateChange(tableBucket, currentState,
targetState);
+ logFailedStateChange(
+ tableBucket, currentState, targetState, "Elect
result is empty.");
} else {
// transmit state
doStateChange(tableBucket, targetState);
@@ -337,10 +342,13 @@ public class TableBucketStateMachine {
if (tableBucket.getPartitionId() != null) {
partitionName =
coordinatorContext.getPartitionName(tableBucket.getPartitionId());
if (partitionName == null) {
- LOG.error(
- "Can't find partition name for partition: {}.",
- tableBucket.getBucket());
- logFailedStateChange(tableBucket, currentState,
BucketState.OnlineBucket);
+ logFailedStateChange(
+ tableBucket,
+ currentState,
+ BucketState.OnlineBucket,
+ String.format(
+ "Can't find partition name for partition:
%s.",
+ tableBucket.getBucket()));
continue;
}
}
@@ -350,7 +358,11 @@ public class TableBucketStateMachine {
Optional<ElectionResult> optionalElectionResult =
doInitElectionForBucket(tableBucket, assignedServers);
if (!optionalElectionResult.isPresent()) {
- logFailedStateChange(tableBucket, currentState,
BucketState.OnlineBucket);
+ logFailedStateChange(
+ tableBucket,
+ currentState,
+ BucketState.OnlineBucket,
+ "Elect result is empty.");
continue;
}
ElectionResult electionResult = optionalElectionResult.get();
@@ -506,7 +518,8 @@ public class TableBucketStateMachine {
return true;
} else {
logInvalidTransition(tableBucket, curState, targetState);
- logFailedStateChange(tableBucket, curState, targetState);
+ logFailedStateChange(
+ tableBucket, curState, targetState, "Invalid TableBucket
State Transition.");
return false;
}
}
@@ -537,12 +550,16 @@ public class TableBucketStateMachine {
}
private void logFailedStateChange(
- TableBucket tableBucket, BucketState currState, BucketState
targetState) {
+ TableBucket tableBucket,
+ BucketState currState,
+ BucketState targetState,
+ String reason) {
LOG.error(
- "Fail to change state for table bucket {} from {} to {}.",
+ "Fail to change state for table bucket {} from {} to {},
reason: {}",
stringifyBucket(tableBucket),
currState,
- targetState);
+ targetState,
+ reason);
}
private void logSuccessfulStateChange(
@@ -610,8 +627,12 @@ public class TableBucketStateMachine {
if (!resultOpt.isPresent()) {
LOG.error(
- "The leader election for table bucket {} is empty.",
- stringifyBucket(tableBucket));
+ "The leader election for table bucket {} is empty,
assignment: {}, live replicas: {}, leaderAndIsr: {}, strategy: {}",
+ stringifyBucket(tableBucket),
+ assignment,
+ liveReplicas,
+ leaderAndIsr,
+ electionStrategy);
return Optional.empty();
}
return resultOpt;