dajac commented on code in PR #18499:
URL: https://github.com/apache/kafka/pull/18499#discussion_r1920214280
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -2387,9 +2398,17 @@ public void scheduleUnloadOperation(
try {
if (partitionEpoch.isEmpty() || context.epoch <
partitionEpoch.getAsInt()) {
                log.info("Started unloading metadata for {} with epoch {}.", tp, partitionEpoch);
- context.transitionTo(CoordinatorState.CLOSED);
- coordinators.remove(tp, context);
-               log.info("Finished unloading metadata for {} with epoch {}.", tp, partitionEpoch);
+ try {
+ context.transitionTo(CoordinatorState.CLOSED);
Review Comment:
I wonder if there are other state transitions that we should look into. Have
you checked?
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -666,27 +668,25 @@ private void transitionTo(
break;
case ACTIVE:
- state = CoordinatorState.ACTIVE;
highWatermarklistener = new HighWatermarkListener();
                partitionWriter.registerListener(tp, highWatermarklistener);
coordinator.onLoaded(metadataImage);
break;
case FAILED:
- state = CoordinatorState.FAILED;
unload();
break;
case CLOSED:
- state = CoordinatorState.CLOSED;
unload();
break;
default:
+ // Revert the state update
+ state = oldState;
+               runtimeMetrics.recordPartitionStateChange(newState, oldState);
Review Comment:
I think that we could change this switch to use the new switch expression
syntax now. A switch expression requires handling all the cases, and since we
handle all the states, we could remove the default.
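To illustrate the suggestion: with a switch expression over an exhaustive enum, the compiler rejects any missing case, so the default branch (and its manual state rollback) becomes unnecessary. This is a hypothetical sketch with simplified stand-in names, not the actual CoordinatorRuntime code:

```java
// Simplified stand-in for the coordinator states; illustrative only.
enum CoordinatorState { INITIAL, LOADING, ACTIVE, FAILED, CLOSED }

class TransitionSketch {
    // A switch *expression* must cover every enum constant, so the
    // compiler flags any forgotten state -- no default branch needed.
    static String actionFor(CoordinatorState newState) {
        return switch (newState) {
            case INITIAL -> "reset";
            case LOADING -> "startLoading";
            case ACTIVE -> "registerListenerAndOnLoaded";
            case FAILED, CLOSED -> "unload";
        };
    }
}
```

If a new constant were later added to the enum, every such switch expression would fail to compile until it handled the new state, which is exactly the safety the old default branch could not provide.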
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -1837,7 +1849,14 @@ public void onHighWatermarkUpdated(
// exists and is in the active state.
                        log.debug("Updating high watermark of {} to {}.", tp, newHighWatermark);
context.coordinator.updateLastCommittedOffset(newHighWatermark);
-                       context.deferredEventQueue.completeUpTo(newHighWatermark);
+                       try {
+                           context.deferredEventQueue.completeUpTo(newHighWatermark);
+                       } catch (Throwable e) {
+                           log.error("Failed to complete deferred events for {} up to {}, flushing deferred event queue.", tp, newHighWatermark, e);
+                           context.deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+                           context.failCurrentBatch(Errors.NOT_COORDINATOR.exception());
Review Comment:
I see. I think that we should not fail all events here. If we want to fail
the state machine, we should transition it to the FAILED state and then reload
it. However, that does not seem necessary here. I feel that ensuring events do
not throw when complete is called is the better approach, as it forces each
event to do the right thing. We should also keep in mind that we are
protecting ourselves against weird bugs in the event completion.
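A minimal sketch of the alternative described above: if a completion throws, fail the state machine itself (which would trigger an unload/reload) rather than failing every deferred event with NOT_COORDINATOR. All types and names here are simplified stand-ins, not the actual CoordinatorRuntime API:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative stand-in for the coordinator's lifecycle state.
enum CoordState { ACTIVE, FAILED }

class CoordinatorSketch {
    CoordState state = CoordState.ACTIVE;
    final Deque<Runnable> deferredEvents = new ArrayDeque<>();

    void onHighWatermarkUpdated(long newHighWatermark) {
        try {
            // Stand-in for deferredEventQueue.completeUpTo(newHighWatermark):
            // run every pending completion.
            while (!deferredEvents.isEmpty()) {
                deferredEvents.poll().run();
            }
        } catch (Throwable e) {
            // Fail the whole state machine and let the runtime reload it,
            // instead of calling failAll(...) on the deferred queue.
            state = CoordState.FAILED;
        }
    }
}
```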
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -840,7 +848,11 @@ private void failCurrentBatch(Throwable t) {
if (currentBatch != null) {
coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
for (DeferredEvent event : currentBatch.deferredEvents) {
- event.complete(t);
+ log.error("Event {} for {} failed to complete.",
event, tp, e);
+ }
Review Comment:
Yeah, a wrapper could be a nice approach. Wanna give it a try?
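One shape the suggested wrapper could take: a decorator whose `complete` never propagates an exception, so a single misbehaving event cannot abort the loop completing the rest of the batch. The interface and names below are illustrative stand-ins, not the actual coordinator-runtime types:

```java
// Hypothetical, simplified version of a deferred-event contract.
interface DeferredEvent {
    void complete(Throwable exception);
}

// Decorator that guarantees complete() does not throw.
final class SafeDeferredEvent implements DeferredEvent {
    private final DeferredEvent delegate;

    SafeDeferredEvent(DeferredEvent delegate) {
        this.delegate = delegate;
    }

    @Override
    public void complete(Throwable exception) {
        try {
            delegate.complete(exception);
        } catch (Throwable e) {
            // Log and swallow: one failing event must not poison the
            // caller iterating over the whole batch.
            System.err.println("Event " + delegate + " failed to complete: " + e);
        }
    }
}
```

Wrapping events once at creation time would let call sites such as `failCurrentBatch` drop their per-call try/catch blocks.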
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]