This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6e857c5 KAFKA-13057; Add KRaft "broker" to several RPC's listeners (#11012)
6e857c5 is described below
commit 6e857c531f14d07d5b05f174e6063a124c917324
Author: David Arthur <[email protected]>
AuthorDate: Sat Jul 10 13:45:27 2021 -0400
KAFKA-13057; Add KRaft "broker" to several RPC's listeners (#11012)
This patch fixes a few request listener specs. We were missing "broker" for
many APIs that are now implemented in KRaft, and there were a couple of cases
where we had unnecessarily exposed a controller-only API on the broker.
Reviewers: Jason Gustafson <[email protected]>
---
.../main/resources/common/message/AddOffsetsToTxnRequest.json | 2 +-
.../resources/common/message/AddPartitionsToTxnRequest.json | 2 +-
.../resources/common/message/AllocateProducerIdsRequest.json | 2 +-
.../main/resources/common/message/AlterConfigsRequest.json | 2 +-
.../resources/common/message/CreatePartitionsRequest.json | 2 +-
.../main/resources/common/message/ElectLeadersRequest.json | 2 +-
clients/src/main/resources/common/message/EndTxnRequest.json | 2 +-
.../main/resources/common/message/TxnOffsetCommitRequest.json | 2 +-
.../resources/common/message/UnregisterBrokerRequest.json | 2 +-
.../main/resources/common/message/UpdateFeaturesRequest.json | 2 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 11 +----------
11 files changed, 11 insertions(+), 20 deletions(-)
diff --git
a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
index 7212a02..ade3fc7 100644
--- a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
+++ b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 25,
"type": "request",
- "listeners": ["zkBroker"],
+ "listeners": ["zkBroker", "broker"],
"name": "AddOffsetsToTxnRequest",
// Version 1 is the same as version 0.
//
diff --git
a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
index 99e72a9..4920da1 100644
--- a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
+++ b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 24,
"type": "request",
- "listeners": ["zkBroker"],
+ "listeners": ["zkBroker", "broker"],
"name": "AddPartitionsToTxnRequest",
// Version 1 is the same as version 0.
//
diff --git
a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json
b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json
index 6f37313..7256c6b 100644
--- a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json
+++ b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 67,
"type": "request",
- "listeners": ["zkBroker", "broker", "controller"],
+ "listeners": ["zkBroker", "controller"],
"name": "AllocateProducerIdsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/AlterConfigsRequest.json
b/clients/src/main/resources/common/message/AlterConfigsRequest.json
index fa46656..4c28ee1 100644
--- a/clients/src/main/resources/common/message/AlterConfigsRequest.json
+++ b/clients/src/main/resources/common/message/AlterConfigsRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 33,
"type": "request",
- "listeners": ["zkBroker"],
+ "listeners": ["zkBroker", "broker"],
"name": "AlterConfigsRequest",
// Version 1 is the same as version 0.
// Version 2 enables flexible versions.
diff --git
a/clients/src/main/resources/common/message/CreatePartitionsRequest.json
b/clients/src/main/resources/common/message/CreatePartitionsRequest.json
index 8053628..6e24949 100644
--- a/clients/src/main/resources/common/message/CreatePartitionsRequest.json
+++ b/clients/src/main/resources/common/message/CreatePartitionsRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 37,
"type": "request",
- "listeners": ["zkBroker"],
+ "listeners": ["zkBroker", "broker", "controller"],
"name": "CreatePartitionsRequest",
// Version 1 is the same as version 0.
//
diff --git a/clients/src/main/resources/common/message/ElectLeadersRequest.json
b/clients/src/main/resources/common/message/ElectLeadersRequest.json
index d407f5e..dd9fa21 100644
--- a/clients/src/main/resources/common/message/ElectLeadersRequest.json
+++ b/clients/src/main/resources/common/message/ElectLeadersRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 43,
"type": "request",
- "listeners": ["zkBroker"],
+ "listeners": ["zkBroker", "broker", "controller"],
"name": "ElectLeadersRequest",
// Version 1 implements multiple leader election types, as described by KIP-460.
//
diff --git a/clients/src/main/resources/common/message/EndTxnRequest.json
b/clients/src/main/resources/common/message/EndTxnRequest.json
index 7e7d41d..f16ef76 100644
--- a/clients/src/main/resources/common/message/EndTxnRequest.json
+++ b/clients/src/main/resources/common/message/EndTxnRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 26,
"type": "request",
- "listeners": ["zkBroker"],
+ "listeners": ["zkBroker", "broker"],
"name": "EndTxnRequest",
// Version 1 is the same as version 0.
//
diff --git
a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
index 127ff3d..a832ef7 100644
--- a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 28,
"type": "request",
- "listeners": ["zkBroker"],
+ "listeners": ["zkBroker", "broker"],
"name": "TxnOffsetCommitRequest",
// Version 1 is the same as version 0.
//
diff --git
a/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
b/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
index 05fd315..4fb8d8d 100644
--- a/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
+++ b/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 64,
"type": "request",
- "listeners": ["broker", "controller"],
+ "listeners": ["controller"],
"name": "UnregisterBrokerRequest",
"validVersions": "0",
"flexibleVersions": "0+",
diff --git
a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
index 41be8cf..2b31813 100644
--- a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
+++ b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 57,
"type": "request",
- "listeners": ["zkBroker"],
+ "listeners": ["zkBroker", "broker"],
"name": "UpdateFeaturesRequest",
"validVersions": "0",
"flexibleVersions": "0+",
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 4bb5b81..95d0e7b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -223,10 +223,9 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.ENVELOPE => handleEnvelope(request, requestLocal)
case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
case ApiKeys.DESCRIBE_PRODUCERS =>
handleDescribeProducersRequest(request)
- case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request,
handleUnregisterBrokerRequest)
case ApiKeys.DESCRIBE_TRANSACTIONS =>
handleDescribeTransactionsRequest(request)
case ApiKeys.LIST_TRANSACTIONS =>
handleListTransactionsRequest(request)
- case ApiKeys.ALLOCATE_PRODUCER_IDS =>
maybeForwardToController(request, handleAllocateProducerIdsRequest)
+ case ApiKeys.ALLOCATE_PRODUCER_IDS =>
handleAllocateProducerIdsRequest(request)
case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
}
@@ -3369,14 +3368,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- def handleUnregisterBrokerRequest(request: RequestChannel.Request): Unit = {
- // This function will not be called when in KRaft mode, since the
- // UNREGISTER_BROKER API is marked as forwardable and we will always have a forwarding
- // manager.
- throw new UnsupportedVersionException("The broker unregistration API is not available when using " +
- "Apache ZooKeeper mode.")
- }
-
def handleDescribeTransactionsRequest(request: RequestChannel.Request): Unit
= {
val describeTransactionsRequest = request.body[DescribeTransactionsRequest]
val response = new DescribeTransactionsResponseData()