Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-12 Thread via GitHub


rajinisivaram merged PR #13277:
URL: https://github.com/apache/kafka/pull/13277


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-12 Thread via GitHub


rajinisivaram commented on PR #13277:
URL: https://github.com/apache/kafka/pull/13277#issuecomment-2163778250

   @ivanyu Thanks for the updates. The test failures are not related, so I am 
merging to trunk and 3.8.





Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-12 Thread via GitHub


ivanyu commented on PR #13277:
URL: https://github.com/apache/kafka/pull/13277#issuecomment-2163515422

   Some tests are failing on one pipeline, but they seem unrelated





Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-12 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1636356455


##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -1121,13 +1140,25 @@ public long maybeUpdate(long now) {
 
 // Beware that the behavior of this method and the computation of timeouts for poll() are
 // highly dependent on the behavior of leastLoadedNode.
-Node node = leastLoadedNode(now);
-if (node == null) {
+LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
+
+// Rebootstrap if needed and configured.
+if (!leastLoadedNode.hasNodeAvailableOrConnectionReady()
+&& metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP) {
+for (final Node oldNode : metadata.fetch().nodes()) {
+NetworkClient.this.close(oldNode.idString());

Review Comment:
   Oh that's right: if `!leastLoadedNode.hasNodeAvailableOrConnectionReady()`, 
there shouldn't be open connections. I removed this closing.






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-12 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1636337120


##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -1121,13 +1140,25 @@ public long maybeUpdate(long now) {
 
 // Beware that the behavior of this method and the computation of timeouts for poll() are
 // highly dependent on the behavior of leastLoadedNode.
-Node node = leastLoadedNode(now);
-if (node == null) {
+LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
+
+// Rebootstrap if needed and configured.
+if (!leastLoadedNode.hasNodeAvailableOrConnectionReady()
+&& metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP) {

Review Comment:
   Since the method only performs comparisons, the order doesn't matter much 
performance-wise. However, the swapped order is more logical. Done.






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-12 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1636332264


##
clients/src/main/java/org/apache/kafka/clients/LeastLoadedNode.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Node;
+
+public class LeastLoadedNode {
+private final Node node;
+private final boolean atLeastOneConnectionReady;
+
+public LeastLoadedNode(Node node, boolean atLeastOneConnectionReady) {
+this.node = node;
+this.atLeastOneConnectionReady = atLeastOneConnectionReady;
+}
+
+public Node node() {
+return node;
+}
+
+/**
+ * Indicates if the least loaded node is available or at least a ready connection exists.
+ *
+ * There may be no node available while ready connections to live nodes exist. This may happen when
+ * the connections are overloaded with in-flight requests. This function takes this into account.
+ */
+public boolean hasNodeAvailableOrConnectionReady() {
+return node() != null || atLeastOneConnectionReady;

Review Comment:
   Yeah, done






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-12 Thread via GitHub


rajinisivaram commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1635927735


##
clients/src/main/java/org/apache/kafka/clients/LeastLoadedNode.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Node;
+
+public class LeastLoadedNode {
+private final Node node;
+private final boolean atLeastOneConnectionReady;
+
+public LeastLoadedNode(Node node, boolean atLeastOneConnectionReady) {
+this.node = node;
+this.atLeastOneConnectionReady = atLeastOneConnectionReady;
+}
+
+public Node node() {
+return node;
+}
+
+/**
+ * Indicates if the least loaded node is available or at least a ready connection exists.
+ *
+ * There may be no node available while ready connections to live nodes exist. This may happen when
+ * the connections are overloaded with in-flight requests. This function takes this into account.
+ */
+public boolean hasNodeAvailableOrConnectionReady() {
+return node() != null || atLeastOneConnectionReady;

Review Comment:
   This can just use `node` field.



##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -1121,13 +1140,25 @@ public long maybeUpdate(long now) {
 
 // Beware that the behavior of this method and the computation of timeouts for poll() are
 // highly dependent on the behavior of leastLoadedNode.
-Node node = leastLoadedNode(now);
-if (node == null) {
+LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
+
+// Rebootstrap if needed and configured.
+if (!leastLoadedNode.hasNodeAvailableOrConnectionReady()
+&& metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP) {
+for (final Node oldNode : metadata.fetch().nodes()) {
+NetworkClient.this.close(oldNode.idString());

Review Comment:
   Is there a chance that there may be connections to old nodes? I think we 
want to rebootstrap only if every node is in backoff state. We don't close 
nodes in the Admin client equivalent below, which is confusing, so it may be 
better to remove this unless it is required. Checking 
`hasNodeAvailableOrConnectionReady` adds to the confusion since it suggests 
there may be nodes in some non-ready state that need to be closed. Should we 
change `leastLoadedNode()` method to return `hasConnectedOrConnecting` nodes 
instead of `atLeastOneConnectionReady` to make this more obvious?



##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -1121,13 +1140,25 @@ public long maybeUpdate(long now) {
 
 // Beware that the behavior of this method and the computation of timeouts for poll() are
 // highly dependent on the behavior of leastLoadedNode.
-Node node = leastLoadedNode(now);
-if (node == null) {
+LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
+
+// Rebootstrap if needed and configured.
+if (!leastLoadedNode.hasNodeAvailableOrConnectionReady()
+&& metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP) {

Review Comment:
   Should we swap this around to `if (metadataRecoveryStrategy == 
MetadataRecoveryStrategy.REBOOTSTRAP && 
!leastLoadedNode.hasNodeAvailableOrConnectionReady())` here and in the admin 
client, so that we only use the new method if rebootstrap is configured?
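
The effect of the suggested ordering can be sketched in isolation. This is a minimal illustration under stated assumptions, not the `NetworkClient` code: `Strategy`, `CountingNode`, and `shouldRebootstrap` are hypothetical stand-ins introduced only for this example. Because Java's `&&` short-circuits, placing the strategy comparison first means the node check is not evaluated at all unless rebootstrap is configured.

```java
import java.util.concurrent.atomic.AtomicInteger;

class ShortCircuitSketch {
    // Strategy check first: && skips the node check unless rebootstrap is configured.
    static boolean shouldRebootstrap(Strategy strategy, CountingNode node) {
        return strategy == Strategy.REBOOTSTRAP && !node.hasNodeAvailableOrConnectionReady();
    }

    public static void main(String[] args) {
        CountingNode node = new CountingNode(false);
        System.out.println(shouldRebootstrap(Strategy.NONE, node));        // false
        System.out.println(node.calls.get());                              // 0
        System.out.println(shouldRebootstrap(Strategy.REBOOTSTRAP, node)); // true
        System.out.println(node.calls.get());                              // 1
    }
}

// Hypothetical stand-in for MetadataRecoveryStrategy; not the Kafka enum.
enum Strategy { NONE, REBOOTSTRAP }

// Hypothetical stand-in for LeastLoadedNode that counts how often it is queried.
class CountingNode {
    final AtomicInteger calls = new AtomicInteger();
    private final boolean available;

    CountingNode(boolean available) {
        this.available = available;
    }

    boolean hasNodeAvailableOrConnectionReady() {
        calls.incrementAndGet();
        return available;
    }
}
```

The call counter exists only to make the short-circuit visible; the real method is a pair of cheap comparisons, so the benefit is mainly readability.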






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-12 Thread via GitHub


rajinisivaram commented on PR #13277:
URL: https://github.com/apache/kafka/pull/13277#issuecomment-2162268137

   @ivanyu There are more test failures that need fixing before this PR can be 
merged.





Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-11 Thread via GitHub


ivanyu commented on PR #13277:
URL: https://github.com/apache/kafka/pull/13277#issuecomment-2161302594

   Comments addressed and conflicts resolved





Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-11 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1635256179


##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -705,16 +715,25 @@ public Node leastLoadedNode(long now) {
 Node foundCanConnect = null;
 Node foundReady = null;
 
+boolean atLeastOneNodeConnected = false;
+
 int offset = this.randOffset.nextInt(nodes.size());
 for (int i = 0; i < nodes.size(); i++) {
 int idx = (offset + i) % nodes.size();
 Node node = nodes.get(idx);
+
+if (!atLeastOneNodeConnected
+&& connectionStates.isReady(node.idString(), now)
+&& selector.isChannelReady(node.idString())) {
+atLeastOneNodeConnected = true;

Review Comment:
   Right, I added some unit testing






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-11 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1635231090


##
clients/src/main/java/org/apache/kafka/clients/LeastLoadedNode.java:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Node;
+
+public class LeastLoadedNode {
+private final Node node;
+private final boolean atLeastOneConnected;
+
+public LeastLoadedNode(Node node, boolean atLeastOneConnected) {
+this.node = node;
+this.atLeastOneConnected = atLeastOneConnected;
+}
+
+public Node node() {
+return node;
+}
+
+public boolean isAtLeastOneConnected() {

Review Comment:
   Done, although the change suggested in 
https://github.com/apache/kafka/pull/13277#discussion_r1633579132 makes this 
method obsolete anyway






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-11 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1635230739


##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -1122,13 +1141,26 @@ public long maybeUpdate(long now) {
 
 // Beware that the behavior of this method and the computation of timeouts for poll() are
 // highly dependent on the behavior of leastLoadedNode.
-Node node = leastLoadedNode(now);
-if (node == null) {
+LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
+
+// Rebootstrap if needed and configured.
+if (leastLoadedNode.node() == null
+&& !leastLoadedNode.isAtLeastOneConnected()

Review Comment:
   Great suggestion, done






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-11 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1635197250


##
server/src/main/java/org/apache/kafka/server/config/ServerConfigs.java:
##
@@ -75,6 +75,10 @@ public class ServerConfigs {
 public static final long SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS = CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS;
 public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC;
 
+public static final String METADATA_RECOVERY_STRATEGY_CONFIG = CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG;
+public static final String METADATA_RECOVERY_STRATEGY_DOC = CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC;
+public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY;

Review Comment:
   Right, removed






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-10 Thread via GitHub


rajinisivaram commented on PR #13277:
URL: https://github.com/apache/kafka/pull/13277#issuecomment-2158922004

   @ivanyu We also need to fix the unit tests that are failing because of the 
change from Node to LeastLoadedNode, and to resolve the merge conflicts.





Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-10 Thread via GitHub


rajinisivaram commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1633579132


##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -1122,13 +1141,26 @@ public long maybeUpdate(long now) {
 
 // Beware that the behavior of this method and the computation of timeouts for poll() are
 // highly dependent on the behavior of leastLoadedNode.
-Node node = leastLoadedNode(now);
-if (node == null) {
+LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
+
+// Rebootstrap if needed and configured.
+if (leastLoadedNode.node() == null
+&& !leastLoadedNode.isAtLeastOneConnected()

Review Comment:
   Could we add a method to `LeastLoadedNode` that checks 
`leastLoadedNode.node() == null && !leastLoadedNode.isAtLeastOneConnected()` 
since we use that here and in Admin client? It will make it more obvious in 
LeastLoadedNode that the boolean is only used when node is null. We could then 
remove `isAtLeastOneConnected` method perhaps.
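
The suggestion above can be sketched as follows. This is a simplified illustration, not the merged class: a plain `String` stands in for `org.apache.kafka.common.Node`, and the class name `LeastLoadedNodeSketch` is hypothetical. The method name `hasNodeAvailableOrConnectionReady` is the one that appears in later revisions quoted in this thread. By De Morgan's law, negating it gives exactly the condition `node == null && !atLeastOneConnectionReady` used at the call sites.

```java
// Sketch only: a String stands in for org.apache.kafka.common.Node.
class LeastLoadedNodeSketch {
    private final String node;
    private final boolean atLeastOneConnectionReady;

    LeastLoadedNodeSketch(String node, boolean atLeastOneConnectionReady) {
        this.node = node;
        this.atLeastOneConnectionReady = atLeastOneConnectionReady;
    }

    String node() {
        return node;
    }

    // Single predicate that call sites can negate instead of combining two accessors.
    boolean hasNodeAvailableOrConnectionReady() {
        return node != null || atLeastOneConnectionReady;
    }

    public static void main(String[] args) {
        String[] nodes = {null, "broker-1"};
        boolean[] flags = {false, true};
        for (String n : nodes) {
            for (boolean ready : flags) {
                LeastLoadedNodeSketch l = new LeastLoadedNodeSketch(n, ready);
                boolean oldCondition = l.node() == null && !ready;
                boolean newCondition = !l.hasNodeAvailableOrConnectionReady();
                System.out.println("node=" + n + ", ready=" + ready
                        + ", rebootstrap candidate=" + newCondition
                        + ", matches old condition=" + (oldCondition == newCondition));
            }
        }
    }
}
```

Keeping the boolean private makes it clear that it only matters when no node was selected.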



##
server/src/main/java/org/apache/kafka/server/config/ServerConfigs.java:
##
@@ -75,6 +75,10 @@ public class ServerConfigs {
 public static final long SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS = CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS;
 public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC;
 
+public static final String METADATA_RECOVERY_STRATEGY_CONFIG = CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG;
+public static final String METADATA_RECOVERY_STRATEGY_DOC = CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC;
+public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY;

Review Comment:
   These are no longer required?



##
clients/src/main/java/org/apache/kafka/clients/LeastLoadedNode.java:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Node;
+
+public class LeastLoadedNode {
+private final Node node;
+private final boolean atLeastOneConnected;
+
+public LeastLoadedNode(Node node, boolean atLeastOneConnected) {
+this.node = node;
+this.atLeastOneConnected = atLeastOneConnected;
+}
+
+public Node node() {
+return node;
+}
+
+public boolean isAtLeastOneConnected() {

Review Comment:
   We are setting this to true only for ready connections. Should we rename the 
method to indicate that?



##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -705,16 +715,25 @@ public Node leastLoadedNode(long now) {
 Node foundCanConnect = null;
 Node foundReady = null;
 
+boolean atLeastOneNodeConnected = false;
+
 int offset = this.randOffset.nextInt(nodes.size());
 for (int i = 0; i < nodes.size(); i++) {
 int idx = (offset + i) % nodes.size();
 Node node = nodes.get(idx);
+
+if (!atLeastOneNodeConnected
+&& connectionStates.isReady(node.idString(), now)
+&& selector.isChannelReady(node.idString())) {
+atLeastOneNodeConnected = true;

Review Comment:
   Can we add one unit test for the various combinations to verify the returned 
values for LeastLoadedNode?
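
The combinations worth covering can be sketched against a deliberately simplified model. Everything here is an assumption made for illustration: `Conn`, `Result`, `scan`, and the `maxInFlight` limit are hypothetical, and the real `leastLoadedNode` also considers connecting and connectable nodes, which this model ignores. The sketch only shows how a scan can report "no node selected" separately from "at least one ready connection exists".

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class LeastLoadedScanSketch {
    // Hypothetical, simplified view of one broker connection.
    static final class Conn {
        final String id;
        final boolean ready; // connection established and channel ready
        final int inFlight;  // requests currently in flight

        Conn(String id, boolean ready, int inFlight) {
            this.id = id;
            this.ready = ready;
            this.inFlight = inFlight;
        }
    }

    // Result pair mirroring the (node, flag) shape discussed in the review.
    static final class Result {
        final String node;
        final boolean atLeastOneConnectionReady;

        Result(String node, boolean atLeastOneConnectionReady) {
            this.node = node;
            this.atLeastOneConnectionReady = atLeastOneConnectionReady;
        }
    }

    // Picks the ready connection with the fewest in-flight requests below maxInFlight,
    // and separately records whether any ready connection was seen at all.
    static Result scan(List<Conn> conns, int maxInFlight) {
        String best = null;
        int bestInFlight = Integer.MAX_VALUE;
        boolean anyReady = false;
        for (Conn c : conns) {
            if (!c.ready) {
                continue;
            }
            anyReady = true;
            if (c.inFlight < maxInFlight && c.inFlight < bestInFlight) {
                best = c.id;
                bestInFlight = c.inFlight;
            }
        }
        return new Result(best, anyReady);
    }

    public static void main(String[] args) {
        // Ready but saturated: no node is selected, yet a ready connection exists.
        Result saturated = scan(Arrays.asList(new Conn("1", true, 5), new Conn("2", false, 0)), 5);
        System.out.println(saturated.node + " " + saturated.atLeastOneConnectionReady);

        // Nothing ready: the only case in which rebootstrapping would be considered.
        Result down = scan(Arrays.asList(new Conn("1", false, 0)), 5);
        System.out.println(down.node + " " + down.atLeastOneConnectionReady);

        // No known nodes at all.
        Result empty = scan(Collections.<Conn>emptyList(), 5);
        System.out.println(empty.node + " " + empty.atLeastOneConnectionReady);
    }
}
```

A unit test along these lines would assert on each combination of selected node and flag rather than printing them.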






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-02 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1623777938


##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -1123,6 +1133,17 @@ public long maybeUpdate(long now) {
 // Beware that the behavior of this method and the computation of timeouts for poll() are
 // highly dependent on the behavior of leastLoadedNode.
 Node node = leastLoadedNode(now);
+
+// Rebootstrap if needed and configured.
+if (node == null && metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP) {

Review Comment:
   I have an idea how to avoid rebootstrapping in this case. The idea is to 
pass some information along with the `Node` out of `leastLoadedNode`. See this 
commit 
https://github.com/apache/kafka/pull/13277/commits/3a303b7de650a6d6a94d5652a476ca9a98610380.
 Does it make sense? It changes the interface a little, but the overall change 
turned out to be smaller than I expected.






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-31 Thread via GitHub


rajinisivaram commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1622186108


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -304,6 +306,12 @@ public synchronized void bootstrap(List<InetSocketAddress> addresses) {
 this.needFullUpdate = true;
 this.updateVersion += 1;
 this.metadataSnapshot = MetadataSnapshot.bootstrap(addresses);
+this.bootstrapAddresses = addresses;

Review Comment:
   Yes, this seems fine because we are only using host string from the 
InetSocketAddress to build the bootstrap cluster.
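
The point about the host string can be illustrated with the standard `java.net.InetSocketAddress` API. This is a minimal sketch, not the `Metadata` or `MetadataSnapshot` code: `BootstrapHostsSketch`, `hostPorts`, and the example host names are hypothetical. It shows that `getHostString()` together with `getPort()` is enough to rebuild the bootstrap endpoints from a stored address list, without relying on a previously resolved IP.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BootstrapHostsSketch {
    // Rebuilds "host:port" strings from stored addresses using only the host string,
    // so the stored list can be reused later without depending on a resolved IP.
    static List<String> hostPorts(List<InetSocketAddress> addresses) {
        List<String> result = new ArrayList<>();
        for (InetSocketAddress address : addresses) {
            result.add(address.getHostString() + ":" + address.getPort());
        }
        return result;
    }

    public static void main(String[] args) {
        // createUnresolved performs no DNS lookup; the host names are placeholders.
        List<InetSocketAddress> stored = Arrays.asList(
                InetSocketAddress.createUnresolved("broker-1.example", 9092),
                InetSocketAddress.createUnresolved("broker-2.example", 9093));
        System.out.println(hostPorts(stored));
    }
}
```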



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-30 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1620696831


##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -1123,6 +1133,17 @@ public long maybeUpdate(long now) {
 // Beware that the behavior of this method and the computation of timeouts for poll() are
 // highly dependent on the behavior of leastLoadedNode.
 Node node = leastLoadedNode(now);
+
+// Rebootstrap if needed and configured.
+if (node == null && metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP) {

Review Comment:
   Ugh, that's correct. I'm sorry, I missed this case...
   
   I need to think about how to pass this information to avoid rebootstrapping. 
Potentially introduce a variant of the `leastLoadedNode` method, which will be 
called from the two places where rebootstrapping is expected. Its result will have 
a flag indicating if any node is connected. The rest of the code base will use 
the current version.
   
   The necessity to implement this for the admin client, which is different 
from other clients, makes it a bit ugly :) 






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-30 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1620593527


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java:
##
@@ -287,4 +293,12 @@ public void update(Cluster cluster, long now) {
 this.cluster = cluster;
 }
 }
+
+/**
+ * Rebootstrap metadata with the cluster previously used for bootstrapping.
+ */
+public void rebootstrap(long now) {
+log.info("Rebootstrapping with {}", this.bootstrapCluster);
+update(bootstrapCluster, now);

Review Comment:
   It uses the same `ClientUtils.parseAndValidateAddresses` function inside, so 
the same logic applies.






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-30 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1620581608


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -135,6 +136,7 @@ object KafkaConfig {
   .define(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG, INT, ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT, HIGH, ServerConfigs.REQUEST_TIMEOUT_MS_DOC)
   .define(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, LONG, ServerConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, MEDIUM, ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
   .define(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, LONG, ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, MEDIUM, ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
+  .define(ServerConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, STRING, ServerConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY, LOW, ServerConfigs.METADATA_RECOVERY_STRATEGY_DOC)

Review Comment:
   You're right, it's not needed here. Removed






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-30 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1620553331


##
clients/src/main/java/org/apache/kafka/clients/MetadataRecoveryStrategy.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.NoSuchElementException;
+
+import static java.util.Arrays.asList;
+
+/**
+ * Defines the strategies which clients can follow to deal with the situation when none of the known nodes is available.
+ */
+public enum MetadataRecoveryStrategy {
+NONE("none"),
+REBOOTSTRAP("rebootstrap");
+
+public final String name;
+
+MetadataRecoveryStrategy(String name) {
+this.name = name;
+}
+
+private static final List<MetadataRecoveryStrategy> VALUES = asList(values());
+
+public static MetadataRecoveryStrategy forName(String n) {
+String name = n.toLowerCase(Locale.ROOT);

Review Comment:
   Yep, it handles this now






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-30 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1620549218


##
clients/src/main/java/org/apache/kafka/clients/MetadataRecoveryStrategy.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.NoSuchElementException;
+
+import static java.util.Arrays.asList;
+
+/**
+ * Defines the strategies which clients can follow to deal with the situation when none of the known nodes is available.
+ */
+public enum MetadataRecoveryStrategy {
+    NONE("none"),
+    REBOOTSTRAP("rebootstrap");
+
+    public final String name;
+
+    MetadataRecoveryStrategy(String name) {
+        this.name = name;
+    }
+
+    private static final List<MetadataRecoveryStrategy> VALUES = asList(values());

Review Comment:
   Yes, removed






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-30 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1620547544


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -307,7 +307,8 @@ class KafkaRaftManager[T](
   time,
   discoverBrokerVersions,
   apiVersions,
-  logContext
+  logContext,
+  MetadataRecoveryStrategy.NONE

Review Comment:
   I originally did this. It basically doubled the number of constructors and 
looked hairy. After [this 
discussion](https://github.com/apache/kafka/pull/13277#discussion_r1607019116), 
I reverted to the current approach.






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-30 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1620545307


##
clients/src/main/java/org/apache/kafka/clients/MetadataRecoveryStrategy.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.NoSuchElementException;
+
+import static java.util.Arrays.asList;
+
+/**
+ * Defines the strategies which clients can follow to deal with the situation when none of the known nodes is available.
+ */
+public enum MetadataRecoveryStrategy {
+    NONE("none"),
+    REBOOTSTRAP("rebootstrap");
+
+    public final String name;
+
+    MetadataRecoveryStrategy(String name) {
+        this.name = name;
+    }
+
+    private static final List<MetadataRecoveryStrategy> VALUES = asList(values());
+
+    public static MetadataRecoveryStrategy forName(String n) {
+        String name = n.toLowerCase(Locale.ROOT);
+        for (MetadataRecoveryStrategy t : values()) {
+            if (t.name.equals(name)) {
+                return t;
+            }
+        }
+        throw new NoSuchElementException("Invalid metadata recovery strategy " + name);

Review Comment:
   Some others like `TimestampType` throw `NoSuchElementException`, but indeed 
`IllegalArgumentException` is probably better. Fixed






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-30 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1620547544


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -307,7 +307,8 @@ class KafkaRaftManager[T](
   time,
   discoverBrokerVersions,
   apiVersions,
-  logContext
+  logContext,
+  MetadataRecoveryStrategy.NONE

Review Comment:
   I originally did this, but after [this 
discussion](https://github.com/apache/kafka/pull/13277#discussion_r1607019116), 
I reverted to the current approach.






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-30 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1620514653


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -304,6 +306,12 @@ public synchronized void bootstrap(List<InetSocketAddress> addresses) {
 this.needFullUpdate = true;
 this.updateVersion += 1;
 this.metadataSnapshot = MetadataSnapshot.bootstrap(addresses);
+this.bootstrapAddresses = addresses;

Review Comment:
   This really depends on `client.dns.lookup`. E.g. if you use the 
bootstrap address `kafka.myorg.com:12345`, the bootstrap node will get 
`kafka.myorg.com` as the host, even though name resolution happens before that.
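
   The host-name retention described here can be illustrated with plain JDK 
classes. This is only a minimal sketch, not Kafka code: `BootstrapHostSketch` 
and `bootstrapHost` are hypothetical names, and it assumes the bootstrap node's 
host is taken from the address's host string.

```java
import java.net.InetSocketAddress;

final class BootstrapHostSketch {
    // Hypothetical helper: returns the host string that a node built from
    // this bootstrap address would carry (an assumption for illustration).
    static String bootstrapHost(String host, int port) {
        // This constructor attempts to resolve the host name eagerly...
        InetSocketAddress address = new InetSocketAddress(host, port);
        // ...but getHostString() still returns the name that was passed in,
        // without performing a reverse lookup.
        return address.getHostString();
    }

    public static void main(String[] args) {
        // "localhost" keeps the demo independent of external DNS.
        System.out.println(bootstrapHost("localhost", 12345));
    }
}
```

   Because the name rather than a fixed IP is kept, a later connection attempt 
has the chance to resolve it again.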






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-29 Thread via GitHub


gharris1727 commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1619177795


##
clients/src/main/java/org/apache/kafka/clients/MetadataRecoveryStrategy.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.NoSuchElementException;
+
+import static java.util.Arrays.asList;
+
+/**
+ * Defines the strategies which clients can follow to deal with the situation when none of the known nodes is available.
+ */
+public enum MetadataRecoveryStrategy {
+    NONE("none"),
+    REBOOTSTRAP("rebootstrap");
+
+    public final String name;
+
+    MetadataRecoveryStrategy(String name) {
+        this.name = name;
+    }
+
+    private static final List<MetadataRecoveryStrategy> VALUES = asList(values());

Review Comment:
   nit: this is unused



##
clients/src/main/java/org/apache/kafka/clients/MetadataRecoveryStrategy.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.NoSuchElementException;
+
+import static java.util.Arrays.asList;
+
+/**
+ * Defines the strategies which clients can follow to deal with the situation when none of the known nodes is available.
+ */
+public enum MetadataRecoveryStrategy {
+    NONE("none"),
+    REBOOTSTRAP("rebootstrap");
+
+    public final String name;
+
+    MetadataRecoveryStrategy(String name) {
+        this.name = name;
+    }
+
+    private static final List<MetadataRecoveryStrategy> VALUES = asList(values());
+
+    public static MetadataRecoveryStrategy forName(String n) {
+        String name = n.toLowerCase(Locale.ROOT);

Review Comment:
   nit: this can NPE if the input is null. This currently can't happen, but it 
would be nice to get the same exception type for all invalid inputs.
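
   One possible way to get a single exception type for every invalid input, 
including null, is sketched below. This is not the code from the PR: 
`ForNameSketch` and its nested `Strategy` enum are hypothetical stand-ins, and 
the choice of `IllegalArgumentException` is an assumption for illustration.

```java
import java.util.Locale;

final class ForNameSketch {
    // Hypothetical stand-in for the enum under review.
    enum Strategy {
        NONE("none"),
        REBOOTSTRAP("rebootstrap");

        final String name;

        Strategy(String name) {
            this.name = name;
        }

        static Strategy forName(String n) {
            // Guard against null first so that it falls through to the same
            // exception as any other unknown name instead of causing an NPE.
            if (n != null) {
                String lowered = n.toLowerCase(Locale.ROOT);
                for (Strategy s : values()) {
                    if (s.name.equals(lowered)) {
                        return s;
                    }
                }
            }
            throw new IllegalArgumentException("Invalid metadata recovery strategy " + n);
        }
    }

    public static void main(String[] args) {
        System.out.println(Strategy.forName("ReBootstrap"));
        try {
            Strategy.forName(null);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```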



##
core/src/test/scala/integration/kafka/api/RebootstrapTest.scala:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.server.{KafkaConfig, KafkaServer}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+
+import java.util.Properties
+
+abstract class RebootstrapTest extends AbstractConsumerTest {
+  override def brokerCount: Int = 2
+
+  def server0: KafkaServer = serverForId(0).get
+  def server1: KafkaServer = serverForId(1).get
+
+  override def generateConfigs: Seq[KafkaConfig] = {
+val overridingProps = new Properties()
+

Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-29 Thread via GitHub


rajinisivaram commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1619159464


##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -1123,6 +1133,17 @@ public long maybeUpdate(long now) {
 // Beware that the behavior of this method and the computation of timeouts for poll() are
 // highly dependent on the behavior of leastLoadedNode.
 Node node = leastLoadedNode(now);
+
+// Rebootstrap if needed and configured.
+if (node == null && metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP) {

Review Comment:
   Couldn't `node == null` mean that all the connections have max.in.flight 
requests and hence we need to wait?



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -135,6 +136,7 @@ object KafkaConfig {
   .define(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG, INT, ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT, HIGH, ServerConfigs.REQUEST_TIMEOUT_MS_DOC)
   .define(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, LONG, ServerConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, MEDIUM, ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
   .define(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, LONG, ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, MEDIUM, ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
+  .define(ServerConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, STRING, ServerConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY, LOW, ServerConfigs.METADATA_RECOVERY_STRATEGY_DOC)

Review Comment:
   Which clients in the broker do we want to rebootstrap? I don't remember this 
change from the KIP.



##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -307,7 +307,8 @@ class KafkaRaftManager[T](
   time,
   discoverBrokerVersions,
   apiVersions,
-  logContext
+  logContext,
+  MetadataRecoveryStrategy.NONE

Review Comment:
   Rather than change all these callers, couldn't we retain the original 
constructor for these?
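
   The alternative raised in this comment, keeping the original constructor 
and delegating to a new one with a default strategy, could look roughly like 
the sketch below. `ConstructorSketch`, `Client` and `Strategy` are hypothetical 
names and not the real `NetworkClient` signatures.

```java
final class ConstructorSketch {
    enum Strategy { NONE, REBOOTSTRAP }

    static final class Client {
        final String clientId;
        final Strategy strategy;

        // Original-style constructor, retained so existing callers compile
        // unchanged; it delegates with the default strategy.
        Client(String clientId) {
            this(clientId, Strategy.NONE);
        }

        // New constructor for callers that want to choose a strategy.
        Client(String clientId, Strategy strategy) {
            this.clientId = clientId;
            this.strategy = strategy;
        }
    }

    public static void main(String[] args) {
        System.out.println(new Client("existing-caller").strategy);
        System.out.println(new Client("new-caller", Strategy.REBOOTSTRAP).strategy);
    }
}
```

   The trade-off is that every existing overload needs a twin, which grows 
quickly when a class already has several constructors.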



##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -304,6 +306,12 @@ public synchronized void bootstrap(List<InetSocketAddress> addresses) {
 this.needFullUpdate = true;
 this.updateVersion += 1;
 this.metadataSnapshot = MetadataSnapshot.bootstrap(addresses);
+this.bootstrapAddresses = addresses;

Review Comment:
   Is there a reason we are re-bootstrapping from addresses that were resolved 
when the client was created? This seems different from the example in the KIP 
where the addresses that the bootstrap resolved to had changed.



##
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java:
##
@@ -287,4 +293,12 @@ public void update(Cluster cluster, long now) {
 this.cluster = cluster;
 }
 }
+
+/**
+ * Rebootstrap metadata with the cluster previously used for bootstrapping.
+ */
+public void rebootstrap(long now) {
+log.info("Rebootstrapping with {}", this.bootstrapCluster);
+update(bootstrapCluster, now);

Review Comment:
   Same question as with NetworkClient, don't we want to re-resolve bootstrap?



##
clients/src/main/java/org/apache/kafka/clients/MetadataRecoveryStrategy.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.NoSuchElementException;
+
+import static java.util.Arrays.asList;
+
+/**
+ * Defines the strategies which clients can follow to deal with the situation when none of the known nodes is available.
+ */
+public enum MetadataRecoveryStrategy {
+    NONE("none"),
+    REBOOTSTRAP("rebootstrap");
+
+    public final String name;
+
+    MetadataRecoveryStrategy(String name) {
+        this.name = name;
+    }
+
+    private static final List<MetadataRecoveryStrategy> VALUES = asList(values());
+
+    public static MetadataRecoveryStrategy forName(String n) {
+        String name = n.toLowerCase(Locale.ROOT);
+        for (MetadataRecoveryStrategy t : values()) {
+            if

Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-29 Thread via GitHub


ivanyu commented on PR #13277:
URL: https://github.com/apache/kafka/pull/13277#issuecomment-2137695965

   I see there are some failures in the rebootstrap tests on CI. I need to deal 
with them





Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-29 Thread via GitHub


ivanyu commented on PR #13277:
URL: https://github.com/apache/kafka/pull/13277#issuecomment-2136891093

   > I noticed there isn't an AdminClientRebootstrapTest, is that intentional? I 
wouldn't personally require one because the change is in NetworkClient and not 
the admin client specifically, but just wanted to check with you.
   
   Yes, sorry. It was moved in/out of scope a couple of times during 
discussion, so I forgot to add the implementation and test for it.
   
   Also, the conflict is resolved.





Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-28 Thread via GitHub


ivanyu commented on PR #13277:
URL: https://github.com/apache/kafka/pull/13277#issuecomment-2135867575

   Yep, doing this + finishing with the admin client





Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-28 Thread via GitHub


gharris1727 commented on PR #13277:
URL: https://github.com/apache/kafka/pull/13277#issuecomment-2135860640

   Hi @ivanyu Could you resolve the merge conflicts?





Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-27 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1616295494


##
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##
@@ -219,6 +219,19 @@ public class CommonClientConfigs {
 public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for client APIs. " +
 "This configuration is used as the default timeout for all client operations that do not specify a timeout parameter.";
 
+public static final String METADATA_RECOVERY_STRATEGY_CONFIG = "metadata.recovery.strategy";
+public static final String METADATA_RECOVERY_STRATEGY_DOC = "Controls how the client recovers when none of the brokers known to it is available. " +
+"If set to none, the client fails. If set to rebootstrap, " +
+"the client repeats the bootstrap process using bootstrap.servers. " +
+"Rebootstrapping is useful when a client communicates with brokers so infrequently " +
+"that the set of brokers may change entirely before the client refreshes metadata. " +
+"Opportunities to rebootstrapping depend on connection establishing and reconnect timeouts and the broker count. " +
+"The timeouts may prevent identifying brokers as unavailable simultaneously, which is necessary to trigger rebootstrapping. " +

Review Comment:
   This makes sense. Thank you, updated the doc






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-27 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1616292083


##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##


Review Comment:
   Thank you. I removed extra constructors in this case






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-22 Thread via GitHub


gharris1727 commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1610617003


##
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##
@@ -219,6 +219,19 @@ public class CommonClientConfigs {
 public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for client APIs. " +
 "This configuration is used as the default timeout for all client operations that do not specify a timeout parameter.";
 
+public static final String METADATA_RECOVERY_STRATEGY_CONFIG = "metadata.recovery.strategy";
+public static final String METADATA_RECOVERY_STRATEGY_DOC = "Controls how the client recovers when none of the brokers known to it is available. " +
+"If set to none, the client fails. If set to rebootstrap, " +
+"the client repeats the bootstrap process using bootstrap.servers. " +
+"Rebootstrapping is useful when a client communicates with brokers so infrequently " +
+"that the set of brokers may change entirely before the client refreshes metadata. " +
+"Opportunities to rebootstrapping depend on connection establishing and reconnect timeouts and the broker count. " +
+"The timeouts may prevent identifying brokers as unavailable simultaneously, which is necessary to trigger rebootstrapping. " +

Review Comment:
   This section is very confusing for me, and I think this should be reworded.
   
   Maybe it can flow like this: "Metadata recovery is triggered when all 
last-known brokers appear unavailable simultaneously. Brokers appear 
unavailable when disconnected and no current retry attempt is in-progress."
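
   For readers of the doc string, it may help to see how the setting would be 
supplied. The sketch below only builds a plain `Properties` object using the 
two keys quoted in the diff; `RebootstrapConfigSketch` and `clientProps` are 
hypothetical names, and the address is the example host used elsewhere in this 
thread.

```java
import java.util.Properties;

final class RebootstrapConfigSketch {
    // Hypothetical helper that collects the two settings relevant here.
    static Properties clientProps(String bootstrapServers) {
        Properties props = new Properties();
        // Addresses used for the initial bootstrap and, with the strategy
        // below, for repeating it when no known broker is available.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Config key and value as named in the diff under review.
        props.setProperty("metadata.recovery.strategy", "rebootstrap");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(clientProps("kafka.myorg.com:12345"));
    }
}
```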



##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##


Review Comment:
   Stuff in this package is not included in the public API 
(https://kafka.apache.org/37/javadoc/index.html); only the stuff in 
admin/producer/consumer (and not in sub-packages) is truly public.






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-20 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1607019116


##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##


Review Comment:
   I added a number of constructors here instead of altering the existing 
ones because they're public and, I assume, part of the client library contract.


