[GitHub] [kafka] cmccabe commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading [WIP]

2022-05-04 Thread GitBox


cmccabe commented on code in PR #11969:
URL: https://github.com/apache/kafka/pull/11969#discussion_r865495566


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -906,12 +906,35 @@ private void appendRaftEvent(String name, Runnable 
runnable) {
 if (this != metaLogListener) {
 log.debug("Ignoring {} raft event from an old 
registration", name);
 } else {
-runnable.run();
+try {
+runnable.run();
+} finally {
+maybeCompleteAuthorizerInitialLoad();
+}
 }
 });
 }
 }
 
+private void maybeCompleteAuthorizerInitialLoad() {
+if (!needToCompleteAuthorizerLoad) return;
+OptionalLong highWatermark = raftClient.highWatermark();
+if (highWatermark.isPresent()) {
+if (lastCommittedOffset + 1 >= highWatermark.getAsLong()) {

Review Comment:
   yeah, I think this is OK for now. if we have issues we can revisit...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading [WIP]

2022-05-04 Thread GitBox


cmccabe commented on code in PR #11969:
URL: https://github.com/apache/kafka/pull/11969#discussion_r865495994


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -104,184 +103,141 @@ class SocketServer(val config: KafkaConfig,
 
   private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
   val connectionQuotas = new ConnectionQuotas(config, time, metrics)
-  private var startedProcessingRequests = false
-  private var stoppedProcessingRequests = false
 
-  // Processors are now created by each Acceptor. However to preserve 
compatibility, we need to number the processors
-  // globally, so we keep the nextProcessorId counter in SocketServer
-  def nextProcessorId(): Int = {
-nextProcessorId.getAndIncrement()
-  }
+  /**
+   * A future which is completed once all the authorizer futures are complete.
+   */
+  private val allAuthorizerFuturesComplete = new CompletableFuture[Void]
 
   /**
-   * Starts the socket server and creates all the Acceptors and the 
Processors. The Acceptors
-   * start listening at this stage so that the bound port is known when this 
method completes
-   * even when ephemeral ports are used. Acceptors and Processors are started 
if `startProcessingRequests`
-   * is true. If not, acceptors and processors are only started when 
[[kafka.network.SocketServer#startProcessingRequests()]]
-   * is invoked. Delayed starting of acceptors and processors is used to delay 
processing client
-   * connections until server is fully initialized, e.g. to ensure that all 
credentials have been
-   * loaded before authentications are performed. Incoming connections on this 
server are processed
-   * when processors start up and invoke 
[[org.apache.kafka.common.network.Selector#poll]].
-   *
-   * @param startProcessingRequests Flag indicating whether `Processor`s must 
be started.
-   * @param controlPlaneListenerThe control plane listener, or None if 
there is none.
-   * @param dataPlaneListeners  The data plane listeners.
+   * True if the SocketServer is stopped. Must be accessed under the 
SocketServer lock.
*/
-  def startup(startProcessingRequests: Boolean = true,
-  controlPlaneListener: Option[EndPoint] = 
config.controlPlaneListener,
-  dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): 
Unit = {
-this.synchronized {
-  createControlPlaneAcceptorAndProcessor(controlPlaneListener)
-  createDataPlaneAcceptorsAndProcessors(dataPlaneListeners)
-  if (startProcessingRequests) {
-this.startProcessingRequests()
-  }
-}
+  private var stopped = false
 
+  // Socket server metrics
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", 
() => SocketServer.this.synchronized {

Review Comment:
   Yeah, it would be nice to avoid the synchronization. Filed KAFKA-13874 for 
this.
   
   Dynamic listeners really make this kind of thing hard, even though very few 
people use them. :(



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading [WIP]

2022-05-04 Thread GitBox


cmccabe commented on code in PR #11969:
URL: https://github.com/apache/kafka/pull/11969#discussion_r865496509


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -104,184 +103,141 @@ class SocketServer(val config: KafkaConfig,
 
   private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
   val connectionQuotas = new ConnectionQuotas(config, time, metrics)
-  private var startedProcessingRequests = false
-  private var stoppedProcessingRequests = false
 
-  // Processors are now created by each Acceptor. However to preserve 
compatibility, we need to number the processors
-  // globally, so we keep the nextProcessorId counter in SocketServer
-  def nextProcessorId(): Int = {
-nextProcessorId.getAndIncrement()
-  }
+  /**
+   * A future which is completed once all the authorizer futures are complete.
+   */
+  private val allAuthorizerFuturesComplete = new CompletableFuture[Void]
 
   /**
-   * Starts the socket server and creates all the Acceptors and the 
Processors. The Acceptors
-   * start listening at this stage so that the bound port is known when this 
method completes
-   * even when ephemeral ports are used. Acceptors and Processors are started 
if `startProcessingRequests`
-   * is true. If not, acceptors and processors are only started when 
[[kafka.network.SocketServer#startProcessingRequests()]]
-   * is invoked. Delayed starting of acceptors and processors is used to delay 
processing client
-   * connections until server is fully initialized, e.g. to ensure that all 
credentials have been
-   * loaded before authentications are performed. Incoming connections on this 
server are processed
-   * when processors start up and invoke 
[[org.apache.kafka.common.network.Selector#poll]].
-   *
-   * @param startProcessingRequests Flag indicating whether `Processor`s must 
be started.
-   * @param controlPlaneListenerThe control plane listener, or None if 
there is none.
-   * @param dataPlaneListeners  The data plane listeners.
+   * True if the SocketServer is stopped. Must be accessed under the 
SocketServer lock.
*/
-  def startup(startProcessingRequests: Boolean = true,
-  controlPlaneListener: Option[EndPoint] = 
config.controlPlaneListener,
-  dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): 
Unit = {
-this.synchronized {
-  createControlPlaneAcceptorAndProcessor(controlPlaneListener)
-  createDataPlaneAcceptorsAndProcessors(dataPlaneListeners)
-  if (startProcessingRequests) {
-this.startProcessingRequests()
-  }
-}
+  private var stopped = false
 
+  // Socket server metrics
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", 
() => SocketServer.this.synchronized {
 val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => 
a.processors)
-val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => 
a.processors(0))
-
newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () 
=> SocketServer.this.synchronized {
-  val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
-metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
-  }
+val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
+  metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
+}
+if (dataPlaneProcessors.isEmpty) {
+  1.0
+} else {
   ioWaitRatioMetricNames.map { metricName =>
 Option(metrics.metric(metricName)).fold(0.0)(m => 
Math.min(m.metricValue.asInstanceOf[Double], 1.0))
   }.sum / dataPlaneProcessors.size
-})
-
newGauge(s"${ControlPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", 
() => SocketServer.this.synchronized {
-  val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p =>
-metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
-  }
-  ioWaitRatioMetricName.map { metricName =>
-Option(metrics.metric(metricName)).fold(0.0)(m => 
Math.min(m.metricValue.asInstanceOf[Double], 1.0))
-  }.getOrElse(Double.NaN)
-})
-newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
-newGauge("MemoryPoolUsed", () => memoryPool.size() - 
memoryPool.availableMemory)
-
newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () 
=> SocketServer.this.synchronized {
-  val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { 
p =>
-metrics.metricName("expired-connections-killed-count", MetricsGroup, 
p.metricTags)
-  }
-  expiredConnectionsKilledCountMetricNames.map { metricName =>
-Option(metrics.metric(metricName)).fold(0.0)(m => 
m.metricValue.asInstanceOf[Double])
-  }.sum
-})
-
newGauge(s"${ControlPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", 
() => SocketServer.this.synchronized {
-  val expiredConnectionsKilledCountMetricNames = 
controlPlaneProcessorOpt.map {

[GitHub] [kafka] cmccabe commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading [WIP]

2022-05-04 Thread GitBox


cmccabe commented on code in PR #11969:
URL: https://github.com/apache/kafka/pull/11969#discussion_r865499094


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1864,6 +1780,18 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   sensor
 }
   }
+
+  /**
+   * Close `channel` and decrement the connection count.
+   */
+  def closeChannel(listenerName: ListenerName, channel: SocketChannel): Unit = 
{
+if (channel != null) {
+  debug(s"Closing connection from 
${channel.socket.getRemoteSocketAddress}")
+  dec(listenerName, channel.socket.getInetAddress)
+  closeSocket(channel, this)

Review Comment:
   Well, the ConnectionQuotas class is tracking how many connections exist. It 
needs to be informed when a connection is closed. If you want, I can have the 
function take a Logging parameter so that we can log until `SocketServer` (or 
whatever) rather than `SocketServer.ConnectionQuotas`. But this is a DEBUG 
level message anyway... realistically, you don't ever see this in prod, so... 
not sure if it matters.



##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1864,6 +1780,18 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   sensor
 }
   }
+
+  /**
+   * Close `channel` and decrement the connection count.
+   */
+  def closeChannel(listenerName: ListenerName, channel: SocketChannel): Unit = 
{
+if (channel != null) {
+  debug(s"Closing connection from 
${channel.socket.getRemoteSocketAddress}")
+  dec(listenerName, channel.socket.getInetAddress)
+  closeSocket(channel, this)

Review Comment:
   Well, the ConnectionQuotas class is tracking how many connections exist. It 
needs to be informed when a connection is closed. If you want, I can have the 
function take a Logging parameter so that we can log under `SocketServer` (or 
whatever) rather than `SocketServer.ConnectionQuotas`. But this is a DEBUG 
level message anyway... realistically, you don't ever see this in prod, so... 
not sure if it matters.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org