rajinisivaram commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1619159464


##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -1123,6 +1133,17 @@ public long maybeUpdate(long now) {
             // Beware that the behavior of this method and the computation of timeouts for poll() are
             // highly dependent on the behavior of leastLoadedNode.
             Node node = leastLoadedNode(now);
+
+            // Rebootstrap if needed and configured.
+            if (node == null && metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP) {

Review Comment:
   Couldn't `node == null` mean that all the connections have max.in.flight requests and hence we need to wait?
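To make the concern concrete, here is a minimal hypothetical model (none of these names are Kafka's actual APIs): a null result from `leastLoadedNode` conflates two states — every node unreachable, where rebootstrap may help, and every connection merely saturated with in-flight requests, where the client should simply wait. One possible refinement keeps the two cases apart:

```java
import java.util.List;

public class LeastLoadedSketch {
    // Simplified per-node states; SATURATED stands for "connection at max.in.flight".
    enum NodeState { UNREACHABLE, SATURATED, READY }

    // Returns the index of the first READY node, or null -- losing the distinction
    // between "all unreachable" (rebootstrap may help) and "all saturated" (just wait).
    static Integer leastLoadedNode(List<NodeState> nodes) {
        for (int i = 0; i < nodes.size(); i++)
            if (nodes.get(i) == NodeState.READY) return i;
        return null;
    }

    // Hypothetical refinement: only treat null as a rebootstrap trigger when no
    // connection is even saturated, i.e. everything is actually unreachable.
    static boolean shouldRebootstrap(List<NodeState> nodes) {
        return leastLoadedNode(nodes) == null
            && nodes.stream().noneMatch(s -> s == NodeState.SATURATED);
    }
}
```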



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -135,6 +136,7 @@ object KafkaConfig {
      .define(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG, INT, ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT, HIGH, ServerConfigs.REQUEST_TIMEOUT_MS_DOC)
      .define(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, LONG, ServerConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, MEDIUM, ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
      .define(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, LONG, ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, MEDIUM, ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
+      .define(ServerConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, STRING, ServerConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY, LOW, ServerConfigs.METADATA_RECOVERY_STRATEGY_DOC)

Review Comment:
   Which clients in the broker do we want to rebootstrap? Don't remember this change from the KIP.



##########
core/src/main/scala/kafka/raft/RaftManager.scala:
##########
@@ -307,7 +307,8 @@ class KafkaRaftManager[T](
       time,
       discoverBrokerVersions,
       apiVersions,
-      logContext
+      logContext,
+      MetadataRecoveryStrategy.NONE

Review Comment:
   Rather than change all these callers, couldn't we retain the original constructor for these?
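The suggestion could look like the following sketch: a delegating overload that preserves the original signature and defaults to `MetadataRecoveryStrategy.NONE`, so callers such as `RaftManager` stay untouched. The class and field names here are illustrative, not Kafka's actual `NetworkClient` constructor:

```java
public class ClientSketch {
    // Stand-in for org.apache.kafka.clients.MetadataRecoveryStrategy.
    enum MetadataRecoveryStrategy { NONE, REBOOTSTRAP }

    final String logContext;
    final MetadataRecoveryStrategy recoveryStrategy;

    // Original signature, kept for source compatibility: delegates with NONE
    // so existing call sites need no change.
    ClientSketch(String logContext) {
        this(logContext, MetadataRecoveryStrategy.NONE);
    }

    // New signature for callers that opt into rebootstrap.
    ClientSketch(String logContext, MetadataRecoveryStrategy strategy) {
        this.logContext = logContext;
        this.recoveryStrategy = strategy;
    }
}
```

This mirrors the usual Kafka approach of retaining an old constructor as a thin delegating overload when a parameter is added.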



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -304,6 +306,12 @@ public synchronized void bootstrap(List<InetSocketAddress> addresses) {
         this.needFullUpdate = true;
         this.updateVersion += 1;
         this.metadataSnapshot = MetadataSnapshot.bootstrap(addresses);
+        this.bootstrapAddresses = addresses;

Review Comment:
   Is there a reason we are re-bootstrapping from addresses that were resolved when the client was created? This seems different from the example in the KIP where the addresses that the bootstrap resolved to had changed.
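One way to rebootstrap from freshly resolved addresses, as the KIP example implies, would be to keep the original `host:port` strings and resolve them again at rebootstrap time rather than caching the `InetSocketAddress` list. This is an illustrative sketch, not Kafka's `ClientUtils` code; the helper name and the skip-on-failure policy are assumptions:

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

public class RebootstrapResolver {
    // Re-resolve each "host:port" entry freshly so that DNS changes made since
    // client creation are picked up; skip entries that no longer resolve rather
    // than failing the whole rebootstrap.
    static List<InetSocketAddress> resolveFresh(List<String> bootstrapServers) {
        List<InetSocketAddress> out = new ArrayList<>();
        for (String hostPort : bootstrapServers) {
            int colon = hostPort.lastIndexOf(':');
            String host = hostPort.substring(0, colon);
            int port = Integer.parseInt(hostPort.substring(colon + 1));
            try {
                for (InetAddress addr : InetAddress.getAllByName(host))
                    out.add(new InetSocketAddress(addr, port));
            } catch (UnknownHostException e) {
                // Host no longer resolves; drop it and continue.
            }
        }
        return out;
    }
}
```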



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java:
##########
@@ -287,4 +293,12 @@ public void update(Cluster cluster, long now) {
             this.cluster = cluster;
         }
     }
+
+    /**
+     * Rebootstrap metadata with the cluster previously used for bootstrapping.
+     */
+    public void rebootstrap(long now) {
+        log.info("Rebootstrapping with {}", this.bootstrapCluster);
+        update(bootstrapCluster, now);

Review Comment:
   Same question as with NetworkClient, don't we want to re-resolve bootstrap?



##########
clients/src/main/java/org/apache/kafka/clients/MetadataRecoveryStrategy.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.NoSuchElementException;
+
+import static java.util.Arrays.asList;
+
+/**
+ * Defines the strategies which clients can follow to deal with the situation when none of the known nodes is available.
+ */
+public enum MetadataRecoveryStrategy {
+    NONE("none"),
+    REBOOTSTRAP("rebootstrap");
+
+    public final String name;
+
+    MetadataRecoveryStrategy(String name) {
+        this.name = name;
+    }
+
+    private static final List<MetadataRecoveryStrategy> VALUES = asList(values());
+
+    public static MetadataRecoveryStrategy forName(String n) {
+        String name = n.toLowerCase(Locale.ROOT);
+        for (MetadataRecoveryStrategy t : values()) {
+            if (t.name.equals(name)) {
+                return t;
+            }
+        }
+        throw new NoSuchElementException("Invalid metadata recovery strategy " + name);

Review Comment:
   In other enums like ClientDnsLookup, we throw IllegalArgumentException; we could do the same here.
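A sketch of that change — the same lookup loop, but throwing `IllegalArgumentException` as `ClientDnsLookup` does. To keep the sketch self-contained it uses the enum's built-in `name()` with an uppercase comparison, rather than the PR's lowercase `name` field:

```java
import java.util.Locale;

enum RecoveryStrategySketch {
    NONE, REBOOTSTRAP;

    // Case-insensitive lookup; unknown names fail with IllegalArgumentException,
    // matching the convention used by other client config enums.
    static RecoveryStrategySketch forName(String n) {
        String name = n.toUpperCase(Locale.ROOT);
        for (RecoveryStrategySketch t : values()) {
            if (t.name().equals(name)) {
                return t;
            }
        }
        throw new IllegalArgumentException("Invalid metadata recovery strategy " + n);
    }
}
```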



##########
core/src/test/scala/integration/kafka/api/RebootstrapTest.scala:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.server.{KafkaConfig, KafkaServer}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+
+import java.util.Properties
+
+abstract class RebootstrapTest extends AbstractConsumerTest {
+  override def brokerCount: Int = 2
+
+  def server0: KafkaServer = serverForId(0).get
+  def server1: KafkaServer = serverForId(1).get
+
+  override def generateConfigs: Seq[KafkaConfig] = {
+    val overridingProps = new Properties()
+    overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, brokerCount.toString)
+    overridingProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
+
+    // In this test, fixed ports are necessary, because brokers must have the
+    // same port after the restart.
+    FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnect, enableControlledShutdown = false)

Review Comment:
   We have moved away from using fixed ports in tests since they can result in spurious test failures. But not sure what the alternative would be for these.
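For illustration, this standalone sketch (not Kafka test code) shows why pre-chosen ports are racy: even the common pattern of binding to port 0, recording the kernel-assigned port, and reusing it after a restart leaves a window in which another process can grab the port between release and rebind:

```java
import java.io.IOException;
import java.net.ServerSocket;

public class PortReuseSketch {
    // Bind to an ephemeral port, release it, then rebind to the same port --
    // simulating a broker restart that must come back on its old port. This
    // usually succeeds, but can race with other processes on a busy CI host,
    // which is the source of the spurious failures mentioned above.
    static int bindReleaseRebind() {
        try {
            int port;
            try (ServerSocket s = new ServerSocket(0)) {
                port = s.getLocalPort();  // kernel-assigned ephemeral port
            }                             // socket closed: port is now up for grabs
            try (ServerSocket s2 = new ServerSocket(port)) {
                return s2.getLocalPort(); // rebind succeeded on the same port
            }
        } catch (IOException e) {
            return -1; // another process took the port inside the window
        }
    }
}
```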



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
