soarez commented on code in PR #15945: URL: https://github.com/apache/kafka/pull/15945#discussion_r1606964394
########## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ########## @@ -257,11 +257,11 @@ class BrokerLifecycleManager( eventQueue.append(new OfflineDirEvent(directory)) } - def handleKraftJBODMetadataVersionUpdate(): Unit = { - eventQueue.append(new KraftJBODMetadataVersionUpdateEvent()) + def resendBrokerRegistrationUnlessZkMode(): Unit = { + eventQueue.append(new ResentBrokerRegistrationUnlessZkModeEvent()) Review Comment: `Resent` -> `Resend` ########## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ########## @@ -407,6 +407,13 @@ public ControllerResult<BrokerRegistrationReply> registerBroker( setBrokerEpoch(brokerEpoch). setRack(request.rack()). setEndPoints(listenerInfo.toBrokerRegistrationRecord()); + + if (existing != null && request.incarnationId().equals(existing.incarnationId())) { Review Comment: Should we also check `featureControl.metadataVersion().isDirectoryAssignmentSupported()` and `!request.logDirs().isEmpty()` or do we want to allow this in other conditions? ########## metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.image.publisher; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.loader.LoaderManifest; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.List; + +/** + * Tracks the registration of a specific broker, and executes a callback if it should be refreshed. + * + * This tracker handles cases where we might want to re-register the broker. The only such case + * right now is during the transition from non-JBOD mode, to JBOD mode. In other words, the + * transition from a MetadataVersion less than 3.7-IV2, to one greater than or equal to 3.7-IV2. + * In this case, the broker registration will start out containing no directories, and we need to + * resend the BrokerRegistrationRequest to fix that. + * + * As much as possible, the goal here is to keep things simple. We just compare the desired state + * with the actual state, and try to make changes only if necessary. + */ +public class BrokerRegistrationTracker implements MetadataPublisher { Review Comment: I'm wondering if there is a more general case here that would be useful to extract – listening for metadata changes to enable features. If storage has been formatted a long time ago, the MV at startup can be old and the broker will only find out about the current MV as it catches up with metadata. JBOD won't be the only feature that needs to be activated upon discovering a newer MV. Maybe there aren't many of these cases now, but it would be better if all those features get activated via the same metadata listening mechanism. What do you think? 
########## metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.publisher; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.loader.LoaderManifest; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.List; + +/** + * Tracks the registration of a specific broker, and executes a callback if it should be refreshed. + * + * This tracker handles cases where we might want to re-register the broker. The only such case + * right now is during the transition from non-JBOD mode, to JBOD mode. In other words, the + * transition from a MetadataVersion less than 3.7-IV2, to one greater than or equal to 3.7-IV2. + * In this case, the broker registration will start out containing no directories, and we need to + * resend the BrokerRegistrationRequest to fix that. 
+ * + * As much as possible, the goal here is to keep things simple. We just compare the desired state + * with the actual state, and try to make changes only if necessary. + */ +public class BrokerRegistrationTracker implements MetadataPublisher { + private final Logger log; + private final int id; + private final Runnable refreshRegistrationCallback; + + /** + * Create the tracker. + * + * @param id The ID of this broker. + * @param targetDirectories The directories managed by this broker. + * @param refreshRegistrationCallback Callback to run if we need to refresh the registration. + */ + public BrokerRegistrationTracker( + int id, + List<Uuid> targetDirectories, + Runnable refreshRegistrationCallback + ) { + this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] "). + logger(BrokerRegistrationTracker.class); + this.id = id; + this.refreshRegistrationCallback = refreshRegistrationCallback; + } + + @Override + public String name() { + return "BrokerRegistrationTracker(id=" + id + ")"; + } + + @Override + public void onMetadataUpdate( + MetadataDelta delta, + MetadataImage newImage, + LoaderManifest manifest + ) { + boolean checkBrokerRegistration = false; + if (delta.featuresDelta() != null) { + if (delta.metadataVersionChanged().isPresent()) { + if (log.isTraceEnabled()) { + log.trace("Metadata version change is present: {}", + delta.metadataVersionChanged()); + } + checkBrokerRegistration = true; + } + } + if (delta.clusterDelta() != null) { + if (delta.clusterDelta().changedBrokers().get(id) != null) { + if (log.isTraceEnabled()) { + log.trace("Broker change is present: {}", + delta.clusterDelta().changedBrokers().get(id)); + } + checkBrokerRegistration = true; + } + } + if (checkBrokerRegistration) { + if (brokerRegistrationNeedsRefresh(newImage.features().metadataVersion(), + delta.clusterDelta().broker(id))) { + refreshRegistrationCallback.run(); + } + } + } + + /** + * Check if the current broker registration needs to be refreshed. 
+ * + * @param registration The current broker registration, or null if there is none. + * @return True only if we should refresh. + */ + boolean brokerRegistrationNeedsRefresh( + MetadataVersion metadataVersion, + BrokerRegistration registration + ) { + // If there is no existing registration, the BrokerLifecycleManager must still be sending it. + // So we don't need to do anything yet. + if (registration == null) { + log.debug("No current broker registration to check."); + return false; + } + // Check to see if the directory list has changed. Note that this check could certainly be + // triggered spuriously. For example, if the broker's directory list has been changed in the + // past, and we are in the process of replaying that change log, we will end up here. + // That's fine because resending the broker registration does not cause any problems. And, + // of course, as soon as a snapshot is made, we will no longer need to worry about those + // old metadata log entries being replayed on startup. + if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2) && + registration.directories().isEmpty()) { Review Comment: If `metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)` evaluates to `false`, then isn't `registration.directories().isEmpty()` always `true`? The comment above mentions the directory list changing as a way to get here, but the only ways the directory list can change are: * On a broker registration that includes dirs * When a broker that has previously registered with more than one dir turns one of them offline Both of these require that the broker has registered at least once with directories, which can only happen after `MetadataVersion.IBP_3_7_IV2` ########## metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.publisher; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.loader.LoaderManifest; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.List; + +/** + * Tracks the registration of a specific broker, and executes a callback if it should be refreshed. + * + * This tracker handles cases where we might want to re-register the broker. The only such case + * right now is during the transition from non-JBOD mode, to JBOD mode. In other words, the + * transition from a MetadataVersion less than 3.7-IV2, to one greater than or equal to 3.7-IV2. + * In this case, the broker registration will start out containing no directories, and we need to + * resend the BrokerRegistrationRequest to fix that. + * + * As much as possible, the goal here is to keep things simple. We just compare the desired state + * with the actual state, and try to make changes only if necessary. 
+ */ +public class BrokerRegistrationTracker implements MetadataPublisher { + private final Logger log; + private final int id; + private final Runnable refreshRegistrationCallback; + + /** + * Create the tracker. + * + * @param id The ID of this broker. + * @param targetDirectories The directories managed by this broker. + * @param refreshRegistrationCallback Callback to run if we need to refresh the registration. + */ + public BrokerRegistrationTracker( + int id, + List<Uuid> targetDirectories, + Runnable refreshRegistrationCallback + ) { + this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] "). + logger(BrokerRegistrationTracker.class); + this.id = id; + this.refreshRegistrationCallback = refreshRegistrationCallback; + } + + @Override + public String name() { + return "BrokerRegistrationTracker(id=" + id + ")"; + } + + @Override + public void onMetadataUpdate( + MetadataDelta delta, + MetadataImage newImage, + LoaderManifest manifest + ) { + boolean checkBrokerRegistration = false; + if (delta.featuresDelta() != null) { + if (delta.metadataVersionChanged().isPresent()) { + if (log.isTraceEnabled()) { + log.trace("Metadata version change is present: {}", + delta.metadataVersionChanged()); + } + checkBrokerRegistration = true; + } + } + if (delta.clusterDelta() != null) { + if (delta.clusterDelta().changedBrokers().get(id) != null) { + if (log.isTraceEnabled()) { + log.trace("Broker change is present: {}", + delta.clusterDelta().changedBrokers().get(id)); + } + checkBrokerRegistration = true; + } + } + if (checkBrokerRegistration) { + if (brokerRegistrationNeedsRefresh(newImage.features().metadataVersion(), + delta.clusterDelta().broker(id))) { + refreshRegistrationCallback.run(); + } + } + } + + /** + * Check if the current broker registration needs to be refreshed. + * + * @param registration The current broker registration, or null if there is none. + * @return True only if we should refresh. 
+ */ + boolean brokerRegistrationNeedsRefresh( + MetadataVersion metadataVersion, + BrokerRegistration registration + ) { + // If there is no existing registration, the BrokerLifecycleManager must still be sending it. + // So we don't need to do anything yet. + if (registration == null) { + log.debug("No current broker registration to check."); + return false; + } + // Check to see if the directory list has changed. Note that this check could certainly be + // triggered spuriously. For example, if the broker's directory list has been changed in the + // past, and we are in the process of replaying that change log, we will end up here. + // That's fine because resending the broker registration does not cause any problems. And, + // of course, as soon as a snapshot is made, we will no longer need to worry about those + // old metadata log entries being replayed on startup. + if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2) && + registration.directories().isEmpty()) { Review Comment: Should we also check the broker configuration? If the broker isn't configured with multiple log directories, there's no need to re-register. Perhaps we shouldn't even set up this metadata listener if there's only one configured log dir. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org