This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new de73bea  KAFKA-13461: Don't re-initialize ZK client session after auth 
failure if connection still alive (#11563)
de73bea is described below

commit de73bea58ee6afb466eef4b375f8533aca97494f
Author: Rajini Sivaram <[email protected]>
AuthorDate: Thu Dec 2 22:10:37 2021 +0000

    KAFKA-13461: Don't re-initialize ZK client session after auth failure if 
connection still alive (#11563)
    
    If JAAS configuration does not contain a Client section for ZK clients, an 
auth failure event is generated. If this occurs after the connection is setup 
in the controller, we schedule reinitialize(), which causes controller to 
resign. In the case where SASL is not mandatory and the connection is alive, 
controller maintains the current session and doesn't register its watchers, 
leaving it in a bad state.
    
    Reviewers: Jun Rao <[email protected]>
---
 core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala     |  4 ++--
 .../test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala | 11 ++++++++++-
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 091b401..bc634a8 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -433,14 +433,14 @@ class ZooKeeperClient(connectString: String,
             isConnectedOrExpiredCondition.signalAll()
           }
           if (state == KeeperState.AuthFailed) {
-            error("Auth failed.")
+            error(s"Auth failed, initialized=$isFirstConnectionEstablished 
connectionState=$connectionState")
             stateChangeHandlers.values.foreach(_.onAuthFailure())
 
             // If this is during initial startup, we fail fast. Otherwise, 
schedule retry.
             val initialized = inLock(isConnectedOrExpiredLock) {
               isFirstConnectionEstablished
             }
-            if (initialized)
+            if (initialized && !connectionState.isAlive)
               scheduleReinitialize("auth-failed", "Reinitializing due to auth 
failure.", RetryBackoffMs)
           } else if (state == KeeperState.Expired) {
             scheduleReinitialize("session-expired", "Session expired.", 
delayMs = 0L)
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala 
b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index 649f3c5..5af2ba8 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -649,9 +649,18 @@ class ZooKeeperClientTest extends QuorumTestHarness {
     }
 
     zooKeeperClient.close()
-    zooKeeperClient = newZooKeeperClient()
+    @volatile var connectionStateOverride: Option[States] = None
+    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout,
+      zkMaxInFlightRequests, time, "testMetricGroup", "testMetricType", new 
ZKClientConfig, "ZooKeeperClientTest") {
+      override def connectionState: States = 
connectionStateOverride.getOrElse(super.connectionState)
+    }
     zooKeeperClient.registerStateChangeHandler(changeHandler)
 
+    connectionStateOverride = Some(States.CONNECTED)
+    zooKeeperClient.ZooKeeperClientWatcher.process(new 
WatchedEvent(EventType.None, KeeperState.AuthFailed, null))
+    assertFalse(sessionInitializedCountDownLatch.await(10, 
TimeUnit.MILLISECONDS), "Unexpected session initialization when connection is 
alive")
+
+    connectionStateOverride = Some(States.AUTH_FAILED)
     zooKeeperClient.ZooKeeperClientWatcher.process(new 
WatchedEvent(EventType.None, KeeperState.AuthFailed, null))
     assertTrue(sessionInitializedCountDownLatch.await(5, TimeUnit.SECONDS), 
"Failed to receive session initializing notification")
   }

Reply via email to