dajac commented on code in PR #18499:
URL: https://github.com/apache/kafka/pull/18499#discussion_r1920214280


##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -2387,9 +2398,17 @@ public void scheduleUnloadOperation(
                 try {
                     if (partitionEpoch.isEmpty() || context.epoch < partitionEpoch.getAsInt()) {
                         log.info("Started unloading metadata for {} with epoch {}.", tp, partitionEpoch);
-                        context.transitionTo(CoordinatorState.CLOSED);
-                        coordinators.remove(tp, context);
-                        log.info("Finished unloading metadata for {} with epoch {}.", tp, partitionEpoch);
+                        try {
+                            context.transitionTo(CoordinatorState.CLOSED);

Review Comment:
   I wonder if there are other state transitions that we should look into. Have 
you checked?



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -666,27 +668,25 @@ private void transitionTo(
                     break;
 
                 case ACTIVE:
-                    state = CoordinatorState.ACTIVE;
                     highWatermarklistener = new HighWatermarkListener();
                     partitionWriter.registerListener(tp, highWatermarklistener);
                     coordinator.onLoaded(metadataImage);
                     break;
 
                 case FAILED:
-                    state = CoordinatorState.FAILED;
                     unload();
                     break;
 
                 case CLOSED:
-                    state = CoordinatorState.CLOSED;
                     unload();
                     break;
 
                 default:
+                    // Revert the state update
+                    state = oldState;
+                    runtimeMetrics.recordPartitionStateChange(newState, oldState);

Review Comment:
   I think that we could change this switch to a switch expression now. A 
switch expression must handle every case, and since we handle all the states, 
we could remove the default.
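
   To illustrate the suggestion, here is a minimal sketch of an exhaustive 
switch expression over an enum. The `State` enum and the `transitionTo` 
method below are hypothetical stand-ins, not the actual `CoordinatorState` or 
`CoordinatorRuntime` code, and the returned strings only name the action each 
branch would take.

```java
public class SwitchExpressionSketch {
    // Hypothetical stand-in for the coordinator states; not the real enum.
    enum State { LOADING, ACTIVE, FAILED, CLOSED }

    // Returns a label for the action that each transition would perform.
    static String transitionTo(State newState) {
        // A switch expression over an enum must cover every constant.
        // Because all constants are listed, no default branch is needed,
        // and adding a new constant without a case fails to compile.
        return switch (newState) {
            case LOADING -> "load";
            case ACTIVE -> "registerListener";
            case FAILED, CLOSED -> "unload";
        };
    }

    public static void main(String[] args) {
        for (State s : State.values()) {
            System.out.println(s + " -> " + transitionTo(s));
        }
    }
}
```

   Note that only the expression form is checked for exhaustiveness on enums; 
an arrow-style switch used as a statement is not.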



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -1837,7 +1849,14 @@ public void onHighWatermarkUpdated(
                                 // exists and is in the active state.
                                 log.debug("Updating high watermark of {} to {}.", tp, newHighWatermark);
                                 context.coordinator.updateLastCommittedOffset(newHighWatermark);
-                                context.deferredEventQueue.completeUpTo(newHighWatermark);
+                                try {
+                                    context.deferredEventQueue.completeUpTo(newHighWatermark);
+                                } catch (Throwable e) {
+                                    log.error("Failed to complete deferred events for {} up to {}, flushing deferred event queue.",
+                                        tp, newHighWatermark, e);
+                                    context.deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+                                    context.failCurrentBatch(Errors.NOT_COORDINATOR.exception());

Review Comment:
   I see. I think that we should not fail all events here. If we want to fail 
the state machine, we should transition it to the FAILED state and then reload 
it. However, that does not seem necessary here. I feel that ensuring that 
events do not throw when complete is called is the better approach, as it 
makes each event responsible for doing the right thing. We should also keep in 
mind that we are protecting ourselves against unexpected bugs in the event 
completion.



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -840,7 +848,11 @@ private void failCurrentBatch(Throwable t) {
             if (currentBatch != null) {
                 coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
                 for (DeferredEvent event : currentBatch.deferredEvents) {
-                    event.complete(t);
+                    try {
+                        event.complete(t);
+                    } catch (Throwable e) {
+                        log.error("Event {} for {} failed to complete.", event, tp, e);
+                    }

Review Comment:
   Yeah, a wrapper could be a nice approach. Wanna give it a try?
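
   For reference, one possible shape for such a wrapper is sketched below. 
Everything in it is an assumption for illustration: the local `DeferredEvent` 
interface only mirrors the `complete(Throwable)` call visible in the diff, and 
`SafeDeferredEvent` is a hypothetical name. Failures are collected in a list 
here, whereas real code would presumably log them.

```java
import java.util.ArrayList;
import java.util.List;

public class SafeDeferredEventSketch {
    // Local stand-in mirroring the complete(Throwable) call from the diff.
    interface DeferredEvent {
        void complete(Throwable t);
    }

    // Hypothetical wrapper: delegates to the wrapped event and never lets
    // an exception thrown during completion escape to the caller.
    static final class SafeDeferredEvent implements DeferredEvent {
        private final DeferredEvent delegate;
        private final List<Throwable> failures;

        SafeDeferredEvent(DeferredEvent delegate, List<Throwable> failures) {
            this.delegate = delegate;
            this.failures = failures;
        }

        @Override
        public void complete(Throwable t) {
            try {
                delegate.complete(t);
            } catch (Throwable e) {
                // Real code would log here; the sketch records the failure.
                failures.add(e);
            }
        }
    }

    public static void main(String[] args) {
        List<Throwable> failures = new ArrayList<>();
        List<DeferredEvent> events = new ArrayList<>();
        events.add(new SafeDeferredEvent(t -> {
            throw new IllegalStateException("bug in completion");
        }, failures));
        events.add(new SafeDeferredEvent(
            t -> System.out.println("second event completed"), failures));

        // The first event throws, but the loop still reaches the second one.
        for (DeferredEvent event : events) {
            event.complete(null);
        }
        System.out.println("captured failures: " + failures.size());
    }
}
```

   With the try/catch living in the wrapper, call sites such as the loop in 
`failCurrentBatch` would not need their own.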


