[ 
https://issues.apache.org/jira/browse/KAFKA-6772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442175#comment-16442175
 ] 

ASF GitHub Bot commented on KAFKA-6772:
---------------------------------------

rajinisivaram closed pull request #4867: KAFKA-6772: Load credentials from ZK 
before accepting connections
URL: https://github.com/apache/kafka/pull/4867
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index e4fdb089cd1..c0bc5939c08 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -76,12 +76,24 @@ class SocketServer(val config: KafkaConfig, val metrics: 
Metrics, val time: Time
   private var stoppedProcessingRequests = false
 
   /**
-   * Start the socket server
+   * Start the socket server. Acceptors for all the listeners are started. 
Processors
+   * are started if `startupProcessors` is true. If not, processors are only 
started when
+   * [[kafka.network.SocketServer#startProcessors()]] is invoked. Delayed 
starting of processors
+   * is used to delay processing client connections until server is fully 
initialized, e.g.
+   * to ensure that all credentials have been loaded before authentications 
are performed.
+   * Acceptors are always started during `startup` so that the bound port is 
known when this
+   * method completes even when ephemeral ports are used. Incoming connections 
on this server
+   * are processed when processors start up and invoke 
[[org.apache.kafka.common.network.Selector#poll]].
+   *
+   * @param startupProcessors Flag indicating whether `Processor`s must be 
started.
    */
-  def startup() {
+  def startup(startupProcessors: Boolean = true) {
     this.synchronized {
       connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, 
maxConnectionsPerIpOverrides)
       createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
+      if (startupProcessors) {
+        startProcessors()
+      }
     }
 
     newGauge("NetworkProcessorAvgIdlePercent",
@@ -110,6 +122,16 @@ class SocketServer(val config: KafkaConfig, val metrics: 
Metrics, val time: Time
     info("Started " + acceptors.size + " acceptor threads")
   }
 
+  /**
+   * Starts processors of all the acceptors of this server if they have not 
already been started.
+   * This method is used for delayed starting of processors if 
[[kafka.network.SocketServer#startup]]
+   * was invoked with `startupProcessors=false`.
+   */
+  def startProcessors(): Unit = synchronized {
+    acceptors.values.asScala.foreach { _.startProcessors() }
+    info(s"Started processors for ${acceptors.size} acceptors")
+  }
+
   private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
 
   private def createAcceptorAndProcessors(processorsPerListener: Int,
@@ -196,6 +218,7 @@ class SocketServer(val config: KafkaConfig, val metrics: 
Metrics, val time: Time
   def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
     info(s"Adding listeners for endpoints $listenersAdded")
     createAcceptorAndProcessors(config.numNetworkThreads, listenersAdded)
+    startProcessors()
   }
 
   def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
@@ -307,13 +330,25 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
   private val nioSelector = NSelector.open()
   val serverChannel = openServerSocket(endPoint.host, endPoint.port)
   private val processors = new ArrayBuffer[Processor]()
+  private val processorsStarted = new AtomicBoolean
 
   private[network] def addProcessors(newProcessors: Buffer[Processor]): Unit = 
synchronized {
-    newProcessors.foreach { processor =>
+    processors ++= newProcessors
+    if (processorsStarted.get)
+      startProcessors(newProcessors)
+  }
+
+  private[network] def startProcessors(): Unit = synchronized {
+    if (!processorsStarted.getAndSet(true)) {
+      startProcessors(processors)
+    }
+  }
+
+  private def startProcessors(processors: Seq[Processor]): Unit = synchronized 
{
+    processors.foreach { processor =>
       
KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
         processor).start()
     }
-    processors ++= newProcessors
   }
 
   private[network] def removeProcessors(removeCount: Int, requestChannel: 
RequestChannel): Unit = synchronized {
@@ -328,7 +363,9 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 
   override def shutdown(): Unit = {
     super.shutdown()
-    processors.foreach(_.shutdown())
+    synchronized {
+      processors.foreach(_.shutdown())
+    }
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index a0d2c799e6a..c729c8c90f7 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -243,8 +243,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
         tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
         credentialProvider = new 
CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
+        // Create and start the socket server acceptor threads so that the 
bound port is known.
+        // Delay starting processors until the end of the initialization 
sequence to ensure
+        // that credentials have been loaded before processing authentications.
         socketServer = new SocketServer(config, metrics, time, 
credentialProvider)
-        socketServer.startup()
+        socketServer.startup(startupProcessors = false)
 
         /* start replica manager */
         replicaManager = createReplicaManager(isShuttingDown)
@@ -310,7 +313,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
         dynamicConfigManager = new DynamicConfigManager(zkClient, 
dynamicConfigHandlers)
         dynamicConfigManager.startup()
 
-
+        socketServer.startProcessors()
         brokerState.newState(RunningAsBroker)
         shutdownLatch = new CountDownLatch(1)
         startupComplete.set(true)
diff --git 
a/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala 
b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
new file mode 100644
index 00000000000..18b4f8e23b8
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
@@ -0,0 +1,73 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+import java.util.Collections
+
+import kafka.api.{IntegrationTestHarness, KafkaSasl, SaslSetup}
+import kafka.utils._
+import kafka.zk.ConfigEntityChangeNotificationZNode
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+/**
+ * Tests that there are no failed authentications during broker startup. This 
is to verify
+ * that SCRAM credentials are loaded by brokers before client connections can 
be made.
+ * For simplicity of testing, this test verifies authentications of controller 
connections.
+ */
+class ScramServerStartupTest extends IntegrationTestHarness with SaslSetup {
+
+  override val producerCount = 0
+  override val consumerCount = 0
+  override val serverCount = 1
+
+  private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
+  private val kafkaServerSaslMechanisms = 
Collections.singletonList("SCRAM-SHA-256").asScala
+
+  override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+
+  override protected val serverSaslProperties = 
Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, 
kafkaClientSaslMechanism))
+  override protected val clientSaslProperties = 
Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+
+  override def configureSecurityBeforeServersStart() {
+    super.configureSecurityBeforeServersStart()
+    
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
+    // Create credentials before starting brokers
+    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, 
JaasTestUtils.KafkaScramAdminPassword)
+
+    startSasl(jaasSections(kafkaServerSaslMechanisms, 
Option(kafkaClientSaslMechanism), KafkaSasl))
+  }
+
+  @Test
+  def testAuthentications(): Unit = {
+    val successfulAuths = 
totalAuthentications("successful-authentication-total")
+    assertTrue("No successful authentications", successfulAuths > 0)
+    val failedAuths = totalAuthentications("failed-authentication-total")
+    assertEquals(0, failedAuths)
+  }
+
+  private def totalAuthentications(metricName: String): Int = {
+    val allMetrics = servers.head.metrics.metrics
+    val totalAuthCount = 
allMetrics.values().asScala.filter(_.metricName().name() == metricName)
+      .foldLeft(0.0)((total, metric) => total + 
metric.metricValue.asInstanceOf[Double])
+    totalAuthCount.toInt
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Broker should load credentials from ZK before requests are allowed
> ------------------------------------------------------------------
>
>                 Key: KAFKA-6772
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6772
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 1.0.0, 1.1.0, 1.0.1
>            Reporter: Ismael Juma
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 1.0.2, 1.2.0, 1.1.1
>
>
> It is currently possible for clients to get an AuthenticationException during 
> start-up if the brokers have not yet loaded credentials from ZK. This 
> definitely affects SCRAM, but it may also affect delegation tokens.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to