This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new d1a9255d4cb KAFKA-19561: Set OP_WRITE interest after SASL
reauthentication to resume pending writes (#20258)
d1a9255d4cb is described below
commit d1a9255d4cba86541d0993c1464476fdf298a6c1
Author: Manikumar Reddy <[email protected]>
AuthorDate: Thu Jul 31 21:59:21 2025 +0530
KAFKA-19561: Set OP_WRITE interest after SASL reauthentication to resume
pending writes (#20258)
https://issues.apache.org/jira/browse/KAFKA-19561
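Fixes a race condition during SASL reauthentication: the server-side
`KafkaChannel.send()` queues a response, but OP_WRITE interest is
removed before the channel becomes writable and is not restored when
reauthentication completes, so the response is never written and the
client times out. After a successful reauthentication the selector now
re-adds OP_WRITE interest if a send is still pending on the channel.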
Reviewers: Rajini Sivaram <[email protected]>
---
.../apache/kafka/common/network/KafkaChannel.java | 10 +++
.../org/apache/kafka/common/network/Selector.java | 1 +
.../authenticator/SaslAuthenticatorTest.java | 75 +++++++++++++++++++++-
3 files changed, 85 insertions(+), 1 deletion(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index df6ccd67ce5..22c24f8408c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -681,4 +681,14 @@ public class KafkaChannel implements AutoCloseable {
public ChannelMetadataRegistry channelMetadataRegistry() {
return metadataRegistry;
}
+
+    /**
+     * Re-registers interest in {@link SelectionKey#OP_WRITE} when a send is still pending on this channel.
+     * Intended to be called once re-authentication has completed, so the pending write is resumed rather than stuck.
+     */
+    public void maybeAddWriteInterestAfterReauth() {
+        if (send == null)
+            return;
+        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
+    }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 68698ab7b8d..b13f9acee01 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -566,6 +566,7 @@ public class Selector implements Selectable, AutoCloseable {
boolean isReauthentication =
channel.successfulAuthentications() > 1;
if (isReauthentication) {
sensors.successfulReauthentication.record(1.0,
readyTimeMs);
+ channel.maybeAddWriteInterestAfterReauth();
if (channel.reauthenticationLatencyMs() == null)
log.warn(
"Should never happen: re-authentication
latency for a re-authenticated channel was null; continuing...");
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 145e887a5be..ef8bb96d5df 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -44,6 +44,7 @@ import
org.apache.kafka.common.network.ChannelMetadataRegistry;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.ConnectionMode;
import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NetworkTestUtils;
import org.apache.kafka.common.network.NioEchoServer;
@@ -119,6 +120,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -1856,6 +1858,69 @@ public class SaslAuthenticatorTest {
verifySslClientAuthForSaslSslListener(false, SslClientAuth.REQUIRED);
}
+    @Test
+    public void testServerSidePendingSendDuringReauthentication() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+        TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
+        jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(), new HashMap<>());
+        jaasConfig.setClientOptions("PLAIN", TestServerCallbackHandler.USERNAME, TestServerCallbackHandler.PASSWORD);
+        String callbackPrefix = ListenerName.forSecurityProtocol(securityProtocol).saslMechanismConfigPrefix("PLAIN");
+        saslServerConfigs.put(callbackPrefix + BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG,
+                TestServerCallbackHandler.class.getName());
+        server = createEchoServer(securityProtocol);
+
+        String node = "node1";
+        try {
+            createClientConnection(securityProtocol, node);
+            NetworkTestUtils.waitForChannelReady(selector, node);
+            server.verifyAuthenticationMetrics(1, 0);
+
+            // Sleep past the re-authentication interval so that the next write triggers re-authentication.
+            delay((long) (CONNECTIONS_MAX_REAUTH_MS_VALUE * 1.1));
+            server.verifyReauthenticationMetrics(0, 0);
+
+            String prefix = TestUtils.randomString(100);
+            String payload = prefix + "-1";
+            // Hold the callback handler's semaphore so the server stalls inside authenticate() and
+            // re-authentication cannot complete until we release it.
+            TestServerCallbackHandler.sem.acquire();
+            try {
+                // Send a client request to start re-authentication.
+                selector.send(new NetworkSend(node, ByteBufferSend.sizePrefixed(ByteBuffer.wrap((prefix + "-0").getBytes(StandardCharsets.UTF_8)))));
+                // Wait until the server thread is queued on the semaphore, i.e. re-authentication is blocked.
+                TestUtils.waitForCondition(() -> {
+                    selector.poll(10L);
+                    return TestServerCallbackHandler.sem.hasQueuedThreads();
+                }, 5000, "Reauthentication is not blocked");
+
+                // Set the client's channel `send` to null to allow setting a new send on the server's selector.
+                // Without this, NioEchoServer will throw an error while processing the client request,
+                // since we're manually setting a server side send to simulate the issue.
+                TestUtils.setFieldValue(selector.channel(node), "send", null);
+
+                // Queue a send directly on the server-side channel while re-authentication is still blocked.
+                String channelId = server.selector().channels().get(0).id();
+                server.selector().send(new NetworkSend(channelId, ByteBufferSend.sizePrefixed(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)))));
+            } finally {
+                // Always release, even on failure: sem is static and a leaked permit would stall later tests.
+                TestServerCallbackHandler.sem.release();
+            }
+
+            TestUtils.waitForCondition(() -> {
+                selector.poll(10L);
+                for (NetworkReceive receive : selector.completedReceives()) {
+                    assertEquals(payload, new String(Utils.toArray(receive.payload()), StandardCharsets.UTF_8));
+                    return true;
+                }
+                return false;
+            }, 5000, "Failed to receive the server send after reauthentication");
+
+            server.verifyReauthenticationMetrics(1, 0);
+        } finally {
+            closeClientConnectionIfNecessary();
+        }
+    }
+
private void verifySslClientAuthForSaslSslListener(boolean
useListenerPrefix,
SslClientAuth
configuredClientAuth) throws Exception {
@@ -2312,6 +2377,8 @@ public class SaslAuthenticatorTest {
         static final String USERNAME = "TestServerCallbackHandler-user";
         static final String PASSWORD = "TestServerCallbackHandler-password";
         private volatile boolean configured;
+        // Single permit taken and released by authenticate(); a test may hold it to stall (re-)authentication.
+        public static final Semaphore sem = new Semaphore(1);
@Override
public void configure(Map<String, ?> configs, String mechanism,
List<AppConfigurationEntry> jaasConfigEntries) {
@@ -2325,7 +2391,19 @@ public class SaslAuthenticatorTest {
         protected boolean authenticate(String username, char[] password) {
             if (!configured)
                 throw new IllegalStateException("Server callback handler not configured");
-            return USERNAME.equals(username) && new String(password).equals(PASSWORD);
+            // Acquire outside the try/finally: an interrupted acquire() takes no permit, so none may be released
+            // (releasing anyway would add a second permit and silently disable the test's blocking).
+            try {
+                sem.acquire();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+            try {
+                return USERNAME.equals(username) && new String(password).equals(PASSWORD);
+            } finally {
+                sem.release();
+            }
         }
}