[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-07 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1319165974


##
clients/src/main/resources/common/message/ControllerRegistrationRequest.json:
##
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 70,

Review Comment:
   edit: now using 70 again since someone claimed 69 :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-05 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1316395025


##
clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationRequest.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ControllerRegistrationRequestData;
+import org.apache.kafka.common.message.ControllerRegistrationResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class ControllerRegistrationRequest extends AbstractRequest {
+public static class Builder extends AbstractRequest.Builder<ControllerRegistrationRequest> {
+private final ControllerRegistrationRequestData data;
+
+public Builder(ControllerRegistrationRequestData data) {
+super(ApiKeys.BROKER_HEARTBEAT);

Review Comment:
   Yes, nice catch. Fixed.






[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-05 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1316393751


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -320,6 +323,19 @@ public short registerBrokerRecordVersion() {
 }
 }
 
+public short registerControllerRecordVersion() {
+if (isAtLeast(MetadataVersion.IBP_3_6_IV2)) {

Review Comment:
   Sorry, this was left over from when the feature was previously in `IBP_3_6_IV2`. Fixed now to be `IBP_3_7_IV0`.
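
The gating pattern in this hunk, choosing a record version by comparing the active metadata version against the release that introduced the feature, can be sketched as below. This is a self-contained illustration and not Kafka's `MetadataVersion` class: the enum `SketchMetadataVersion`, its reduced list of constants, the returned record version 0, and the exception thrown for older versions are all assumptions made for the example. Only `isAtLeast` and the names `IBP_3_6_IV2` and `IBP_3_7_IV0` come from the thread.

```java
// Hypothetical, reduced stand-in for a metadata version enum.
enum SketchMetadataVersion {
    // In this sketch, declaration order doubles as release order.
    IBP_3_6_IV1,
    IBP_3_6_IV2,
    IBP_3_7_IV0;

    boolean isAtLeast(SketchMetadataVersion other) {
        return compareTo(other) >= 0;
    }

    // Assumption: record version 0 once the feature exists, an error before that.
    short registerControllerRecordVersion() {
        if (isAtLeast(IBP_3_7_IV0)) {
            return 0;
        }
        throw new IllegalStateException(
            "Controller registration is not supported in " + this);
    }
}

class MetadataVersionGateDemo {
    public static void main(String[] args) {
        for (SketchMetadataVersion version : SketchMetadataVersion.values()) {
            try {
                System.out.println(version + " -> record version "
                    + version.registerControllerRecordVersion());
            } catch (IllegalStateException e) {
                System.out.println(version + " -> " + e.getMessage());
            }
        }
    }
}
```

With the gate left at `IBP_3_6_IV2`, the middle constant would wrongly report the feature as available, which appears to be the leftover the comment refers to.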






[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-05 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1316393088


##
clients/src/main/resources/common/message/ControllerRegistrationRequest.json:
##
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 70,

Review Comment:
   Fixed






[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-01 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1313371146


##
metadata/src/test/java/org/apache/kafka/image/publisher/ControllerRegistrationsPublisherTest.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.junit.jupiter.api.Timeout;
+
+
+@Timeout(value = 40)
+public class ControllerRegistrationsPublisherTest {
+/*
+@Test
+public void testInitialControllers() {
+ControllerRegistrationsPublisher publisher = new ControllerRegistrationsPublisher();
+assertEquals(Collections.emptyMap(), publisher.controllers());
+}
+

Review Comment:
   fixed






[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-01 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1313363156


##
core/src/main/scala/kafka/server/AuthHelper.scala:
##
@@ -130,4 +134,57 @@ class AuthHelper(authorizer: Option[Authorizer]) {
 }
   }
 
+  def computeDescribeClusterResponse(
+request: RequestChannel.Request,
+expectedEndpointType: EndpointType,
+clusterId: String,
+getNodes: () => DescribeClusterBrokerCollection,
+getControllerId: () => Int
+  ): DescribeClusterResponseData = {
+val describeClusterRequest = request.body[DescribeClusterRequest]
+val requestEndpointType = EndpointType.fromId(describeClusterRequest.data().endpointType())
+if (requestEndpointType.equals(EndpointType.UNKNOWN)) {
+  return new DescribeClusterResponseData().
+setErrorCode(if (request.header.data().requestApiVersion() == 0) {
+  Errors.INVALID_REQUEST.code()
+} else {
+  Errors.UNSUPPORTED_ENDPOINT_TYPE.code()
+}).
+setErrorMessage("Unsupported endpoint type " + describeClusterRequest.data().endpointType().toInt)
+} else if (!expectedEndpointType.equals(requestEndpointType)) {
+  return new DescribeClusterResponseData().
+setErrorCode(if (request.header.data().requestApiVersion() == 0) {
+  Errors.INVALID_REQUEST.code()
+} else {
+  Errors.MISMATCHED_ENDPOINT_TYPE.code()
+}).
+setErrorMessage("The request was sent to an endpoint of type " + expectedEndpointType +
+  ", but we wanted an endpoint of type " + requestEndpointType)
+}

Review Comment:
   v0 of the schema has no place to put the endpoint type. It simply cannot be serialized in the v0 encoding, no matter what client or programming language you use.
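
The point about the v0 encoding can be illustrated with a small sketch of a version-gated field. This is not Kafka's generated message code: the class `SketchDescribeClusterRequest`, the one-byte-per-field layout, and the default endpoint type of 1 are assumptions chosen for the example.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical stand-in for a versioned request; not Kafka's generated class.
class SketchDescribeClusterRequest {
    // Assumption: the value a reader falls back to when the field is absent.
    static final byte DEFAULT_ENDPOINT_TYPE = 1;

    private final boolean includeAuthorizedOperations;
    private final byte endpointType;

    SketchDescribeClusterRequest(boolean includeAuthorizedOperations, byte endpointType) {
        this.includeAuthorizedOperations = includeAuthorizedOperations;
        this.endpointType = endpointType;
    }

    // Encodes the request for the given schema version.
    byte[] serialize(short version) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(includeAuthorizedOperations ? 1 : 0);
        if (version >= 1) {
            // The endpoint type only exists on the wire from v1 onwards.
            out.write(endpointType);
        } else if (endpointType != DEFAULT_ENDPOINT_TYPE) {
            // v0 has no slot for the field, so a non-default value cannot be sent.
            throw new IllegalArgumentException(
                "endpointType " + endpointType + " cannot be encoded at version " + version);
        }
        return out.toByteArray();
    }
}

class VersionedFieldDemo {
    public static void main(String[] args) {
        SketchDescribeClusterRequest request =
            new SketchDescribeClusterRequest(false, (byte) 2);
        System.out.println("v1 length: " + request.serialize((short) 1).length);
        try {
            request.serialize((short) 0);
        } catch (IllegalArgumentException e) {
            System.out.println("v0 rejected: " + e.getMessage());
        }
    }
}
```

Under these assumptions, a v0 request can only ever carry the default endpoint type, whichever client produced it.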






[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-08-31 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1312388001


##
metadata/src/main/java/org/apache/kafka/controller/ClusterSupportDescriber.java:
##
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Map;
+
+import org.apache.kafka.metadata.VersionRange;
+
+
+public interface ClusterSupportDescriber {

Review Comment:
   `ClusterFeatureSupportDescriber` is fine, I guess. A bit long, but descriptive.






[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-08-31 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1312381991


##
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##
@@ -234,9 +247,64 @@ private ApiError updateFeature(
 }
 }
 
+private Optional<String> reasonNotSupported(
+String featureName,
+short newVersion
+) {
+int numBrokersChecked = 0;
+int numControllersChecked = 0;
+Optional<String> reason = quorumFeatures.reasonNotLocallySupported(featureName, newVersion);
+if (reason.isPresent()) return reason;

Review Comment:
   ehhh... we normally do allow them.
   
   ```
   % git grep -w return | grep -w if | wc -l
   1470
   ```
   
   I think the rule is that you need braces for any `if` statement that spans multiple lines. Also, if it gets (horizontally) long, you should cut that out.
   
   We should probably teach checkstyle about this at some point, or at least write it down.
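
The convention described in this comment might look like the following in practice. The class and method names are hypothetical and exist only to show the two shapes. This is one reading of the comment, not a documented project rule.

```java
import java.util.Optional;

// Hypothetical helper class used only to contrast the two `if` shapes.
class IfBraceStyleSketch {
    // A short guard that fits on one line: braces omitted.
    static Optional<String> firstReason(Optional<String> local, Optional<String> remote) {
        if (local.isPresent()) return local;
        return remote;
    }

    // The statement body spans several lines: braces used.
    static String describe(Optional<String> reason, String featureName, short newVersion) {
        if (reason.isPresent()) {
            return "Cannot set " + featureName + " to " + newVersion +
                ": " + reason.get();
        }
        return "Supported";
    }

    public static void main(String[] args) {
        System.out.println(firstReason(Optional.empty(), Optional.of("remote reason")));
        System.out.println(describe(Optional.of("too new"), "metadata.version", (short) 99));
    }
}
```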






[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-08-31 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1312379433


##
core/src/main/scala/kafka/server/ControllerRegistrationManager.scala:
##
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.TimeUnit.MILLISECONDS
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.message.ControllerRegistrationRequestData.ListenerCollection
+import org.apache.kafka.common.message.ControllerRegistrationRequestData
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{ControllerRegistrationRequest, ControllerRegistrationResponse}
+import org.apache.kafka.metadata.VersionRange
+import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.image.publisher.MetadataPublisher
+import org.apache.kafka.queue.EventQueue.DeadlineFunction
+import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.server.common.MetadataVersion
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * The broker lifecycle manager owns the broker state.
+ *
+ * Its inputs are messages passed in from other parts of the broker and from the
+ * controller: requests to start up, or shut down, for example. Its output are the broker
+ * state and various futures that can be used to wait for broker state transitions to
+ * occur.
+ *
+ * The lifecycle manager handles registering the broker with the controller, as described
+ * in KIP-631. After registration is complete, it handles sending periodic broker
+ * heartbeats and processing the responses.
+ *
+ * This code uses an event queue paradigm. Modifications get translated into events, which
+ * are placed on the queue to be processed sequentially. As described in the JavaDoc for
+ * each variable, most mutable state can be accessed only from that event queue thread.
+ * In some cases we expose a volatile variable which can be read from any thread, but only
+ * written from the event queue thread.
+ */
+class ControllerRegistrationManager(
+  val config: KafkaConfig,
+  val clusterId: String,
+  val time: Time,
+  val threadNamePrefix: String,
+  val supportedFeatures: util.Map[String, VersionRange],
+  val incarnationId: Uuid,
+  val resendExponentialBackoff: ExponentialBackoff = new ExponentialBackoff(100, 2, 12L, 0.02)
+) extends Logging with MetadataPublisher {
+  override def name(): String = "ControllerRegistrationManager"
+
+  val nodeId: Int = config.nodeId
+
+  private def logPrefix(): String = {
+val builder = new StringBuilder("[ControllerRegistrationManager")
+builder.append(" id=").append(config.nodeId)
+builder.append(" incarnation=").append(incarnationId)
+builder.append("] ")
+builder.toString()
+  }
+
+  val logContext = new LogContext(logPrefix())
+
+  this.logIdent = logContext.logPrefix()
+
+  val listenerCollection = {
+val collection = new ListenerCollection()
+config.controllerListeners.foreach(endPoint => {
+  collection.add(new ControllerRegistrationRequestData.Listener().
+setHost(endPoint.host).
+setName(endPoint.listenerName.value()).
+setPort(endPoint.port).
+setSecurityProtocol(endPoint.securityProtocol.id))
+})
+collection
+  }
+
+  /**
+   * The number of RPCs that we are waiting for. Only read or written from the event queue thread.
+   */
+  var pendingRpcs = 0L
+
+  /**
+   * The number of RPCs that we successfully sent.
+   * Only read or written from the event queue thread.
+   */
+  var successfulRpcs = 0L
+
+  /**
+   * The number of RPCs that we failed to send, or got back a failure response for. This is
+   * cleared after a success. Only read or written from the event queue thread.
+   */
+  var failedRpcs = 0L
+
+  /**
+   * The current metadata version that is in effect. Only read or written from the event queue thread.
+   */
+  private var metadataVersion: MetadataVersion 

[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-08-31 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1312376398


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3510,43 +3510,34 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleDescribeCluster(request: RequestChannel.Request): Unit = {
-val describeClusterRequest = request.body[DescribeClusterRequest]
-
-var clusterAuthorizedOperations = Int.MinValue // Default value in the schema
-// get cluster authorized operations
-if (describeClusterRequest.data.includeClusterAuthorizedOperations) {
-  if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME))
-clusterAuthorizedOperations = authHelper.authorizedOperations(request, Resource.CLUSTER)
-  else
-clusterAuthorizedOperations = 0
-}
-
-val brokers = metadataCache.getAliveBrokerNodes(request.context.listenerName)
-val controllerId = {
-  metadataCache.getControllerId.flatMap {
-case ZkCachedControllerId(id) => Some(id)
-case KRaftCachedControllerId(_) => metadataCache.getRandomAliveBrokerId
-  }
-}
-
-requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-  val data = new DescribeClusterResponseData()
-.setThrottleTimeMs(requestThrottleMs)
-.setClusterId(clusterId)
-.setControllerId(controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID))
-.setClusterAuthorizedOperations(clusterAuthorizedOperations)
-
-
-  brokers.foreach { broker =>
-data.brokers.add(new DescribeClusterResponseData.DescribeClusterBroker()
-  .setBrokerId(broker.id)
-  .setHost(broker.host)
-  .setPort(broker.port)
-  .setRack(broker.rack))
+val response = authHelper.computeDescribeClusterResponse(

Review Comment:
   That case is handled in `AuthHelper.computeDescribeClusterResponse`






[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-08-31 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1312373575


##
core/src/main/scala/kafka/server/ControllerRegistrationManager.scala:
##
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.TimeUnit.MILLISECONDS
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.message.ControllerRegistrationRequestData.ListenerCollection
+import org.apache.kafka.common.message.ControllerRegistrationRequestData
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{ControllerRegistrationRequest, ControllerRegistrationResponse}
+import org.apache.kafka.metadata.VersionRange
+import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.image.publisher.MetadataPublisher
+import org.apache.kafka.queue.EventQueue.DeadlineFunction
+import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.server.common.MetadataVersion
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * The broker lifecycle manager owns the broker state.
+ *
+ * Its inputs are messages passed in from other parts of the broker and from the
+ * controller: requests to start up, or shut down, for example. Its output are the broker
+ * state and various futures that can be used to wait for broker state transitions to
+ * occur.
+ *
+ * The lifecycle manager handles registering the broker with the controller, as described
+ * in KIP-631. After registration is complete, it handles sending periodic broker
+ * heartbeats and processing the responses.
+ *
+ * This code uses an event queue paradigm. Modifications get translated into events, which
+ * are placed on the queue to be processed sequentially. As described in the JavaDoc for
+ * each variable, most mutable state can be accessed only from that event queue thread.
+ * In some cases we expose a volatile variable which can be read from any thread, but only
+ * written from the event queue thread.
+ */
+class ControllerRegistrationManager(
+  val config: KafkaConfig,
+  val clusterId: String,
+  val time: Time,
+  val threadNamePrefix: String,
+  val supportedFeatures: util.Map[String, VersionRange],
+  val incarnationId: Uuid,
+  val resendExponentialBackoff: ExponentialBackoff = new ExponentialBackoff(100, 2, 12L, 0.02)
+) extends Logging with MetadataPublisher {
+  override def name(): String = "ControllerRegistrationManager"
+
+  val nodeId: Int = config.nodeId
+
+  private def logPrefix(): String = {
+val builder = new StringBuilder("[ControllerRegistrationManager")
+builder.append(" id=").append(config.nodeId)
+builder.append(" incarnation=").append(incarnationId)
+builder.append("] ")
+builder.toString()
+  }
+
+  val logContext = new LogContext(logPrefix())
+
+  this.logIdent = logContext.logPrefix()
+
+  val listenerCollection = {
+val collection = new ListenerCollection()
+config.controllerListeners.foreach(endPoint => {
+  collection.add(new ControllerRegistrationRequestData.Listener().
+setHost(endPoint.host).
+setName(endPoint.listenerName.value()).
+setPort(endPoint.port).
+setSecurityProtocol(endPoint.securityProtocol.id))
+})
+collection
+  }
+
+  /**
+   * The number of RPCs that we are waiting for. Only read or written from the event queue thread.
+   */
+  var pendingRpcs = 0L

Review Comment:
   ok






[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-08-31 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1312372539


##
core/src/main/scala/kafka/server/ControllerRegistrationManager.scala:
##
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.TimeUnit.MILLISECONDS
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.message.ControllerRegistrationRequestData.ListenerCollection
+import org.apache.kafka.common.message.ControllerRegistrationRequestData
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{ControllerRegistrationRequest, ControllerRegistrationResponse}
+import org.apache.kafka.metadata.VersionRange
+import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.image.publisher.MetadataPublisher
+import org.apache.kafka.queue.EventQueue.DeadlineFunction
+import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.server.common.MetadataVersion
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * The broker lifecycle manager owns the broker state.

Review Comment:
   fixed






[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-08-31 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1312372436


##
core/src/main/scala/kafka/server/ControllerRegistrationManager.scala:
##
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.TimeUnit.MILLISECONDS
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.message.ControllerRegistrationRequestData.ListenerCollection
+import org.apache.kafka.common.message.ControllerRegistrationRequestData
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{ControllerRegistrationRequest, ControllerRegistrationResponse}
+import org.apache.kafka.metadata.VersionRange
+import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.image.publisher.MetadataPublisher
+import org.apache.kafka.queue.EventQueue.DeadlineFunction
+import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.server.common.MetadataVersion
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * The broker lifecycle manager owns the broker state.
+ *
+ * Its inputs are messages passed in from other parts of the broker and from the
+ * controller: requests to start up, or shut down, for example. Its output are the broker
+ * state and various futures that can be used to wait for broker state transitions to
+ * occur.
+ *
+ * The lifecycle manager handles registering the broker with the controller, as described
+ * in KIP-631. After registration is complete, it handles sending periodic broker
+ * heartbeats and processing the responses.
+ *
+ * This code uses an event queue paradigm. Modifications get translated into events, which
+ * are placed on the queue to be processed sequentially. As described in the JavaDoc for
+ * each variable, most mutable state can be accessed only from that event queue thread.
+ * In some cases we expose a volatile variable which can be read from any thread, but only
+ * written from the event queue thread.
+ */
+class ControllerRegistrationManager(
+  val config: KafkaConfig,
+  val clusterId: String,
+  val time: Time,
+  val threadNamePrefix: String,
+  val supportedFeatures: util.Map[String, VersionRange],
+  val incarnationId: Uuid,
+  val resendExponentialBackoff: ExponentialBackoff = new ExponentialBackoff(100, 2, 12L, 0.02)
+) extends Logging with MetadataPublisher {
+  override def name(): String = "ControllerRegistrationManager"
+
+  val nodeId: Int = config.nodeId
+
+  private def logPrefix(): String = {
+    val builder = new StringBuilder("[ControllerRegistrationManager")
+    builder.append(" id=").append(config.nodeId)
+    builder.append(" incarnation=").append(incarnationId)
+    builder.append("] ")
+    builder.toString()
+  }
+
+  val logContext = new LogContext(logPrefix())
+
+  this.logIdent = logContext.logPrefix()
+
+  val listenerCollection = {
+    val collection = new ListenerCollection()
+    config.controllerListeners.foreach(endPoint => {
+      collection.add(new ControllerRegistrationRequestData.Listener().
+        setHost(endPoint.host).
+        setName(endPoint.listenerName.value()).
+        setPort(endPoint.port).
+        setSecurityProtocol(endPoint.securityProtocol.id))
+    })
+    collection
+  }
+
+  /**
+   * The number of RPCs that we are waiting for. Only read or written from the event queue thread.
+   */
+  var pendingRpcs = 0L
+
+  /**
+   * The number of RPCs that we successfully sent.
+   * Only read or written from the event queue thread.
+   */
+  var successfulRpcs = 0L
+
+  /**
+   * The number of RPCs that we failed to send, or got back a failure response for. This is
+   * cleared after a success. Only read or written from the event queue thread.
+   */
+  var failedRpcs = 0L
+
+  /**
+   * The current metadata version that is in effect. Only read or written from the event queue thread.
+   */
+  private var metadataVersion: MetadataVersion 

[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-08-31 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1312369046


##
core/src/main/scala/kafka/server/ControllerRegistrationManager.scala:
##
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.TimeUnit.MILLISECONDS
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.message.ControllerRegistrationRequestData.ListenerCollection
+import org.apache.kafka.common.message.ControllerRegistrationRequestData
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{ControllerRegistrationRequest, ControllerRegistrationResponse}
+import org.apache.kafka.metadata.VersionRange
+import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.image.publisher.MetadataPublisher
+import org.apache.kafka.queue.EventQueue.DeadlineFunction
+import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.server.common.MetadataVersion
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * The broker lifecycle manager owns the broker state.

Review Comment:
   ;)






[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-08-31 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1312369046


##
core/src/main/scala/kafka/server/ControllerRegistrationManager.scala:
##
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.TimeUnit.MILLISECONDS
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.message.ControllerRegistrationRequestData.ListenerCollection
+import org.apache.kafka.common.message.ControllerRegistrationRequestData
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{ControllerRegistrationRequest, ControllerRegistrationResponse}
+import org.apache.kafka.metadata.VersionRange
+import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.image.publisher.MetadataPublisher
+import org.apache.kafka.queue.EventQueue.DeadlineFunction
+import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.server.common.MetadataVersion
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * The broker lifecycle manager owns the broker state.

Review Comment:
   :X






[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-08-31 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1312367762


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -1005,4 +1023,20 @@ class ControllerApis(val requestChannel: RequestChannel,
 }
   }
   }
+
+  def handleDescribeCluster(request: RequestChannel.Request): CompletableFuture[Unit] = {
+    // Unlike on the broker, DESCRIBE_CLUSTER on the controller requires a high level of
+    // permissions (ALTER on CLUSTER).

Review Comment:
   Direct-to-controller operation is intended only for administrators. If
   you're not an admin, you should talk to the brokers. (In a well-run network,
   this should also be enforced by putting non-administrators on a separate
   subnet.)
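
To illustrate the distinction, here is a minimal plain-Java sketch. It is not the actual ControllerApis or AuthHelper code. The names `Operation`, `Endpoint`, and `mayDescribeCluster` are hypothetical, and the only rule taken from the diff is that the controller requires ALTER on CLUSTER. The broker path is modeled as ungated purely as an assumption for contrast.

```java
import java.util.EnumSet;
import java.util.Set;

public class DescribeClusterAuthSketch {
    // Hypothetical stand-ins for ACL operations and endpoint types.
    enum Operation { DESCRIBE, ALTER }
    enum Endpoint { BROKER, CONTROLLER }

    // Returns true if a principal holding the given cluster-level operations
    // may issue DESCRIBE_CLUSTER against the given endpoint.
    static boolean mayDescribeCluster(Endpoint endpoint, Set<Operation> granted) {
        if (endpoint == Endpoint.CONTROLLER) {
            // From the diff comment: the controller requires ALTER on CLUSTER.
            return granted.contains(Operation.ALTER);
        }
        // Assumption for contrast only: the broker path is not gated in this sketch.
        return true;
    }

    public static void main(String[] args) {
        Set<Operation> regularUser = EnumSet.of(Operation.DESCRIBE);
        Set<Operation> admin = EnumSet.of(Operation.DESCRIBE, Operation.ALTER);
        System.out.println(mayDescribeCluster(Endpoint.CONTROLLER, regularUser)); // false
        System.out.println(mayDescribeCluster(Endpoint.CONTROLLER, admin));       // true
        System.out.println(mayDescribeCluster(Endpoint.BROKER, regularUser));     // true
    }
}
```

In this model a non-admin client is turned away by the controller but can still use the broker path, which is the behavior the comment describes.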






[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-08-31 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1312363063


##
core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala:
##
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import org.apache.kafka.common.{Node, Uuid}
+import org.apache.kafka.common.message.ControllerRegistrationResponseData
+import org.apache.kafka.common.metadata.{FeatureLevelRecord, RegisterControllerRecord}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.ControllerRegistrationResponse
+import org.apache.kafka.common.utils.{ExponentialBackoff, Time}
+import org.apache.kafka.image.loader.{LogDeltaManifest, SnapshotManifest}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
+import org.apache.kafka.metadata.{RecordTestUtils, VersionRange}
+import org.apache.kafka.raft.LeaderAndEpoch
+import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.test.TestUtils
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.{Test, Timeout}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util
+import java.util.{OptionalInt, Properties}
+import java.util.concurrent.{CompletableFuture, TimeUnit}
+
+@Timeout(value = 60)
+class ControllerRegistrationManagerTest {
+  private val controller1 = new Node(1, "localhost", 7000)
+
+  private def configProperties = {
+    val properties = new Properties()
+    properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo")
+    properties.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    properties.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"CONTROLLER:PLAINTEXT")
+    properties.setProperty(KafkaConfig.ListenersProp, s"CONTROLLER://localhost:0")
+    properties.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    properties.setProperty(KafkaConfig.NodeIdProp, "1")
+    properties.setProperty(KafkaConfig.QuorumVotersProp, s"1@localhost:8000,2@localhost:5000,3@localhost:7000")
+    properties
+  }
+
+  private def createSupportedFeatures(
+    highestSupportedMetadataVersion: MetadataVersion
+  ): java.util.Map[String, VersionRange] = {
+    val results = new util.HashMap[String, VersionRange]()
+    results.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
+      MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
+      highestSupportedMetadataVersion.featureLevel()))
+    results
+  }
+
+  private def newControllerRegistrationManager(
+    context: RegistrationTestContext,
+  ): ControllerRegistrationManager = {
+    new ControllerRegistrationManager(context.config,
+      context.clusterId,
+      Time.SYSTEM,
+      "controller-registration-manager-test-",
+      createSupportedFeatures(MetadataVersion.IBP_3_6_IV2),
+      RecordTestUtils.createTestControllerRegistration(1, false).incarnationId(),
+      new ExponentialBackoff(1, 2, 100, 0.02))
+  }
+
+  private def registered(manager: ControllerRegistrationManager): Boolean = {
+    val registered = new CompletableFuture[Boolean]
+    manager.eventQueue.append(() => {
+      registered.complete(manager.registered)
+    })
+    registered.get(30, TimeUnit.SECONDS)
+  }
+
+  private def rpcStats(manager: ControllerRegistrationManager): (Long, Long, Long) = {
+    val failedAttempts = new CompletableFuture[(Long, Long, Long)]

Review Comment:
   The number of times we've tried to send an RPC and failed for some reason
   (or gotten back an error code).
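
For reference, the counter semantics can be sketched in plain Java as below. This is a hypothetical model, not the ControllerRegistrationManager code. The field names and the "cleared after a success" rule come from the doc comments in the diff, while the method names (`onSend`, `onSuccess`, `onFailure`) and the way `pendingRpcs` is updated are assumptions.

```java
public class RpcStatsSketch {
    long pendingRpcs = 0L;    // RPCs sent but not yet answered
    long successfulRpcs = 0L; // RPCs that completed successfully
    long failedRpcs = 0L;     // failures since the last success

    // Assumed hook: called when an RPC is handed to the network layer.
    void onSend() {
        pendingRpcs++;
    }

    // A success clears the failure count, as the doc comment in the diff states.
    void onSuccess() {
        pendingRpcs--;
        successfulRpcs++;
        failedRpcs = 0L;
    }

    // Counts both send failures and responses carrying an error code.
    void onFailure() {
        pendingRpcs--;
        failedRpcs++;
    }

    public static void main(String[] args) {
        RpcStatsSketch stats = new RpcStatsSketch();
        stats.onSend();
        stats.onFailure();
        stats.onSend();
        stats.onFailure();
        System.out.println(stats.failedRpcs); // 2
        stats.onSend();
        stats.onSuccess();
        System.out.println(stats.failedRpcs); // 0
    }
}
```

Because the failure count resets on success, it reflects the current run of consecutive failures rather than a lifetime total.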






[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-08-31 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1312362682


##
core/src/test/resources/log4j.properties:
##
@@ -12,14 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=OFF, stdout
+log4j.rootLogger=DEBUG, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=WARN
-log4j.logger.org.apache.kafka=WARN
+log4j.logger.kafka=DEBUG
+log4j.logger.org.apache.kafka=DEBUG

Review Comment:
   Yes, this should be reverted. I'll fix it now.






[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-08-31 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1312362333


##
core/src/main/scala/kafka/server/AuthHelper.scala:
##
@@ -130,4 +134,57 @@ class AuthHelper(authorizer: Option[Authorizer]) {
 }
   }
 
+  def computeDescribeClusterResponse(
+    request: RequestChannel.Request,
+    expectedEndpointType: EndpointType,
+    clusterId: String,
+    getNodes: () => DescribeClusterBrokerCollection,
+    getControllerId: () => Int
+  ): DescribeClusterResponseData = {
+    val describeClusterRequest = request.body[DescribeClusterRequest]
+    val requestEndpointType = EndpointType.fromId(describeClusterRequest.data().endpointType())
+    if (requestEndpointType.equals(EndpointType.UNKNOWN)) {
+      return new DescribeClusterResponseData().
+        setErrorCode(if (request.header.data().requestApiVersion() == 0) {
+          Errors.INVALID_REQUEST.code()
+        } else {
+          Errors.UNSUPPORTED_ENDPOINT_TYPE.code()
+        }).
+        setErrorMessage("Unsupported endpoint type " + describeClusterRequest.data().endpointType().toInt)
+    } else if (!expectedEndpointType.equals(requestEndpointType)) {
+      return new DescribeClusterResponseData().
+        setErrorCode(if (request.header.data().requestApiVersion() == 0) {
+          Errors.INVALID_REQUEST.code()
+        } else {
+          Errors.MISMATCHED_ENDPOINT_TYPE.code()
+        }).
+        setErrorMessage("The request was sent to an endpoint of type " + expectedEndpointType +
+          ", but we wanted an endpoint of type " + requestEndpointType)
+    }

Review Comment:
   It's not possible to have request API version 0 and have an endpoint type 
other than BROKER. That's enforced by the serialization layer, so we don't have 
to do anything here.
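
A rough plain-Java model of how a serialization layer can enforce this is sketched below. It is a hypothetical stand-in, not Kafka's generated message code. The numeric ids (BROKER = 1, CONTROLLER = 2), the assumption that the field first appears at version 1, and the use of `IllegalArgumentException` are all illustrative choices.

```java
public class VersionedFieldSketch {
    // Assumed endpoint type ids, for illustration only.
    static final byte BROKER = 1;
    static final byte CONTROLLER = 2;

    // The endpointType field is assumed to exist on the wire only at version >= 1.
    static byte[] write(short version, byte endpointType) {
        if (version >= 1) {
            return new byte[] { endpointType };
        }
        // At version 0 the field cannot be encoded, so only the default is allowed.
        if (endpointType != BROKER) {
            throw new IllegalArgumentException(
                "Attempted to write a non-default endpointType at version " + version);
        }
        return new byte[0];
    }

    // A reader at version 0 never sees the field and falls back to the default.
    static byte read(short version, byte[] bytes) {
        return version >= 1 ? bytes[0] : BROKER;
    }

    public static void main(String[] args) {
        System.out.println(read((short) 0, write((short) 0, BROKER)));
        System.out.println(read((short) 1, write((short) 1, CONTROLLER)));
        try {
            write((short) 0, CONTROLLER);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Under these assumptions a version 0 request can only ever decode to BROKER, so a handler does not need a separate check for that combination.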


