[GitHub] [kafka] showuon commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-08-31 Thread via GitHub


showuon commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1311278157


##
core/src/main/scala/kafka/server/AuthHelper.scala:
##
@@ -130,4 +134,57 @@ class AuthHelper(authorizer: Option[Authorizer]) {
 }
   }
 
+  def computeDescribeClusterResponse(
+request: RequestChannel.Request,
+expectedEndpointType: EndpointType,
+clusterId: String,
+getNodes: () => DescribeClusterBrokerCollection,
+getControllerId: () => Int
+  ): DescribeClusterResponseData = {
+val describeClusterRequest = request.body[DescribeClusterRequest]
+val requestEndpointType = 
EndpointType.fromId(describeClusterRequest.data().endpointType())
+if (requestEndpointType.equals(EndpointType.UNKNOWN)) {
+  return new DescribeClusterResponseData().
+setErrorCode(if (request.header.data().requestApiVersion() == 0) {
+  Errors.INVALID_REQUEST.code()
+} else {
+  Errors.UNSUPPORTED_ENDPOINT_TYPE.code()
+}).
+setErrorMessage("Unsupported endpoint type " + 
describeClusterRequest.data().endpointType().toInt)
+} else if (!expectedEndpointType.equals(requestEndpointType)) {
+  return new DescribeClusterResponseData().
+setErrorCode(if (request.header.data().requestApiVersion() == 0) {
+  Errors.INVALID_REQUEST.code()
+} else {
+  Errors.MISMATCHED_ENDPOINT_TYPE.code()
+}).
+setErrorMessage("The request was sent to an endpoint of type " + 
expectedEndpointType +
+  ", but we wanted an endpoint of type " + requestEndpointType)
+}

Review Comment:
   Looks like we won't return `INVALID_REQUEST` response when `request API 
version == 0`, and all other info are valid (i.e. endpoint type is correct and 
matching). Is that expected? I think we should throw `INVALID_REQUEST` error in 
any case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-08-31 Thread via GitHub


showuon commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1311278157


##
core/src/main/scala/kafka/server/AuthHelper.scala:
##
@@ -130,4 +134,57 @@ class AuthHelper(authorizer: Option[Authorizer]) {
 }
   }
 
+  def computeDescribeClusterResponse(
+request: RequestChannel.Request,
+expectedEndpointType: EndpointType,
+clusterId: String,
+getNodes: () => DescribeClusterBrokerCollection,
+getControllerId: () => Int
+  ): DescribeClusterResponseData = {
+val describeClusterRequest = request.body[DescribeClusterRequest]
+val requestEndpointType = 
EndpointType.fromId(describeClusterRequest.data().endpointType())
+if (requestEndpointType.equals(EndpointType.UNKNOWN)) {
+  return new DescribeClusterResponseData().
+setErrorCode(if (request.header.data().requestApiVersion() == 0) {
+  Errors.INVALID_REQUEST.code()
+} else {
+  Errors.UNSUPPORTED_ENDPOINT_TYPE.code()
+}).
+setErrorMessage("Unsupported endpoint type " + 
describeClusterRequest.data().endpointType().toInt)
+} else if (!expectedEndpointType.equals(requestEndpointType)) {
+  return new DescribeClusterResponseData().
+setErrorCode(if (request.header.data().requestApiVersion() == 0) {
+  Errors.INVALID_REQUEST.code()
+} else {
+  Errors.MISMATCHED_ENDPOINT_TYPE.code()
+}).
+setErrorMessage("The request was sent to an endpoint of type " + 
expectedEndpointType +
+  ", but we wanted an endpoint of type " + requestEndpointType)
+}

Review Comment:
   Looks like we won't return `INVALID_REQUEST` response when `request API 
version == 0`, and all other info are valid (i.e. endpoint type is correct and 
matching). Is that expected? I think we should throw `INVALID_REQUEST` error 
when endpointType has value?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-08-31 Thread via GitHub


showuon commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1311398405


##
core/src/test/resources/log4j.properties:
##
@@ -12,14 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=OFF, stdout
+log4j.rootLogger=DEBUG, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=WARN
-log4j.logger.org.apache.kafka=WARN
+log4j.logger.kafka=DEBUG
+log4j.logger.org.apache.kafka=DEBUG

Review Comment:
   Should we revert them?



##
core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala:
##
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import org.apache.kafka.common.{Node, Uuid}
+import org.apache.kafka.common.message.ControllerRegistrationResponseData
+import org.apache.kafka.common.metadata.{FeatureLevelRecord, 
RegisterControllerRecord}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.ControllerRegistrationResponse
+import org.apache.kafka.common.utils.{ExponentialBackoff, Time}
+import org.apache.kafka.image.loader.{LogDeltaManifest, SnapshotManifest}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
+import org.apache.kafka.metadata.{RecordTestUtils, VersionRange}
+import org.apache.kafka.raft.LeaderAndEpoch
+import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.test.TestUtils
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.{Test, Timeout}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util
+import java.util.{OptionalInt, Properties}
+import java.util.concurrent.{CompletableFuture, TimeUnit}
+
+@Timeout(value = 60)
+class ControllerRegistrationManagerTest {
+  private val controller1 = new Node(1, "localhost", 7000)
+
+  private def configProperties = {
+val properties = new Properties()
+properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo")
+properties.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+properties.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, 
s"CONTROLLER:PLAINTEXT")
+properties.setProperty(KafkaConfig.ListenersProp, 
s"CONTROLLER://localhost:0")
+properties.setProperty(KafkaConfig.ControllerListenerNamesProp, 
"CONTROLLER")
+properties.setProperty(KafkaConfig.NodeIdProp, "1")
+properties.setProperty(KafkaConfig.QuorumVotersProp, 
s"1@localhost:8000,2@localhost:5000,3@localhost:7000")
+properties
+  }
+
+  private def createSupportedFeatures(
+highestSupportedMetadataVersion: MetadataVersion
+  ): java.util.Map[String, VersionRange] = {
+val results = new util.HashMap[String, VersionRange]()
+results.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
+  MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
+  highestSupportedMetadataVersion.featureLevel()))
+results
+  }
+
+  private def newControllerRegistrationManager(
+context: RegistrationTestContext,
+  ): ControllerRegistrationManager = {
+new ControllerRegistrationManager(context.config,
+  context.clusterId,
+  Time.SYSTEM,
+  "controller-registration-manager-test-",
+  createSupportedFeatures(MetadataVersion.IBP_3_6_IV2),
+  RecordTestUtils.createTestControllerRegistration(1, 
false).incarnationId(),
+  new ExponentialBackoff(1, 2, 100, 0.02))
+  }
+
+  private def registered(manager: ControllerRegistrationManager): Boolean = {
+val registered = new CompletableFuture[Boolean]
+manager.eventQueue.append(() => {
+  registered.complete(manager.registered)
+})
+registered.get(30, TimeUnit.SECONDS)
+  }
+
+  private def rpcStats(manager: ControllerRegistrationManager): (Long, Long, 
Long) = {
+val failedAttempts = new CompletableFuture[(Long, Long, Long)]

Review C

[GitHub] [kafka] showuon commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-01 Thread via GitHub


showuon commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1312662157


##
metadata/src/main/java/org/apache/kafka/controller/Controller.java:
##
@@ -390,6 +391,19 @@ CompletableFuture> 
createPartitions(
 boolean validateOnly
 );
 
+/**
+ * Attempt to register the given controller.
+ *
+ * @param context   The controller request context.
+ * @param request  The registration request.
+ *
+ * @return A future yielding the broker registration reply.

Review Comment:
   nit: alignment



##
metadata/src/test/java/org/apache/kafka/image/publisher/ControllerRegistrationsPublisherTest.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.junit.jupiter.api.Timeout;
+
+
+@Timeout(value = 40)
+public class ControllerRegistrationsPublisherTest {
+/*
+@Test
+public void testInitialControllers() {
+ControllerRegistrationsPublisher publisher = new 
ControllerRegistrationsPublisher();
+assertEquals(Collections.emptyMap(), publisher.controllers());
+}
+

Review Comment:
   All commented out?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-01 Thread via GitHub


showuon commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1312733661


##
core/src/main/scala/kafka/server/AuthHelper.scala:
##
@@ -130,4 +134,57 @@ class AuthHelper(authorizer: Option[Authorizer]) {
 }
   }
 
+  def computeDescribeClusterResponse(
+request: RequestChannel.Request,
+expectedEndpointType: EndpointType,
+clusterId: String,
+getNodes: () => DescribeClusterBrokerCollection,
+getControllerId: () => Int
+  ): DescribeClusterResponseData = {
+val describeClusterRequest = request.body[DescribeClusterRequest]
+val requestEndpointType = 
EndpointType.fromId(describeClusterRequest.data().endpointType())
+if (requestEndpointType.equals(EndpointType.UNKNOWN)) {
+  return new DescribeClusterResponseData().
+setErrorCode(if (request.header.data().requestApiVersion() == 0) {
+  Errors.INVALID_REQUEST.code()
+} else {
+  Errors.UNSUPPORTED_ENDPOINT_TYPE.code()
+}).
+setErrorMessage("Unsupported endpoint type " + 
describeClusterRequest.data().endpointType().toInt)
+} else if (!expectedEndpointType.equals(requestEndpointType)) {
+  return new DescribeClusterResponseData().
+setErrorCode(if (request.header.data().requestApiVersion() == 0) {
+  Errors.INVALID_REQUEST.code()
+} else {
+  Errors.MISMATCHED_ENDPOINT_TYPE.code()
+}).
+setErrorMessage("The request was sent to an endpoint of type " + 
expectedEndpointType +
+  ", but we wanted an endpoint of type " + requestEndpointType)
+}

Review Comment:
   Yes, but that's under the assumption of users are using our java client. 
What if they use other client that no validation during serialization?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-03 Thread via GitHub


showuon commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1314398152


##
core/src/main/scala/kafka/server/AuthHelper.scala:
##
@@ -130,4 +134,57 @@ class AuthHelper(authorizer: Option[Authorizer]) {
 }
   }
 
+  def computeDescribeClusterResponse(
+request: RequestChannel.Request,
+expectedEndpointType: EndpointType,
+clusterId: String,
+getNodes: () => DescribeClusterBrokerCollection,
+getControllerId: () => Int
+  ): DescribeClusterResponseData = {
+val describeClusterRequest = request.body[DescribeClusterRequest]
+val requestEndpointType = 
EndpointType.fromId(describeClusterRequest.data().endpointType())
+if (requestEndpointType.equals(EndpointType.UNKNOWN)) {
+  return new DescribeClusterResponseData().
+setErrorCode(if (request.header.data().requestApiVersion() == 0) {
+  Errors.INVALID_REQUEST.code()
+} else {
+  Errors.UNSUPPORTED_ENDPOINT_TYPE.code()
+}).
+setErrorMessage("Unsupported endpoint type " + 
describeClusterRequest.data().endpointType().toInt)
+} else if (!expectedEndpointType.equals(requestEndpointType)) {
+  return new DescribeClusterResponseData().
+setErrorCode(if (request.header.data().requestApiVersion() == 0) {
+  Errors.INVALID_REQUEST.code()
+} else {
+  Errors.MISMATCHED_ENDPOINT_TYPE.code()
+}).
+setErrorMessage("The request was sent to an endpoint of type " + 
expectedEndpointType +
+  ", but we wanted an endpoint of type " + requestEndpointType)
+}

Review Comment:
   OK, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-04 Thread via GitHub


showuon commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1314886326


##
clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationRequest.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ControllerRegistrationRequestData;
+import org.apache.kafka.common.message.ControllerRegistrationResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class ControllerRegistrationRequest extends AbstractRequest {
+public static class Builder extends 
AbstractRequest.Builder {
+private final ControllerRegistrationRequestData data;
+
+public Builder(ControllerRegistrationRequestData data) {
+super(ApiKeys.BROKER_HEARTBEAT);

Review Comment:
   Nice catch, @dengziming !



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org