wuchong commented on code in PR #1081:
URL: https://github.com/apache/fluss/pull/1081#discussion_r2296132559


##########
fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/statemachine/ReplicaStateMachine.java:
##########
@@ -438,25 +439,21 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
             List<Integer> newIsr =
                     leaderAndIsr.isr().size() == 1
                             // don't remove the replica id from isr when isr size is 1,
-                            // if isr is empty, we can't elect leader any more
+                            // if isr is empty, we can't elect leader anymore
                             ? leaderAndIsr.isr()
                             : leaderAndIsr.isr().stream()
                                     .filter(id -> id != replicaId)
                                     .collect(Collectors.toList());
             LeaderAndIsr adjustLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(newLeader, newIsr);
-            try {
-                zooKeeperClient.updateLeaderAndIsr(tableBucket, adjustLeaderAndIsr);
-            } catch (Exception e) {
-                LOG.error(
-                        "Fail to update bucket LeaderAndIsr for table bucket {} of table {}.",
-                        tableBucket,
-                        coordinatorContext.getTablePathById(tableBucket.getTableId()),
-                        e);
-                continue;
-            }
-            // update leader and isr
-            coordinatorContext.putBucketLeaderAndIsr(tableBucket, adjustLeaderAndIsr);
             adjustedLeaderAndIsr.put(tableBucketReplica, adjustLeaderAndIsr);
+            toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);
+        }
+        try {
+            zooKeeperClient.batchUpdateLeaderAndIsr(toUpdateLeaderAndIsrList);
+            toUpdateLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr);

Review Comment:
   Moving the `coordinatorContext` updates to the end changes the logic of this method, because each iteration still reads the current `leaderAndIsr` from `coordinatorContext`, which no longer reflects the adjustments made earlier in the same call.
   
   For example, suppose both zk and `coordinatorContext` hold `(leader=1, isr=[1,2,3])`, and the replicas to take offline are `[1,2]`. When we process offline replica `1`, it puts `(leader=-1, isr=[2,3])` into the temporary map. When we process offline replica `2`, the `leaderAndIsr` in `coordinatorContext` is still `(leader=1, isr=[1,2,3])`, so it puts `(leader=1, isr=[1,3])` into the temporary map and overwrites the previous `(leader=-1, isr=[2,3])`. In the end, we write the wrong state to zk and `coordinatorContext`.
   
   We need to look up the current `leaderAndIsr` in `toUpdateLeaderAndIsrList` first, and fall back to `coordinatorContext` only if it is not found there.
   
   Please also add a unit test in `ReplicaStateMachineTest` that covers this case.
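
A minimal, self-contained sketch of the suggested lookup order, in case it helps. It does not use the real Fluss classes: `LeaderAndIsr` here is a simplified stand-in, buckets are plain strings, and `removeReplica`, `offline`, and `NO_LEADER` are hypothetical names. The leader-reset and keep-last-replica rules are assumptions modelled on the diff and the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PendingIsrLookupSketch {

    /** Simplified stand-in for LeaderAndIsr; not the real Fluss class. */
    static final class LeaderAndIsr {
        final int leader;
        final List<Integer> isr;

        LeaderAndIsr(int leader, List<Integer> isr) {
            this.leader = leader;
            this.isr = isr;
        }

        @Override
        public String toString() {
            return "(leader=" + leader + ", isr=" + isr + ")";
        }
    }

    /** Hypothetical marker for "no leader", matching the -1 used in the example. */
    static final int NO_LEADER = -1;

    /** Removes one replica, reading the pending update first and the committed state second. */
    static void removeReplica(
            String bucket,
            int replicaId,
            Map<String, LeaderAndIsr> committed,
            Map<String, LeaderAndIsr> pending) {
        // Prefer the not-yet-flushed adjustment from an earlier iteration.
        LeaderAndIsr current = pending.getOrDefault(bucket, committed.get(bucket));
        int newLeader = current.leader == replicaId ? NO_LEADER : current.leader;
        List<Integer> newIsr = new ArrayList<>(current.isr);
        // Assumed rule from the diff: keep the last ISR member so a leader can still be elected.
        if (newIsr.size() > 1) {
            newIsr.remove(Integer.valueOf(replicaId));
        }
        pending.put(bucket, new LeaderAndIsr(newLeader, newIsr));
    }

    /** Takes the given replicas of one bucket offline and returns the pending result. */
    static LeaderAndIsr offline(List<Integer> replicas) {
        Map<String, LeaderAndIsr> committed = new HashMap<>();
        committed.put("bucket-0", new LeaderAndIsr(1, Arrays.asList(1, 2, 3)));
        Map<String, LeaderAndIsr> pending = new HashMap<>();
        for (int replicaId : replicas) {
            removeReplica("bucket-0", replicaId, committed, pending);
        }
        return pending.get("bucket-0");
    }

    public static void main(String[] args) {
        // prints "(leader=-1, isr=[3])"
        System.out.println(offline(Arrays.asList(1, 2)));
    }
}
```

Because the second iteration starts from the pending `(leader=-1, isr=[2,3])` rather than the stale committed value, both removals accumulate instead of the later one overwriting the earlier one.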
   



##########
fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java:
##########
@@ -211,14 +242,44 @@ void testBatchCreateLeaderAndIsr() throws Exception {
                     new RegisterTableBucketLeadAndIsrInfo(
                             tableBucket, leaderAndIsr, "partition" + i, null));
         }
-
+        // batch create
         zookeeperClient.batchRegisterLeaderAndIsrForTablePartition(partitionTableBucket);
         for (int i = 0; i < 100; i++) {
+            // each should register successful
             Optional<LeaderAndIsr> optionalLeaderAndIsr =
                     zookeeperClient.getLeaderAndIsr(partitionTableBucket.get(i).getTableBucket());
             assertThat(optionalLeaderAndIsr.isPresent()).isTrue();
             assertThat(optionalLeaderAndIsr.get()).isIn(partitionleaderAndIsrList);
         }
+
+        Map<TableBucket, LeaderAndIsr> partitionUpdateMap =

Review Comment:
   This method is quite long. Please consider moving the partitioned-table test into a new method, since it shares nothing with the non-partitioned test. If you can unify the test code using `@ParameterizedTest`, that would be great.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
