This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3c7f99ad313 MINOR: Cleanup Server Module (#20180)
3c7f99ad313 is described below

commit 3c7f99ad31397a6a7a4975d058891f236f37d02d
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Mon Sep 8 04:31:09 2025 +0530

    MINOR: Cleanup Server Module (#20180)
    
    As the PR title suggests, this PR is an attempt to perform some cleanups
    in the server module. The changes are mostly around the use of Record
    type for some classes, changes to use enhanced switch, etc.
    
    Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../kafka/server/ClientRequestQuotaManager.java    |   2 +-
 .../apache/kafka/network/RequestConvertToJson.java | 712 ++++++++++-----------
 .../apache/kafka/network/SocketServerConfigs.java  |  17 +-
 .../network/metrics/RequestChannelMetrics.java     |   4 +-
 .../kafka/network/metrics/RequestMetrics.java      |  24 +-
 .../apache/kafka/security/authorizer/AclEntry.java |  47 +-
 .../java/org/apache/kafka/server/Assignment.java   |  76 +--
 .../java/org/apache/kafka/server/ReplicaState.java |   2 +-
 .../server/config/ClientQuotaManagerConfig.java    |  26 +-
 .../server/logger/LoggingControllerMBean.java      |   1 -
 .../server/purgatory/DelayedDeleteRecords.java     |   3 +-
 .../purgatory/DeleteRecordsPartitionStatus.java    |   1 -
 .../kafka/server/quota/ClientQuotaManager.java     |  11 +-
 .../apache/kafka/server/quota/ClientSensors.java   |  45 +-
 .../quota/ControllerMutationQuotaManager.java      |   1 -
 .../server/share/session/ShareSessionKey.java      |  43 +-
 .../kafka/network/SocketServerConfigsTest.java     |  13 +-
 .../org/apache/kafka/server/AssignmentTest.java    |   4 +-
 .../BootstrapControllersIntegrationTest.java       |   5 +-
 .../server/quota/ClientQuotasRequestTest.java      |   4 +-
 .../kafka/server/quota/ClientSensorsTest.java      | 103 +++
 .../server/share/session/ShareSessionKeyTest.java  |  44 ++
 22 files changed, 580 insertions(+), 608 deletions(-)

diff --git a/core/src/main/java/kafka/server/ClientRequestQuotaManager.java 
b/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
index 4229949a9cc..3f2398b8358 100644
--- a/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
+++ b/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
@@ -56,7 +56,7 @@ public class ClientRequestQuotaManager extends 
ClientQuotaManager {
             Optional<Plugin<ClientQuotaCallback>> quotaCallbackPlugin
     ) {
         super(config, metrics, QuotaType.REQUEST, time, threadNamePrefix, 
quotaCallbackPlugin);
-        this.maxThrottleTimeMs = 
TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds);
+        this.maxThrottleTimeMs = 
TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds());
         this.metrics = metrics;
         this.exemptMetricName = metrics.metricName("exempt-request-time", 
QuotaType.REQUEST.toString(), "Tracking exempt-request-time utilization 
percentage");
         exemptSensor = getOrCreateSensor(EXEMPT_SENSOR_NAME, 
DEFAULT_INACTIVE_EXEMPT_SENSOR_EXPIRATION_TIME_SECONDS, sensor -> 
sensor.add(exemptMetricName, new Rate()));
diff --git 
a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java 
b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
index e2a76e5caba..9af49f27d66 100644
--- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
+++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
@@ -392,375 +392,363 @@ import java.util.Optional;
 public class RequestConvertToJson {
 
     public static JsonNode request(AbstractRequest request) {
-        switch (request.apiKey()) {
-            case ADD_OFFSETS_TO_TXN:
-                return 
AddOffsetsToTxnRequestDataJsonConverter.write(((AddOffsetsToTxnRequest) 
request).data(), request.version());
-            case ADD_PARTITIONS_TO_TXN:
-                return 
AddPartitionsToTxnRequestDataJsonConverter.write(((AddPartitionsToTxnRequest) 
request).data(), request.version());
-            case ADD_RAFT_VOTER:
-                return 
AddRaftVoterRequestDataJsonConverter.write(((AddRaftVoterRequest) 
request).data(), request.version());
-            case ALLOCATE_PRODUCER_IDS:
-                return 
AllocateProducerIdsRequestDataJsonConverter.write(((AllocateProducerIdsRequest) 
request).data(), request.version());
-            case ALTER_CLIENT_QUOTAS:
-                return 
AlterClientQuotasRequestDataJsonConverter.write(((AlterClientQuotasRequest) 
request).data(), request.version());
-            case ALTER_CONFIGS:
-                return 
AlterConfigsRequestDataJsonConverter.write(((AlterConfigsRequest) 
request).data(), request.version());
-            case ALTER_PARTITION_REASSIGNMENTS:
-                return 
AlterPartitionReassignmentsRequestDataJsonConverter.write(((AlterPartitionReassignmentsRequest)
 request).data(), request.version());
-            case ALTER_PARTITION:
-                return 
AlterPartitionRequestDataJsonConverter.write(((AlterPartitionRequest) 
request).data(), request.version());
-            case ALTER_REPLICA_LOG_DIRS:
-                return 
AlterReplicaLogDirsRequestDataJsonConverter.write(((AlterReplicaLogDirsRequest) 
request).data(), request.version());
-            case ALTER_SHARE_GROUP_OFFSETS:
-                return 
AlterShareGroupOffsetsRequestDataJsonConverter.write(((AlterShareGroupOffsetsRequest)
 request).data(), request.version());
-            case ALTER_USER_SCRAM_CREDENTIALS:
-                return 
AlterUserScramCredentialsRequestDataJsonConverter.write(((AlterUserScramCredentialsRequest)
 request).data(), request.version());
-            case API_VERSIONS:
-                return 
ApiVersionsRequestDataJsonConverter.write(((ApiVersionsRequest) 
request).data(), request.version());
-            case ASSIGN_REPLICAS_TO_DIRS:
-                return 
AssignReplicasToDirsRequestDataJsonConverter.write(((AssignReplicasToDirsRequest)
 request).data(), request.version());
-            case BEGIN_QUORUM_EPOCH:
-                return 
BeginQuorumEpochRequestDataJsonConverter.write(((BeginQuorumEpochRequest) 
request).data(), request.version());
-            case BROKER_HEARTBEAT:
-                return 
BrokerHeartbeatRequestDataJsonConverter.write(((BrokerHeartbeatRequest) 
request).data(), request.version());
-            case BROKER_REGISTRATION:
-                return 
BrokerRegistrationRequestDataJsonConverter.write(((BrokerRegistrationRequest) 
request).data(), request.version());
-            case CONSUMER_GROUP_DESCRIBE:
-                return 
ConsumerGroupDescribeRequestDataJsonConverter.write(((ConsumerGroupDescribeRequest)
 request).data(), request.version());
-            case CONSUMER_GROUP_HEARTBEAT:
-                return 
ConsumerGroupHeartbeatRequestDataJsonConverter.write(((ConsumerGroupHeartbeatRequest)
 request).data(), request.version());
-            case CONTROLLER_REGISTRATION:
-                return 
ControllerRegistrationRequestDataJsonConverter.write(((ControllerRegistrationRequest)
 request).data(), request.version());
-            case CREATE_ACLS:
-                return 
CreateAclsRequestDataJsonConverter.write(((CreateAclsRequest) request).data(), 
request.version());
-            case CREATE_DELEGATION_TOKEN:
-                return 
CreateDelegationTokenRequestDataJsonConverter.write(((CreateDelegationTokenRequest)
 request).data(), request.version());
-            case CREATE_PARTITIONS:
-                return 
CreatePartitionsRequestDataJsonConverter.write(((CreatePartitionsRequest) 
request).data(), request.version());
-            case CREATE_TOPICS:
-                return 
CreateTopicsRequestDataJsonConverter.write(((CreateTopicsRequest) 
request).data(), request.version());
-            case DELETE_ACLS:
-                return 
DeleteAclsRequestDataJsonConverter.write(((DeleteAclsRequest) request).data(), 
request.version());
-            case DELETE_GROUPS:
-                return 
DeleteGroupsRequestDataJsonConverter.write(((DeleteGroupsRequest) 
request).data(), request.version());
-            case DELETE_RECORDS:
-                return 
DeleteRecordsRequestDataJsonConverter.write(((DeleteRecordsRequest) 
request).data(), request.version());
-            case DELETE_SHARE_GROUP_OFFSETS:
-                return 
DeleteShareGroupOffsetsRequestDataJsonConverter.write(((DeleteShareGroupOffsetsRequest)
 request).data(), request.version());
-            case DELETE_SHARE_GROUP_STATE:
-                return 
DeleteShareGroupStateRequestDataJsonConverter.write(((DeleteShareGroupStateRequest)
 request).data(), request.version());
-            case DELETE_TOPICS:
-                return 
DeleteTopicsRequestDataJsonConverter.write(((DeleteTopicsRequest) 
request).data(), request.version());
-            case DESCRIBE_ACLS:
-                return 
DescribeAclsRequestDataJsonConverter.write(((DescribeAclsRequest) 
request).data(), request.version());
-            case DESCRIBE_CLIENT_QUOTAS:
-                return 
DescribeClientQuotasRequestDataJsonConverter.write(((DescribeClientQuotasRequest)
 request).data(), request.version());
-            case DESCRIBE_CLUSTER:
-                return 
DescribeClusterRequestDataJsonConverter.write(((DescribeClusterRequest) 
request).data(), request.version());
-            case DESCRIBE_CONFIGS:
-                return 
DescribeConfigsRequestDataJsonConverter.write(((DescribeConfigsRequest) 
request).data(), request.version());
-            case DESCRIBE_DELEGATION_TOKEN:
-                return 
DescribeDelegationTokenRequestDataJsonConverter.write(((DescribeDelegationTokenRequest)
 request).data(), request.version());
-            case DESCRIBE_GROUPS:
-                return 
DescribeGroupsRequestDataJsonConverter.write(((DescribeGroupsRequest) 
request).data(), request.version());
-            case DESCRIBE_LOG_DIRS:
-                return 
DescribeLogDirsRequestDataJsonConverter.write(((DescribeLogDirsRequest) 
request).data(), request.version());
-            case DESCRIBE_PRODUCERS:
-                return 
DescribeProducersRequestDataJsonConverter.write(((DescribeProducersRequest) 
request).data(), request.version());
-            case DESCRIBE_QUORUM:
-                return 
DescribeQuorumRequestDataJsonConverter.write(((DescribeQuorumRequest) 
request).data(), request.version());
-            case DESCRIBE_SHARE_GROUP_OFFSETS:
-                return 
DescribeShareGroupOffsetsRequestDataJsonConverter.write(((DescribeShareGroupOffsetsRequest)
 request).data(), request.version());
-            case DESCRIBE_TOPIC_PARTITIONS:
-                return 
DescribeTopicPartitionsRequestDataJsonConverter.write(((DescribeTopicPartitionsRequest)
 request).data(), request.version());
-            case DESCRIBE_TRANSACTIONS:
-                return 
DescribeTransactionsRequestDataJsonConverter.write(((DescribeTransactionsRequest)
 request).data(), request.version());
-            case DESCRIBE_USER_SCRAM_CREDENTIALS:
-                return 
DescribeUserScramCredentialsRequestDataJsonConverter.write(((DescribeUserScramCredentialsRequest)
 request).data(), request.version());
-            case ELECT_LEADERS:
-                return 
ElectLeadersRequestDataJsonConverter.write(((ElectLeadersRequest) 
request).data(), request.version());
-            case END_QUORUM_EPOCH:
-                return 
EndQuorumEpochRequestDataJsonConverter.write(((EndQuorumEpochRequest) 
request).data(), request.version());
-            case END_TXN:
-                return EndTxnRequestDataJsonConverter.write(((EndTxnRequest) 
request).data(), request.version());
-            case ENVELOPE:
-                return 
EnvelopeRequestDataJsonConverter.write(((EnvelopeRequest) request).data(), 
request.version());
-            case EXPIRE_DELEGATION_TOKEN:
-                return 
ExpireDelegationTokenRequestDataJsonConverter.write(((ExpireDelegationTokenRequest)
 request).data(), request.version());
-            case FETCH:
-                return FetchRequestDataJsonConverter.write(((FetchRequest) 
request).data(), request.version());
-            case FETCH_SNAPSHOT:
-                return 
FetchSnapshotRequestDataJsonConverter.write(((FetchSnapshotRequest) 
request).data(), request.version());
-            case FIND_COORDINATOR:
-                return 
FindCoordinatorRequestDataJsonConverter.write(((FindCoordinatorRequest) 
request).data(), request.version());
-            case GET_TELEMETRY_SUBSCRIPTIONS:
-                return 
GetTelemetrySubscriptionsRequestDataJsonConverter.write(((GetTelemetrySubscriptionsRequest)
 request).data(), request.version());
-            case HEARTBEAT:
-                return 
HeartbeatRequestDataJsonConverter.write(((HeartbeatRequest) request).data(), 
request.version());
-            case INCREMENTAL_ALTER_CONFIGS:
-                return 
IncrementalAlterConfigsRequestDataJsonConverter.write(((IncrementalAlterConfigsRequest)
 request).data(), request.version());
-            case INITIALIZE_SHARE_GROUP_STATE:
-                return 
InitializeShareGroupStateRequestDataJsonConverter.write(((InitializeShareGroupStateRequest)
 request).data(), request.version());
-            case INIT_PRODUCER_ID:
-                return 
InitProducerIdRequestDataJsonConverter.write(((InitProducerIdRequest) 
request).data(), request.version());
-            case JOIN_GROUP:
-                return 
JoinGroupRequestDataJsonConverter.write(((JoinGroupRequest) request).data(), 
request.version());
-            case LEAVE_GROUP:
-                return 
LeaveGroupRequestDataJsonConverter.write(((LeaveGroupRequest) request).data(), 
request.version());
-            case LIST_CONFIG_RESOURCES:
-                return 
ListConfigResourcesRequestDataJsonConverter.write(((ListConfigResourcesRequest) 
request).data(), request.version());
-            case LIST_GROUPS:
-                return 
ListGroupsRequestDataJsonConverter.write(((ListGroupsRequest) request).data(), 
request.version());
-            case LIST_OFFSETS:
-                return 
ListOffsetsRequestDataJsonConverter.write(((ListOffsetsRequest) 
request).data(), request.version());
-            case LIST_PARTITION_REASSIGNMENTS:
-                return 
ListPartitionReassignmentsRequestDataJsonConverter.write(((ListPartitionReassignmentsRequest)
 request).data(), request.version());
-            case LIST_TRANSACTIONS:
-                return 
ListTransactionsRequestDataJsonConverter.write(((ListTransactionsRequest) 
request).data(), request.version());
-            case METADATA:
-                return 
MetadataRequestDataJsonConverter.write(((MetadataRequest) request).data(), 
request.version());
-            case OFFSET_COMMIT:
-                return 
OffsetCommitRequestDataJsonConverter.write(((OffsetCommitRequest) 
request).data(), request.version());
-            case OFFSET_DELETE:
-                return 
OffsetDeleteRequestDataJsonConverter.write(((OffsetDeleteRequest) 
request).data(), request.version());
-            case OFFSET_FETCH:
-                return 
OffsetFetchRequestDataJsonConverter.write(((OffsetFetchRequest) 
request).data(), request.version());
-            case OFFSET_FOR_LEADER_EPOCH:
-                return 
OffsetForLeaderEpochRequestDataJsonConverter.write(((OffsetsForLeaderEpochRequest)
 request).data(), request.version());
-            case PRODUCE:
-                return ProduceRequestDataJsonConverter.write(((ProduceRequest) 
request).data(), request.version(), false);
-            case PUSH_TELEMETRY:
-                return 
PushTelemetryRequestDataJsonConverter.write(((PushTelemetryRequest) 
request).data(), request.version());
-            case READ_SHARE_GROUP_STATE:
-                return 
ReadShareGroupStateRequestDataJsonConverter.write(((ReadShareGroupStateRequest) 
request).data(), request.version());
-            case READ_SHARE_GROUP_STATE_SUMMARY:
-                return 
ReadShareGroupStateSummaryRequestDataJsonConverter.write(((ReadShareGroupStateSummaryRequest)
 request).data(), request.version());
-            case REMOVE_RAFT_VOTER:
-                return 
RemoveRaftVoterRequestDataJsonConverter.write(((RemoveRaftVoterRequest) 
request).data(), request.version());
-            case RENEW_DELEGATION_TOKEN:
-                return 
RenewDelegationTokenRequestDataJsonConverter.write(((RenewDelegationTokenRequest)
 request).data(), request.version());
-            case SASL_AUTHENTICATE:
-                return 
SaslAuthenticateRequestDataJsonConverter.write(((SaslAuthenticateRequest) 
request).data(), request.version());
-            case SASL_HANDSHAKE:
-                return 
SaslHandshakeRequestDataJsonConverter.write(((SaslHandshakeRequest) 
request).data(), request.version());
-            case SHARE_ACKNOWLEDGE:
-                return 
ShareAcknowledgeRequestDataJsonConverter.write(((ShareAcknowledgeRequest) 
request).data(), request.version());
-            case SHARE_FETCH:
-                return 
ShareFetchRequestDataJsonConverter.write(((ShareFetchRequest) request).data(), 
request.version());
-            case SHARE_GROUP_DESCRIBE:
-                return 
ShareGroupDescribeRequestDataJsonConverter.write(((ShareGroupDescribeRequest) 
request).data(), request.version());
-            case SHARE_GROUP_HEARTBEAT:
-                return 
ShareGroupHeartbeatRequestDataJsonConverter.write(((ShareGroupHeartbeatRequest) 
request).data(), request.version());
-            case STREAMS_GROUP_DESCRIBE:
-                return 
StreamsGroupDescribeRequestDataJsonConverter.write(((StreamsGroupDescribeRequest)
 request).data(), request.version());
-            case STREAMS_GROUP_HEARTBEAT:
-                return 
StreamsGroupHeartbeatRequestDataJsonConverter.write(((StreamsGroupHeartbeatRequest)
 request).data(), request.version());
-            case SYNC_GROUP:
-                return 
SyncGroupRequestDataJsonConverter.write(((SyncGroupRequest) request).data(), 
request.version());
-            case TXN_OFFSET_COMMIT:
-                return 
TxnOffsetCommitRequestDataJsonConverter.write(((TxnOffsetCommitRequest) 
request).data(), request.version());
-            case UNREGISTER_BROKER:
-                return 
UnregisterBrokerRequestDataJsonConverter.write(((UnregisterBrokerRequest) 
request).data(), request.version());
-            case UPDATE_FEATURES:
-                return 
UpdateFeaturesRequestDataJsonConverter.write(((UpdateFeaturesRequest) 
request).data(), request.version());
-            case UPDATE_RAFT_VOTER:
-                return 
UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest) 
request).data(), request.version());
-            case VOTE:
-                return VoteRequestDataJsonConverter.write(((VoteRequest) 
request).data(), request.version());
-            case WRITE_SHARE_GROUP_STATE:
-                return 
WriteShareGroupStateRequestDataJsonConverter.write(((WriteShareGroupStateRequest)
 request).data(), request.version());
-            case WRITE_TXN_MARKERS:
-                return 
WriteTxnMarkersRequestDataJsonConverter.write(((WriteTxnMarkersRequest) 
request).data(), request.version());
-            default:
+        return switch (request.apiKey()) {
+            case ADD_OFFSETS_TO_TXN ->
+                
AddOffsetsToTxnRequestDataJsonConverter.write(((AddOffsetsToTxnRequest) 
request).data(), request.version());
+            case ADD_PARTITIONS_TO_TXN ->
+                
AddPartitionsToTxnRequestDataJsonConverter.write(((AddPartitionsToTxnRequest) 
request).data(), request.version());
+            case ADD_RAFT_VOTER ->
+                
AddRaftVoterRequestDataJsonConverter.write(((AddRaftVoterRequest) 
request).data(), request.version());
+            case ALLOCATE_PRODUCER_IDS ->
+                
AllocateProducerIdsRequestDataJsonConverter.write(((AllocateProducerIdsRequest) 
request).data(), request.version());
+            case ALTER_CLIENT_QUOTAS ->
+                
AlterClientQuotasRequestDataJsonConverter.write(((AlterClientQuotasRequest) 
request).data(), request.version());
+            case ALTER_CONFIGS ->
+                
AlterConfigsRequestDataJsonConverter.write(((AlterConfigsRequest) 
request).data(), request.version());
+            case ALTER_PARTITION_REASSIGNMENTS ->
+                
AlterPartitionReassignmentsRequestDataJsonConverter.write(((AlterPartitionReassignmentsRequest)
 request).data(), request.version());
+            case ALTER_PARTITION ->
+                
AlterPartitionRequestDataJsonConverter.write(((AlterPartitionRequest) 
request).data(), request.version());
+            case ALTER_REPLICA_LOG_DIRS ->
+                
AlterReplicaLogDirsRequestDataJsonConverter.write(((AlterReplicaLogDirsRequest) 
request).data(), request.version());
+            case ALTER_SHARE_GROUP_OFFSETS ->
+                
AlterShareGroupOffsetsRequestDataJsonConverter.write(((AlterShareGroupOffsetsRequest)
 request).data(), request.version());
+            case ALTER_USER_SCRAM_CREDENTIALS ->
+                
AlterUserScramCredentialsRequestDataJsonConverter.write(((AlterUserScramCredentialsRequest)
 request).data(), request.version());
+            case API_VERSIONS ->
+                
ApiVersionsRequestDataJsonConverter.write(((ApiVersionsRequest) 
request).data(), request.version());
+            case ASSIGN_REPLICAS_TO_DIRS ->
+                
AssignReplicasToDirsRequestDataJsonConverter.write(((AssignReplicasToDirsRequest)
 request).data(), request.version());
+            case BEGIN_QUORUM_EPOCH ->
+                
BeginQuorumEpochRequestDataJsonConverter.write(((BeginQuorumEpochRequest) 
request).data(), request.version());
+            case BROKER_HEARTBEAT ->
+                
BrokerHeartbeatRequestDataJsonConverter.write(((BrokerHeartbeatRequest) 
request).data(), request.version());
+            case BROKER_REGISTRATION ->
+                
BrokerRegistrationRequestDataJsonConverter.write(((BrokerRegistrationRequest) 
request).data(), request.version());
+            case CONSUMER_GROUP_DESCRIBE ->
+                
ConsumerGroupDescribeRequestDataJsonConverter.write(((ConsumerGroupDescribeRequest)
 request).data(), request.version());
+            case CONSUMER_GROUP_HEARTBEAT ->
+                
ConsumerGroupHeartbeatRequestDataJsonConverter.write(((ConsumerGroupHeartbeatRequest)
 request).data(), request.version());
+            case CONTROLLER_REGISTRATION ->
+                
ControllerRegistrationRequestDataJsonConverter.write(((ControllerRegistrationRequest)
 request).data(), request.version());
+            case CREATE_ACLS ->
+                CreateAclsRequestDataJsonConverter.write(((CreateAclsRequest) 
request).data(), request.version());
+            case CREATE_DELEGATION_TOKEN ->
+                
CreateDelegationTokenRequestDataJsonConverter.write(((CreateDelegationTokenRequest)
 request).data(), request.version());
+            case CREATE_PARTITIONS ->
+                
CreatePartitionsRequestDataJsonConverter.write(((CreatePartitionsRequest) 
request).data(), request.version());
+            case CREATE_TOPICS ->
+                
CreateTopicsRequestDataJsonConverter.write(((CreateTopicsRequest) 
request).data(), request.version());
+            case DELETE_ACLS ->
+                DeleteAclsRequestDataJsonConverter.write(((DeleteAclsRequest) 
request).data(), request.version());
+            case DELETE_GROUPS ->
+                
DeleteGroupsRequestDataJsonConverter.write(((DeleteGroupsRequest) 
request).data(), request.version());
+            case DELETE_RECORDS ->
+                
DeleteRecordsRequestDataJsonConverter.write(((DeleteRecordsRequest) 
request).data(), request.version());
+            case DELETE_SHARE_GROUP_OFFSETS ->
+                
DeleteShareGroupOffsetsRequestDataJsonConverter.write(((DeleteShareGroupOffsetsRequest)
 request).data(), request.version());
+            case DELETE_SHARE_GROUP_STATE ->
+                
DeleteShareGroupStateRequestDataJsonConverter.write(((DeleteShareGroupStateRequest)
 request).data(), request.version());
+            case DELETE_TOPICS ->
+                
DeleteTopicsRequestDataJsonConverter.write(((DeleteTopicsRequest) 
request).data(), request.version());
+            case DESCRIBE_ACLS ->
+                
DescribeAclsRequestDataJsonConverter.write(((DescribeAclsRequest) 
request).data(), request.version());
+            case DESCRIBE_CLIENT_QUOTAS ->
+                
DescribeClientQuotasRequestDataJsonConverter.write(((DescribeClientQuotasRequest)
 request).data(), request.version());
+            case DESCRIBE_CLUSTER ->
+                
DescribeClusterRequestDataJsonConverter.write(((DescribeClusterRequest) 
request).data(), request.version());
+            case DESCRIBE_CONFIGS ->
+                
DescribeConfigsRequestDataJsonConverter.write(((DescribeConfigsRequest) 
request).data(), request.version());
+            case DESCRIBE_DELEGATION_TOKEN ->
+                
DescribeDelegationTokenRequestDataJsonConverter.write(((DescribeDelegationTokenRequest)
 request).data(), request.version());
+            case DESCRIBE_GROUPS ->
+                
DescribeGroupsRequestDataJsonConverter.write(((DescribeGroupsRequest) 
request).data(), request.version());
+            case DESCRIBE_LOG_DIRS ->
+                
DescribeLogDirsRequestDataJsonConverter.write(((DescribeLogDirsRequest) 
request).data(), request.version());
+            case DESCRIBE_PRODUCERS ->
+                
DescribeProducersRequestDataJsonConverter.write(((DescribeProducersRequest) 
request).data(), request.version());
+            case DESCRIBE_QUORUM ->
+                
DescribeQuorumRequestDataJsonConverter.write(((DescribeQuorumRequest) 
request).data(), request.version());
+            case DESCRIBE_SHARE_GROUP_OFFSETS ->
+                
DescribeShareGroupOffsetsRequestDataJsonConverter.write(((DescribeShareGroupOffsetsRequest)
 request).data(), request.version());
+            case DESCRIBE_TOPIC_PARTITIONS ->
+                
DescribeTopicPartitionsRequestDataJsonConverter.write(((DescribeTopicPartitionsRequest)
 request).data(), request.version());
+            case DESCRIBE_TRANSACTIONS ->
+                
DescribeTransactionsRequestDataJsonConverter.write(((DescribeTransactionsRequest)
 request).data(), request.version());
+            case DESCRIBE_USER_SCRAM_CREDENTIALS ->
+                
DescribeUserScramCredentialsRequestDataJsonConverter.write(((DescribeUserScramCredentialsRequest)
 request).data(), request.version());
+            case ELECT_LEADERS ->
+                
ElectLeadersRequestDataJsonConverter.write(((ElectLeadersRequest) 
request).data(), request.version());
+            case END_QUORUM_EPOCH ->
+                
EndQuorumEpochRequestDataJsonConverter.write(((EndQuorumEpochRequest) 
request).data(), request.version());
+            case END_TXN -> 
EndTxnRequestDataJsonConverter.write(((EndTxnRequest) request).data(), 
request.version());
+            case ENVELOPE ->
+                EnvelopeRequestDataJsonConverter.write(((EnvelopeRequest) 
request).data(), request.version());
+            case EXPIRE_DELEGATION_TOKEN ->
+                
ExpireDelegationTokenRequestDataJsonConverter.write(((ExpireDelegationTokenRequest)
 request).data(), request.version());
+            case FETCH -> FetchRequestDataJsonConverter.write(((FetchRequest) 
request).data(), request.version());
+            case FETCH_SNAPSHOT ->
+                
FetchSnapshotRequestDataJsonConverter.write(((FetchSnapshotRequest) 
request).data(), request.version());
+            case FIND_COORDINATOR ->
+                
FindCoordinatorRequestDataJsonConverter.write(((FindCoordinatorRequest) 
request).data(), request.version());
+            case GET_TELEMETRY_SUBSCRIPTIONS ->
+                
GetTelemetrySubscriptionsRequestDataJsonConverter.write(((GetTelemetrySubscriptionsRequest)
 request).data(), request.version());
+            case HEARTBEAT ->
+                HeartbeatRequestDataJsonConverter.write(((HeartbeatRequest) 
request).data(), request.version());
+            case INCREMENTAL_ALTER_CONFIGS ->
+                
IncrementalAlterConfigsRequestDataJsonConverter.write(((IncrementalAlterConfigsRequest)
 request).data(), request.version());
+            case INITIALIZE_SHARE_GROUP_STATE ->
+                
InitializeShareGroupStateRequestDataJsonConverter.write(((InitializeShareGroupStateRequest)
 request).data(), request.version());
+            case INIT_PRODUCER_ID ->
+                
InitProducerIdRequestDataJsonConverter.write(((InitProducerIdRequest) 
request).data(), request.version());
+            case JOIN_GROUP ->
+                JoinGroupRequestDataJsonConverter.write(((JoinGroupRequest) 
request).data(), request.version());
+            case LEAVE_GROUP ->
+                LeaveGroupRequestDataJsonConverter.write(((LeaveGroupRequest) 
request).data(), request.version());
+            case LIST_CONFIG_RESOURCES ->
+                
ListConfigResourcesRequestDataJsonConverter.write(((ListConfigResourcesRequest) 
request).data(), request.version());
+            case LIST_GROUPS ->
+                ListGroupsRequestDataJsonConverter.write(((ListGroupsRequest) 
request).data(), request.version());
+            case LIST_OFFSETS ->
+                
ListOffsetsRequestDataJsonConverter.write(((ListOffsetsRequest) 
request).data(), request.version());
+            case LIST_PARTITION_REASSIGNMENTS ->
+                
ListPartitionReassignmentsRequestDataJsonConverter.write(((ListPartitionReassignmentsRequest)
 request).data(), request.version());
+            case LIST_TRANSACTIONS ->
+                
ListTransactionsRequestDataJsonConverter.write(((ListTransactionsRequest) 
request).data(), request.version());
+            case METADATA ->
+                MetadataRequestDataJsonConverter.write(((MetadataRequest) 
request).data(), request.version());
+            case OFFSET_COMMIT ->
+                
OffsetCommitRequestDataJsonConverter.write(((OffsetCommitRequest) 
request).data(), request.version());
+            case OFFSET_DELETE ->
+                
OffsetDeleteRequestDataJsonConverter.write(((OffsetDeleteRequest) 
request).data(), request.version());
+            case OFFSET_FETCH ->
+                
OffsetFetchRequestDataJsonConverter.write(((OffsetFetchRequest) 
request).data(), request.version());
+            case OFFSET_FOR_LEADER_EPOCH ->
+                
OffsetForLeaderEpochRequestDataJsonConverter.write(((OffsetsForLeaderEpochRequest)
 request).data(), request.version());
+            case PRODUCE ->
+                ProduceRequestDataJsonConverter.write(((ProduceRequest) 
request).data(), request.version(), false);
+            case PUSH_TELEMETRY ->
+                
PushTelemetryRequestDataJsonConverter.write(((PushTelemetryRequest) 
request).data(), request.version());
+            case READ_SHARE_GROUP_STATE ->
+                
ReadShareGroupStateRequestDataJsonConverter.write(((ReadShareGroupStateRequest) 
request).data(), request.version());
+            case READ_SHARE_GROUP_STATE_SUMMARY ->
+                
ReadShareGroupStateSummaryRequestDataJsonConverter.write(((ReadShareGroupStateSummaryRequest)
 request).data(), request.version());
+            case REMOVE_RAFT_VOTER ->
+                
RemoveRaftVoterRequestDataJsonConverter.write(((RemoveRaftVoterRequest) 
request).data(), request.version());
+            case RENEW_DELEGATION_TOKEN ->
+                
RenewDelegationTokenRequestDataJsonConverter.write(((RenewDelegationTokenRequest)
 request).data(), request.version());
+            case SASL_AUTHENTICATE ->
+                
SaslAuthenticateRequestDataJsonConverter.write(((SaslAuthenticateRequest) 
request).data(), request.version());
+            case SASL_HANDSHAKE ->
+                
SaslHandshakeRequestDataJsonConverter.write(((SaslHandshakeRequest) 
request).data(), request.version());
+            case SHARE_ACKNOWLEDGE ->
+                
ShareAcknowledgeRequestDataJsonConverter.write(((ShareAcknowledgeRequest) 
request).data(), request.version());
+            case SHARE_FETCH ->
+                ShareFetchRequestDataJsonConverter.write(((ShareFetchRequest) 
request).data(), request.version());
+            case SHARE_GROUP_DESCRIBE ->
+                
ShareGroupDescribeRequestDataJsonConverter.write(((ShareGroupDescribeRequest) 
request).data(), request.version());
+            case SHARE_GROUP_HEARTBEAT ->
+                
ShareGroupHeartbeatRequestDataJsonConverter.write(((ShareGroupHeartbeatRequest) 
request).data(), request.version());
+            case STREAMS_GROUP_DESCRIBE ->
+                
StreamsGroupDescribeRequestDataJsonConverter.write(((StreamsGroupDescribeRequest)
 request).data(), request.version());
+            case STREAMS_GROUP_HEARTBEAT ->
+                
StreamsGroupHeartbeatRequestDataJsonConverter.write(((StreamsGroupHeartbeatRequest)
 request).data(), request.version());
+            case SYNC_GROUP ->
+                SyncGroupRequestDataJsonConverter.write(((SyncGroupRequest) 
request).data(), request.version());
+            case TXN_OFFSET_COMMIT ->
+                
TxnOffsetCommitRequestDataJsonConverter.write(((TxnOffsetCommitRequest) 
request).data(), request.version());
+            case UNREGISTER_BROKER ->
+                
UnregisterBrokerRequestDataJsonConverter.write(((UnregisterBrokerRequest) 
request).data(), request.version());
+            case UPDATE_FEATURES ->
+                
UpdateFeaturesRequestDataJsonConverter.write(((UpdateFeaturesRequest) 
request).data(), request.version());
+            case UPDATE_RAFT_VOTER ->
+                
UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest) 
request).data(), request.version());
+            case VOTE -> VoteRequestDataJsonConverter.write(((VoteRequest) 
request).data(), request.version());
+            case WRITE_SHARE_GROUP_STATE ->
+                
WriteShareGroupStateRequestDataJsonConverter.write(((WriteShareGroupStateRequest)
 request).data(), request.version());
+            case WRITE_TXN_MARKERS ->
+                
WriteTxnMarkersRequestDataJsonConverter.write(((WriteTxnMarkersRequest) 
request).data(), request.version());
+            default ->
                 throw new IllegalStateException("ApiKey " + request.apiKey() + 
" is not currently handled in `request`, the " +
                     "code should be updated to do so.");
-        }
+        };
     }
 
     public static JsonNode response(AbstractResponse response, short version) {
-        switch (response.apiKey()) {
-            case ADD_OFFSETS_TO_TXN:
-                return 
AddOffsetsToTxnResponseDataJsonConverter.write(((AddOffsetsToTxnResponse) 
response).data(), version);
-            case ADD_PARTITIONS_TO_TXN:
-                return 
AddPartitionsToTxnResponseDataJsonConverter.write(((AddPartitionsToTxnResponse) 
response).data(), version);
-            case ADD_RAFT_VOTER:
-                return 
AddRaftVoterResponseDataJsonConverter.write(((AddRaftVoterResponse) 
response).data(), version);
-            case ALLOCATE_PRODUCER_IDS:
-                return 
AllocateProducerIdsResponseDataJsonConverter.write(((AllocateProducerIdsResponse)
 response).data(), version);
-            case ALTER_CLIENT_QUOTAS:
-                return 
AlterClientQuotasResponseDataJsonConverter.write(((AlterClientQuotasResponse) 
response).data(), version);
-            case ALTER_CONFIGS:
-                return 
AlterConfigsResponseDataJsonConverter.write(((AlterConfigsResponse) 
response).data(), version);
-            case ALTER_PARTITION_REASSIGNMENTS:
-                return 
AlterPartitionReassignmentsResponseDataJsonConverter.write(((AlterPartitionReassignmentsResponse)
 response).data(), version);
-            case ALTER_PARTITION:
-                return 
AlterPartitionResponseDataJsonConverter.write(((AlterPartitionResponse) 
response).data(), version);
-            case ALTER_REPLICA_LOG_DIRS:
-                return 
AlterReplicaLogDirsResponseDataJsonConverter.write(((AlterReplicaLogDirsResponse)
 response).data(), version);
-            case ALTER_SHARE_GROUP_OFFSETS:
-                return 
AlterShareGroupOffsetsResponseDataJsonConverter.write(((AlterShareGroupOffsetsResponse)
 response).data(), version);
-            case ALTER_USER_SCRAM_CREDENTIALS:
-                return 
AlterUserScramCredentialsResponseDataJsonConverter.write(((AlterUserScramCredentialsResponse)
 response).data(), version);
-            case API_VERSIONS:
-                return 
ApiVersionsResponseDataJsonConverter.write(((ApiVersionsResponse) 
response).data(), version);
-            case ASSIGN_REPLICAS_TO_DIRS:
-                return 
AssignReplicasToDirsResponseDataJsonConverter.write(((AssignReplicasToDirsResponse)
 response).data(), version);
-            case BEGIN_QUORUM_EPOCH:
-                return 
BeginQuorumEpochResponseDataJsonConverter.write(((BeginQuorumEpochResponse) 
response).data(), version);
-            case BROKER_HEARTBEAT:
-                return 
BrokerHeartbeatResponseDataJsonConverter.write(((BrokerHeartbeatResponse) 
response).data(), version);
-            case BROKER_REGISTRATION:
-                return 
BrokerRegistrationResponseDataJsonConverter.write(((BrokerRegistrationResponse) 
response).data(), version);
-            case CONSUMER_GROUP_DESCRIBE:
-                return 
ConsumerGroupDescribeResponseDataJsonConverter.write(((ConsumerGroupDescribeResponse)
 response).data(), version);
-            case CONSUMER_GROUP_HEARTBEAT:
-                return 
ConsumerGroupHeartbeatResponseDataJsonConverter.write(((ConsumerGroupHeartbeatResponse)
 response).data(), version);
-            case CONTROLLER_REGISTRATION:
-                return 
ControllerRegistrationResponseDataJsonConverter.write(((ControllerRegistrationResponse)
 response).data(), version);
-            case CREATE_ACLS:
-                return 
CreateAclsResponseDataJsonConverter.write(((CreateAclsResponse) 
response).data(), version);
-            case CREATE_DELEGATION_TOKEN:
-                return 
CreateDelegationTokenResponseDataJsonConverter.write(((CreateDelegationTokenResponse)
 response).data(), version);
-            case CREATE_PARTITIONS:
-                return 
CreatePartitionsResponseDataJsonConverter.write(((CreatePartitionsResponse) 
response).data(), version);
-            case CREATE_TOPICS:
-                return 
CreateTopicsResponseDataJsonConverter.write(((CreateTopicsResponse) 
response).data(), version);
-            case DELETE_ACLS:
-                return 
DeleteAclsResponseDataJsonConverter.write(((DeleteAclsResponse) 
response).data(), version);
-            case DELETE_GROUPS:
-                return 
DeleteGroupsResponseDataJsonConverter.write(((DeleteGroupsResponse) 
response).data(), version);
-            case DELETE_RECORDS:
-                return 
DeleteRecordsResponseDataJsonConverter.write(((DeleteRecordsResponse) 
response).data(), version);
-            case DELETE_SHARE_GROUP_OFFSETS:
-                return 
DeleteShareGroupOffsetsResponseDataJsonConverter.write(((DeleteShareGroupOffsetsResponse)
 response).data(), version);
-            case DELETE_SHARE_GROUP_STATE:
-                return 
DeleteShareGroupStateResponseDataJsonConverter.write(((DeleteShareGroupStateResponse)
 response).data(), version);
-            case DELETE_TOPICS:
-                return 
DeleteTopicsResponseDataJsonConverter.write(((DeleteTopicsResponse) 
response).data(), version);
-            case DESCRIBE_ACLS:
-                return 
DescribeAclsResponseDataJsonConverter.write(((DescribeAclsResponse) 
response).data(), version);
-            case DESCRIBE_CLIENT_QUOTAS:
-                return 
DescribeClientQuotasResponseDataJsonConverter.write(((DescribeClientQuotasResponse)
 response).data(), version);
-            case DESCRIBE_CLUSTER:
-                return 
DescribeClusterResponseDataJsonConverter.write(((DescribeClusterResponse) 
response).data(), version);
-            case DESCRIBE_CONFIGS:
-                return 
DescribeConfigsResponseDataJsonConverter.write(((DescribeConfigsResponse) 
response).data(), version);
-            case DESCRIBE_DELEGATION_TOKEN:
-                return 
DescribeDelegationTokenResponseDataJsonConverter.write(((DescribeDelegationTokenResponse)
 response).data(), version);
-            case DESCRIBE_GROUPS:
-                return 
DescribeGroupsResponseDataJsonConverter.write(((DescribeGroupsResponse) 
response).data(), version);
-            case DESCRIBE_LOG_DIRS:
-                return 
DescribeLogDirsResponseDataJsonConverter.write(((DescribeLogDirsResponse) 
response).data(), version);
-            case DESCRIBE_PRODUCERS:
-                return 
DescribeProducersResponseDataJsonConverter.write(((DescribeProducersResponse) 
response).data(), version);
-            case DESCRIBE_QUORUM:
-                return 
DescribeQuorumResponseDataJsonConverter.write(((DescribeQuorumResponse) 
response).data(), version);
-            case DESCRIBE_SHARE_GROUP_OFFSETS:
-                return 
DescribeShareGroupOffsetsResponseDataJsonConverter.write(((DescribeShareGroupOffsetsResponse)
 response).data(), version);
-            case DESCRIBE_TOPIC_PARTITIONS:
-                return 
DescribeTopicPartitionsResponseDataJsonConverter.write(((DescribeTopicPartitionsResponse)
 response).data(), version);
-            case DESCRIBE_TRANSACTIONS:
-                return 
DescribeTransactionsResponseDataJsonConverter.write(((DescribeTransactionsResponse)
 response).data(), version);
-            case DESCRIBE_USER_SCRAM_CREDENTIALS:
-                return 
DescribeUserScramCredentialsResponseDataJsonConverter.write(((DescribeUserScramCredentialsResponse)
 response).data(), version);
-            case ELECT_LEADERS:
-                return 
ElectLeadersResponseDataJsonConverter.write(((ElectLeadersResponse) 
response).data(), version);
-            case END_QUORUM_EPOCH:
-                return 
EndQuorumEpochResponseDataJsonConverter.write(((EndQuorumEpochResponse) 
response).data(), version);
-            case END_TXN:
-                return EndTxnResponseDataJsonConverter.write(((EndTxnResponse) 
response).data(), version);
-            case ENVELOPE:
-                return 
EnvelopeResponseDataJsonConverter.write(((EnvelopeResponse) response).data(), 
version);
-            case EXPIRE_DELEGATION_TOKEN:
-                return 
ExpireDelegationTokenResponseDataJsonConverter.write(((ExpireDelegationTokenResponse)
 response).data(), version);
-            case FETCH:
-                return FetchResponseDataJsonConverter.write(((FetchResponse) 
response).data(), version, false);
-            case FETCH_SNAPSHOT:
-                return 
FetchSnapshotResponseDataJsonConverter.write(((FetchSnapshotResponse) 
response).data(), version);
-            case FIND_COORDINATOR:
-                return 
FindCoordinatorResponseDataJsonConverter.write(((FindCoordinatorResponse) 
response).data(), version);
-            case GET_TELEMETRY_SUBSCRIPTIONS:
-                return 
GetTelemetrySubscriptionsResponseDataJsonConverter.write(((GetTelemetrySubscriptionsResponse)
 response).data(), version);
-            case HEARTBEAT:
-                return 
HeartbeatResponseDataJsonConverter.write(((HeartbeatResponse) response).data(), 
version);
-            case INCREMENTAL_ALTER_CONFIGS:
-                return 
IncrementalAlterConfigsResponseDataJsonConverter.write(((IncrementalAlterConfigsResponse)
 response).data(), version);
-            case INITIALIZE_SHARE_GROUP_STATE:
-                return 
InitializeShareGroupStateResponseDataJsonConverter.write(((InitializeShareGroupStateResponse)
 response).data(), version);
-            case INIT_PRODUCER_ID:
-                return 
InitProducerIdResponseDataJsonConverter.write(((InitProducerIdResponse) 
response).data(), version);
-            case JOIN_GROUP:
-                return 
JoinGroupResponseDataJsonConverter.write(((JoinGroupResponse) response).data(), 
version);
-            case LEAVE_GROUP:
-                return 
LeaveGroupResponseDataJsonConverter.write(((LeaveGroupResponse) 
response).data(), version);
-            case LIST_CONFIG_RESOURCES:
-                return 
ListConfigResourcesResponseDataJsonConverter.write(((ListConfigResourcesResponse)
 response).data(), version);
-            case LIST_GROUPS:
-                return 
ListGroupsResponseDataJsonConverter.write(((ListGroupsResponse) 
response).data(), version);
-            case LIST_OFFSETS:
-                return 
ListOffsetsResponseDataJsonConverter.write(((ListOffsetsResponse) 
response).data(), version);
-            case LIST_PARTITION_REASSIGNMENTS:
-                return 
ListPartitionReassignmentsResponseDataJsonConverter.write(((ListPartitionReassignmentsResponse)
 response).data(), version);
-            case LIST_TRANSACTIONS:
-                return 
ListTransactionsResponseDataJsonConverter.write(((ListTransactionsResponse) 
response).data(), version);
-            case METADATA:
-                return 
MetadataResponseDataJsonConverter.write(((MetadataResponse) response).data(), 
version);
-            case OFFSET_COMMIT:
-                return 
OffsetCommitResponseDataJsonConverter.write(((OffsetCommitResponse) 
response).data(), version);
-            case OFFSET_DELETE:
-                return 
OffsetDeleteResponseDataJsonConverter.write(((OffsetDeleteResponse) 
response).data(), version);
-            case OFFSET_FETCH:
-                return 
OffsetFetchResponseDataJsonConverter.write(((OffsetFetchResponse) 
response).data(), version);
-            case OFFSET_FOR_LEADER_EPOCH:
-                return 
OffsetForLeaderEpochResponseDataJsonConverter.write(((OffsetsForLeaderEpochResponse)
 response).data(), version);
-            case PRODUCE:
-                return 
ProduceResponseDataJsonConverter.write(((ProduceResponse) response).data(), 
version);
-            case PUSH_TELEMETRY:
-                return 
PushTelemetryResponseDataJsonConverter.write(((PushTelemetryResponse) 
response).data(), version);
-            case READ_SHARE_GROUP_STATE:
-                return 
ReadShareGroupStateResponseDataJsonConverter.write(((ReadShareGroupStateResponse)
 response).data(), version);
-            case READ_SHARE_GROUP_STATE_SUMMARY:
-                return 
ReadShareGroupStateSummaryResponseDataJsonConverter.write(((ReadShareGroupStateSummaryResponse)
 response).data(), version);
-            case REMOVE_RAFT_VOTER:
-                return 
RemoveRaftVoterResponseDataJsonConverter.write(((RemoveRaftVoterResponse) 
response).data(), version);
-            case RENEW_DELEGATION_TOKEN:
-                return 
RenewDelegationTokenResponseDataJsonConverter.write(((RenewDelegationTokenResponse)
 response).data(), version);
-            case SASL_AUTHENTICATE:
-                return 
SaslAuthenticateResponseDataJsonConverter.write(((SaslAuthenticateResponse) 
response).data(), version);
-            case SASL_HANDSHAKE:
-                return 
SaslHandshakeResponseDataJsonConverter.write(((SaslHandshakeResponse) 
response).data(), version);
-            case SHARE_ACKNOWLEDGE:
-                return 
ShareAcknowledgeResponseDataJsonConverter.write(((ShareAcknowledgeResponse) 
response).data(), version);
-            case SHARE_FETCH:
-                return 
ShareFetchResponseDataJsonConverter.write(((ShareFetchResponse) 
response).data(), version);
-            case SHARE_GROUP_DESCRIBE:
-                return 
ShareGroupDescribeResponseDataJsonConverter.write(((ShareGroupDescribeResponse) 
response).data(), version);
-            case SHARE_GROUP_HEARTBEAT:
-                return 
ShareGroupHeartbeatResponseDataJsonConverter.write(((ShareGroupHeartbeatResponse)
 response).data(), version);
-            case STREAMS_GROUP_DESCRIBE:
-                return 
StreamsGroupDescribeResponseDataJsonConverter.write(((StreamsGroupDescribeResponse)
 response).data(), version);
-            case STREAMS_GROUP_HEARTBEAT:
-                return 
StreamsGroupHeartbeatResponseDataJsonConverter.write(((StreamsGroupHeartbeatResponse)
 response).data(), version);
-            case SYNC_GROUP:
-                return 
SyncGroupResponseDataJsonConverter.write(((SyncGroupResponse) response).data(), 
version);
-            case TXN_OFFSET_COMMIT:
-                return 
TxnOffsetCommitResponseDataJsonConverter.write(((TxnOffsetCommitResponse) 
response).data(), version);
-            case UNREGISTER_BROKER:
-                return 
UnregisterBrokerResponseDataJsonConverter.write(((UnregisterBrokerResponse) 
response).data(), version);
-            case UPDATE_FEATURES:
-                return 
UpdateFeaturesResponseDataJsonConverter.write(((UpdateFeaturesResponse) 
response).data(), version);
-            case UPDATE_RAFT_VOTER:
-                return 
UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse) 
response).data(), version);
-            case VOTE:
-                return VoteResponseDataJsonConverter.write(((VoteResponse) 
response).data(), version);
-            case WRITE_SHARE_GROUP_STATE:
-                return 
WriteShareGroupStateResponseDataJsonConverter.write(((WriteShareGroupStateResponse)
 response).data(), version);
-            case WRITE_TXN_MARKERS:
-                return 
WriteTxnMarkersResponseDataJsonConverter.write(((WriteTxnMarkersResponse) 
response).data(), version);
-            default:
+        return switch (response.apiKey()) {
+            case ADD_OFFSETS_TO_TXN ->
+                
AddOffsetsToTxnResponseDataJsonConverter.write(((AddOffsetsToTxnResponse) 
response).data(), version);
+            case ADD_PARTITIONS_TO_TXN ->
+                
AddPartitionsToTxnResponseDataJsonConverter.write(((AddPartitionsToTxnResponse) 
response).data(), version);
+            case ADD_RAFT_VOTER ->
+                
AddRaftVoterResponseDataJsonConverter.write(((AddRaftVoterResponse) 
response).data(), version);
+            case ALLOCATE_PRODUCER_IDS ->
+                
AllocateProducerIdsResponseDataJsonConverter.write(((AllocateProducerIdsResponse)
 response).data(), version);
+            case ALTER_CLIENT_QUOTAS ->
+                
AlterClientQuotasResponseDataJsonConverter.write(((AlterClientQuotasResponse) 
response).data(), version);
+            case ALTER_CONFIGS ->
+                
AlterConfigsResponseDataJsonConverter.write(((AlterConfigsResponse) 
response).data(), version);
+            case ALTER_PARTITION_REASSIGNMENTS ->
+                
AlterPartitionReassignmentsResponseDataJsonConverter.write(((AlterPartitionReassignmentsResponse)
 response).data(), version);
+            case ALTER_PARTITION ->
+                
AlterPartitionResponseDataJsonConverter.write(((AlterPartitionResponse) 
response).data(), version);
+            case ALTER_REPLICA_LOG_DIRS ->
+                
AlterReplicaLogDirsResponseDataJsonConverter.write(((AlterReplicaLogDirsResponse)
 response).data(), version);
+            case ALTER_SHARE_GROUP_OFFSETS ->
+                
AlterShareGroupOffsetsResponseDataJsonConverter.write(((AlterShareGroupOffsetsResponse)
 response).data(), version);
+            case ALTER_USER_SCRAM_CREDENTIALS ->
+                
AlterUserScramCredentialsResponseDataJsonConverter.write(((AlterUserScramCredentialsResponse)
 response).data(), version);
+            case API_VERSIONS ->
+                
ApiVersionsResponseDataJsonConverter.write(((ApiVersionsResponse) 
response).data(), version);
+            case ASSIGN_REPLICAS_TO_DIRS ->
+                
AssignReplicasToDirsResponseDataJsonConverter.write(((AssignReplicasToDirsResponse)
 response).data(), version);
+            case BEGIN_QUORUM_EPOCH ->
+                
BeginQuorumEpochResponseDataJsonConverter.write(((BeginQuorumEpochResponse) 
response).data(), version);
+            case BROKER_HEARTBEAT ->
+                
BrokerHeartbeatResponseDataJsonConverter.write(((BrokerHeartbeatResponse) 
response).data(), version);
+            case BROKER_REGISTRATION ->
+                
BrokerRegistrationResponseDataJsonConverter.write(((BrokerRegistrationResponse) 
response).data(), version);
+            case CONSUMER_GROUP_DESCRIBE ->
+                
ConsumerGroupDescribeResponseDataJsonConverter.write(((ConsumerGroupDescribeResponse)
 response).data(), version);
+            case CONSUMER_GROUP_HEARTBEAT ->
+                
ConsumerGroupHeartbeatResponseDataJsonConverter.write(((ConsumerGroupHeartbeatResponse)
 response).data(), version);
+            case CONTROLLER_REGISTRATION ->
+                
ControllerRegistrationResponseDataJsonConverter.write(((ControllerRegistrationResponse)
 response).data(), version);
+            case CREATE_ACLS ->
+                
CreateAclsResponseDataJsonConverter.write(((CreateAclsResponse) 
response).data(), version);
+            case CREATE_DELEGATION_TOKEN ->
+                
CreateDelegationTokenResponseDataJsonConverter.write(((CreateDelegationTokenResponse)
 response).data(), version);
+            case CREATE_PARTITIONS ->
+                
CreatePartitionsResponseDataJsonConverter.write(((CreatePartitionsResponse) 
response).data(), version);
+            case CREATE_TOPICS ->
+                
CreateTopicsResponseDataJsonConverter.write(((CreateTopicsResponse) 
response).data(), version);
+            case DELETE_ACLS ->
+                
DeleteAclsResponseDataJsonConverter.write(((DeleteAclsResponse) 
response).data(), version);
+            case DELETE_GROUPS ->
+                
DeleteGroupsResponseDataJsonConverter.write(((DeleteGroupsResponse) 
response).data(), version);
+            case DELETE_RECORDS ->
+                
DeleteRecordsResponseDataJsonConverter.write(((DeleteRecordsResponse) 
response).data(), version);
+            case DELETE_SHARE_GROUP_OFFSETS ->
+                
DeleteShareGroupOffsetsResponseDataJsonConverter.write(((DeleteShareGroupOffsetsResponse)
 response).data(), version);
+            case DELETE_SHARE_GROUP_STATE ->
+                
DeleteShareGroupStateResponseDataJsonConverter.write(((DeleteShareGroupStateResponse)
 response).data(), version);
+            case DELETE_TOPICS ->
+                
DeleteTopicsResponseDataJsonConverter.write(((DeleteTopicsResponse) 
response).data(), version);
+            case DESCRIBE_ACLS ->
+                
DescribeAclsResponseDataJsonConverter.write(((DescribeAclsResponse) 
response).data(), version);
+            case DESCRIBE_CLIENT_QUOTAS ->
+                
DescribeClientQuotasResponseDataJsonConverter.write(((DescribeClientQuotasResponse)
 response).data(), version);
+            case DESCRIBE_CLUSTER ->
+                
DescribeClusterResponseDataJsonConverter.write(((DescribeClusterResponse) 
response).data(), version);
+            case DESCRIBE_CONFIGS ->
+                
DescribeConfigsResponseDataJsonConverter.write(((DescribeConfigsResponse) 
response).data(), version);
+            case DESCRIBE_DELEGATION_TOKEN ->
+                
DescribeDelegationTokenResponseDataJsonConverter.write(((DescribeDelegationTokenResponse)
 response).data(), version);
+            case DESCRIBE_GROUPS ->
+                
DescribeGroupsResponseDataJsonConverter.write(((DescribeGroupsResponse) 
response).data(), version);
+            case DESCRIBE_LOG_DIRS ->
+                
DescribeLogDirsResponseDataJsonConverter.write(((DescribeLogDirsResponse) 
response).data(), version);
+            case DESCRIBE_PRODUCERS ->
+                
DescribeProducersResponseDataJsonConverter.write(((DescribeProducersResponse) 
response).data(), version);
+            case DESCRIBE_QUORUM ->
+                
DescribeQuorumResponseDataJsonConverter.write(((DescribeQuorumResponse) 
response).data(), version);
+            case DESCRIBE_SHARE_GROUP_OFFSETS ->
+                
DescribeShareGroupOffsetsResponseDataJsonConverter.write(((DescribeShareGroupOffsetsResponse)
 response).data(), version);
+            case DESCRIBE_TOPIC_PARTITIONS ->
+                
DescribeTopicPartitionsResponseDataJsonConverter.write(((DescribeTopicPartitionsResponse)
 response).data(), version);
+            case DESCRIBE_TRANSACTIONS ->
+                
DescribeTransactionsResponseDataJsonConverter.write(((DescribeTransactionsResponse)
 response).data(), version);
+            case DESCRIBE_USER_SCRAM_CREDENTIALS ->
+                
DescribeUserScramCredentialsResponseDataJsonConverter.write(((DescribeUserScramCredentialsResponse)
 response).data(), version);
+            case ELECT_LEADERS ->
+                
ElectLeadersResponseDataJsonConverter.write(((ElectLeadersResponse) 
response).data(), version);
+            case END_QUORUM_EPOCH ->
+                
EndQuorumEpochResponseDataJsonConverter.write(((EndQuorumEpochResponse) 
response).data(), version);
+            case END_TXN -> 
EndTxnResponseDataJsonConverter.write(((EndTxnResponse) response).data(), 
version);
+            case ENVELOPE -> 
EnvelopeResponseDataJsonConverter.write(((EnvelopeResponse) response).data(), 
version);
+            case EXPIRE_DELEGATION_TOKEN ->
+                
ExpireDelegationTokenResponseDataJsonConverter.write(((ExpireDelegationTokenResponse)
 response).data(), version);
+            case FETCH -> 
FetchResponseDataJsonConverter.write(((FetchResponse) response).data(), 
version, false);
+            case FETCH_SNAPSHOT ->
+                
FetchSnapshotResponseDataJsonConverter.write(((FetchSnapshotResponse) 
response).data(), version);
+            case FIND_COORDINATOR ->
+                
FindCoordinatorResponseDataJsonConverter.write(((FindCoordinatorResponse) 
response).data(), version);
+            case GET_TELEMETRY_SUBSCRIPTIONS ->
+                
GetTelemetrySubscriptionsResponseDataJsonConverter.write(((GetTelemetrySubscriptionsResponse)
 response).data(), version);
+            case HEARTBEAT -> 
HeartbeatResponseDataJsonConverter.write(((HeartbeatResponse) response).data(), 
version);
+            case INCREMENTAL_ALTER_CONFIGS ->
+                
IncrementalAlterConfigsResponseDataJsonConverter.write(((IncrementalAlterConfigsResponse)
 response).data(), version);
+            case INITIALIZE_SHARE_GROUP_STATE ->
+                
InitializeShareGroupStateResponseDataJsonConverter.write(((InitializeShareGroupStateResponse)
 response).data(), version);
+            case INIT_PRODUCER_ID ->
+                
InitProducerIdResponseDataJsonConverter.write(((InitProducerIdResponse) 
response).data(), version);
+            case JOIN_GROUP -> 
JoinGroupResponseDataJsonConverter.write(((JoinGroupResponse) response).data(), 
version);
+            case LEAVE_GROUP ->
+                
LeaveGroupResponseDataJsonConverter.write(((LeaveGroupResponse) 
response).data(), version);
+            case LIST_CONFIG_RESOURCES ->
+                
ListConfigResourcesResponseDataJsonConverter.write(((ListConfigResourcesResponse)
 response).data(), version);
+            case LIST_GROUPS ->
+                
ListGroupsResponseDataJsonConverter.write(((ListGroupsResponse) 
response).data(), version);
+            case LIST_OFFSETS ->
+                
ListOffsetsResponseDataJsonConverter.write(((ListOffsetsResponse) 
response).data(), version);
+            case LIST_PARTITION_REASSIGNMENTS ->
+                
ListPartitionReassignmentsResponseDataJsonConverter.write(((ListPartitionReassignmentsResponse)
 response).data(), version);
+            case LIST_TRANSACTIONS ->
+                
ListTransactionsResponseDataJsonConverter.write(((ListTransactionsResponse) 
response).data(), version);
+            case METADATA -> 
MetadataResponseDataJsonConverter.write(((MetadataResponse) response).data(), 
version);
+            case OFFSET_COMMIT ->
+                
OffsetCommitResponseDataJsonConverter.write(((OffsetCommitResponse) 
response).data(), version);
+            case OFFSET_DELETE ->
+                
OffsetDeleteResponseDataJsonConverter.write(((OffsetDeleteResponse) 
response).data(), version);
+            case OFFSET_FETCH ->
+                
OffsetFetchResponseDataJsonConverter.write(((OffsetFetchResponse) 
response).data(), version);
+            case OFFSET_FOR_LEADER_EPOCH ->
+                
OffsetForLeaderEpochResponseDataJsonConverter.write(((OffsetsForLeaderEpochResponse)
 response).data(), version);
+            case PRODUCE -> 
ProduceResponseDataJsonConverter.write(((ProduceResponse) response).data(), 
version);
+            case PUSH_TELEMETRY ->
+                
PushTelemetryResponseDataJsonConverter.write(((PushTelemetryResponse) 
response).data(), version);
+            case READ_SHARE_GROUP_STATE ->
+                
ReadShareGroupStateResponseDataJsonConverter.write(((ReadShareGroupStateResponse)
 response).data(), version);
+            case READ_SHARE_GROUP_STATE_SUMMARY ->
+                
ReadShareGroupStateSummaryResponseDataJsonConverter.write(((ReadShareGroupStateSummaryResponse)
 response).data(), version);
+            case REMOVE_RAFT_VOTER ->
+                
RemoveRaftVoterResponseDataJsonConverter.write(((RemoveRaftVoterResponse) 
response).data(), version);
+            case RENEW_DELEGATION_TOKEN ->
+                
RenewDelegationTokenResponseDataJsonConverter.write(((RenewDelegationTokenResponse)
 response).data(), version);
+            case SASL_AUTHENTICATE ->
+                
SaslAuthenticateResponseDataJsonConverter.write(((SaslAuthenticateResponse) 
response).data(), version);
+            case SASL_HANDSHAKE ->
+                
SaslHandshakeResponseDataJsonConverter.write(((SaslHandshakeResponse) 
response).data(), version);
+            case SHARE_ACKNOWLEDGE ->
+                
ShareAcknowledgeResponseDataJsonConverter.write(((ShareAcknowledgeResponse) 
response).data(), version);
+            case SHARE_FETCH ->
+                
ShareFetchResponseDataJsonConverter.write(((ShareFetchResponse) 
response).data(), version);
+            case SHARE_GROUP_DESCRIBE ->
+                
ShareGroupDescribeResponseDataJsonConverter.write(((ShareGroupDescribeResponse) 
response).data(), version);
+            case SHARE_GROUP_HEARTBEAT ->
+                
ShareGroupHeartbeatResponseDataJsonConverter.write(((ShareGroupHeartbeatResponse)
 response).data(), version);
+            case STREAMS_GROUP_DESCRIBE ->
+                
StreamsGroupDescribeResponseDataJsonConverter.write(((StreamsGroupDescribeResponse)
 response).data(), version);
+            case STREAMS_GROUP_HEARTBEAT ->
+                
StreamsGroupHeartbeatResponseDataJsonConverter.write(((StreamsGroupHeartbeatResponse)
 response).data(), version);
+            case SYNC_GROUP -> 
SyncGroupResponseDataJsonConverter.write(((SyncGroupResponse) response).data(), 
version);
+            case TXN_OFFSET_COMMIT ->
+                
TxnOffsetCommitResponseDataJsonConverter.write(((TxnOffsetCommitResponse) 
response).data(), version);
+            case UNREGISTER_BROKER ->
+                
UnregisterBrokerResponseDataJsonConverter.write(((UnregisterBrokerResponse) 
response).data(), version);
+            case UPDATE_FEATURES ->
+                
UpdateFeaturesResponseDataJsonConverter.write(((UpdateFeaturesResponse) 
response).data(), version);
+            case UPDATE_RAFT_VOTER ->
+                
UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse) 
response).data(), version);
+            case VOTE -> VoteResponseDataJsonConverter.write(((VoteResponse) 
response).data(), version);
+            case WRITE_SHARE_GROUP_STATE ->
+                
WriteShareGroupStateResponseDataJsonConverter.write(((WriteShareGroupStateResponse)
 response).data(), version);
+            case WRITE_TXN_MARKERS ->
+                
WriteTxnMarkersResponseDataJsonConverter.write(((WriteTxnMarkersResponse) 
response).data(), version);
+            default ->
                 throw new IllegalStateException("ApiKey " + response.apiKey() 
+ " is not currently handled in `response`, the " +
                     "code should be updated to do so.");
-        }
+        };
     }
 
     public static JsonNode requestHeaderNode(RequestHeader header) {
diff --git 
a/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java 
b/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java
index 5aa0ddcf944..d9d2988d0ed 100644
--- a/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java
+++ b/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java
@@ -24,8 +24,6 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -175,15 +173,12 @@ public class SocketServerConfigs {
     private static final Pattern URI_PARSE_REGEXP = Pattern.compile(
         "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)");
 
-    public static final Map<ListenerName, SecurityProtocol> 
DEFAULT_NAME_TO_SECURITY_PROTO;
-
-    static {
-        HashMap<ListenerName, SecurityProtocol> nameToSecurityProtocol = new 
HashMap<>();
-        for (SecurityProtocol securityProtocol : SecurityProtocol.values()) {
-            
nameToSecurityProtocol.put(ListenerName.forSecurityProtocol(securityProtocol), 
securityProtocol);
-        }
-        DEFAULT_NAME_TO_SECURITY_PROTO = 
Collections.unmodifiableMap(nameToSecurityProtocol);
-    }
+    public static final Map<ListenerName, SecurityProtocol> 
DEFAULT_NAME_TO_SECURITY_PROTO =
+        Arrays.stream(SecurityProtocol.values())
+            .collect(Collectors.toUnmodifiableMap(
+                ListenerName::forSecurityProtocol,
+                Function.identity()
+            ));
 
     public static List<Endpoint> listenerListToEndPoints(
         List<String> input,
diff --git 
a/server/src/main/java/org/apache/kafka/network/metrics/RequestChannelMetrics.java
 
b/server/src/main/java/org/apache/kafka/network/metrics/RequestChannelMetrics.java
index 316d7f44a71..8d43107e8d8 100644
--- 
a/server/src/main/java/org/apache/kafka/network/metrics/RequestChannelMetrics.java
+++ 
b/server/src/main/java/org/apache/kafka/network/metrics/RequestChannelMetrics.java
@@ -19,8 +19,8 @@ package org.apache.kafka.network.metrics;
 import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.protocol.ApiKeys;
 
-import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
@@ -34,7 +34,7 @@ public class RequestChannelMetrics {
         for (ApiKeys apiKey : enabledApis) {
             metricsMap.put(apiKey.name, new RequestMetrics(apiKey.name));
         }
-        for (String name : Arrays.asList(
+        for (String name : List.of(
             RequestMetrics.CONSUMER_FETCH_METRIC_NAME,
             RequestMetrics.FOLLOW_FETCH_METRIC_NAME,
             RequestMetrics.VERIFY_PARTITIONS_IN_TXN_METRIC_NAME,
diff --git 
a/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java 
b/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java
index f7cba040c26..2960e9000b7 100644
--- a/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java
+++ b/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java
@@ -27,7 +27,6 @@ import com.yammer.metrics.core.Meter;
 import java.util.EnumMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -208,27 +207,6 @@ public class RequestMetrics {
         }
     }
 
-    private static class DeprecatedRequestRateKey {
-
-        private final short version;
-        private final ClientInformation clientInformation;
-
-        private DeprecatedRequestRateKey(short version, ClientInformation 
clientInformation) {
-            this.version = version;
-            this.clientInformation = clientInformation;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            DeprecatedRequestRateKey that = (DeprecatedRequestRateKey) o;
-            return version == that.version && 
Objects.equals(clientInformation, that.clientInformation);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(version, clientInformation);
-        }
+    private record DeprecatedRequestRateKey(short version, ClientInformation 
clientInformation) {
     }
 }
diff --git 
a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java 
b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
index c5232e4fdd3..9abb57a1b4f 100644
--- a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
+++ b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
@@ -51,38 +51,25 @@ public class AclEntry {
         .collect(Collectors.toSet());
 
     public static Set<AclOperation> supportedOperations(ResourceType 
resourceType) {
-        switch (resourceType) {
-            case TOPIC:
-                return Set.of(READ, WRITE, CREATE, DESCRIBE, DELETE, ALTER, 
DESCRIBE_CONFIGS, ALTER_CONFIGS);
-            case GROUP:
-                return Set.of(READ, DESCRIBE, DELETE, DESCRIBE_CONFIGS, 
ALTER_CONFIGS);
-            case CLUSTER:
-                return Set.of(CREATE, CLUSTER_ACTION, DESCRIBE_CONFIGS, 
ALTER_CONFIGS, IDEMPOTENT_WRITE, ALTER, DESCRIBE);
-            case TRANSACTIONAL_ID:
-                return Set.of(DESCRIBE, WRITE, TWO_PHASE_COMMIT);
-            case DELEGATION_TOKEN:
-                return Set.of(DESCRIBE);
-            case USER:
-                return Set.of(CREATE_TOKENS, DESCRIBE_TOKENS);
-            default:
-                throw new IllegalArgumentException("Not a concrete resource 
type");
-        }
+        return switch (resourceType) {
+            case TOPIC -> Set.of(READ, WRITE, CREATE, DESCRIBE, DELETE, ALTER, 
DESCRIBE_CONFIGS, ALTER_CONFIGS);
+            case GROUP -> Set.of(READ, DESCRIBE, DELETE, DESCRIBE_CONFIGS, 
ALTER_CONFIGS);
+            case CLUSTER -> Set.of(CREATE, CLUSTER_ACTION, DESCRIBE_CONFIGS, 
ALTER_CONFIGS, IDEMPOTENT_WRITE, ALTER, DESCRIBE);
+            case TRANSACTIONAL_ID -> Set.of(DESCRIBE, WRITE, TWO_PHASE_COMMIT);
+            case DELEGATION_TOKEN -> Set.of(DESCRIBE);
+            case USER -> Set.of(CREATE_TOKENS, DESCRIBE_TOKENS);
+            default -> throw new IllegalArgumentException("Not a concrete 
resource type");
+        };
     }
 
     public static Errors authorizationError(ResourceType resourceType) {
-        switch (resourceType) {
-            case TOPIC:
-                return Errors.TOPIC_AUTHORIZATION_FAILED;
-            case GROUP:
-                return Errors.GROUP_AUTHORIZATION_FAILED;
-            case CLUSTER:
-                return Errors.CLUSTER_AUTHORIZATION_FAILED;
-            case TRANSACTIONAL_ID:
-                return Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED;
-            case DELEGATION_TOKEN:
-                return Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED;
-            default:
-                throw new IllegalArgumentException("Authorization error type 
not known");
-        }
+        return switch (resourceType) {
+            case TOPIC -> Errors.TOPIC_AUTHORIZATION_FAILED;
+            case GROUP -> Errors.GROUP_AUTHORIZATION_FAILED;
+            case CLUSTER -> Errors.CLUSTER_AUTHORIZATION_FAILED;
+            case TRANSACTIONAL_ID -> 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED;
+            case DELEGATION_TOKEN -> 
Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED;
+            default -> throw new IllegalArgumentException("Authorization error 
type not known");
+        };
     }
 }
diff --git a/server/src/main/java/org/apache/kafka/server/Assignment.java 
b/server/src/main/java/org/apache/kafka/server/Assignment.java
index df517c05568..0b994aa67e6 100644
--- a/server/src/main/java/org/apache/kafka/server/Assignment.java
+++ b/server/src/main/java/org/apache/kafka/server/Assignment.java
@@ -26,63 +26,21 @@ import org.apache.kafka.server.common.TopicIdPartition;
 
 /**
  * The class is not converted to a Java record since record classes are meant 
for pure data, but this one contains a Runnable
- **/
-final class Assignment {
-    /**
-     * The topic ID and partition index of the replica.
-     */
-    private final TopicIdPartition topicIdPartition;
-
-    /**
-     * The ID of the directory we are placing the replica into.
-     */
-    private final Uuid directoryId;
-
-    /**
-     * The time in monotonic nanosecond when this assignment was created.
-     */
-    private final long submissionTimeNs;
-
-    /**
-     * The callback to invoke on success.
-     */
-    private final Runnable successCallback;
-
-    Assignment(
-        TopicIdPartition topicIdPartition,
-        Uuid directoryId,
-        long submissionTimeNs,
-        Runnable successCallback
-    ) {
-        this.topicIdPartition = topicIdPartition;
-        this.directoryId = directoryId;
-        this.submissionTimeNs = submissionTimeNs;
-        this.successCallback = successCallback;
-    }
-
-    TopicIdPartition topicIdPartition() {
-        return topicIdPartition;
-    }
-
-    Uuid directoryId() {
-        return directoryId;
-    }
-
-    long submissionTimeNs() {
-        return submissionTimeNs;
-    }
-
-    Runnable successCallback() {
-        return successCallback;
-    }
+ *
+ * @param topicIdPartition The topic ID and partition index of the replica.
+ * @param directoryId      The ID of the directory we are placing the replica 
into.
+ * @param submissionTimeNs The time in monotonic nanosecond when this 
assignment was created.
+ * @param successCallback  The callback to invoke on success.
+ */
+record Assignment(TopicIdPartition topicIdPartition, Uuid directoryId, long 
submissionTimeNs,
+                  Runnable successCallback) {
 
     /**
      * Check if this Assignment is still valid to be sent.
      *
-     * @param nodeId    The broker ID.
-     * @param image     The metadata image.
-     *
-     * @return          True only if the Assignment is still valid.
+     * @param nodeId The broker ID.
+     * @param image  The metadata image.
+     * @return True only if the Assignment is still valid.
      */
     boolean valid(int nodeId, MetadataImage image) {
         TopicImage topicImage = 
image.topics().getTopic(topicIdPartition.topicId());
@@ -96,16 +54,4 @@ final class Assignment {
         // Check if this broker is still a replica.
         return Replicas.contains(partition.replicas, nodeId);
     }
-
-    @Override
-    public String toString() {
-        StringBuilder bld = new StringBuilder();
-        bld.append("Assignment");
-        bld.append("(topicIdPartition=").append(topicIdPartition);
-        bld.append(", directoryId=").append(directoryId);
-        bld.append(", submissionTimeNs=").append(submissionTimeNs);
-        bld.append(", successCallback=").append(successCallback);
-        bld.append(")");
-        return bld.toString();
-    }
 }
diff --git a/server/src/main/java/org/apache/kafka/server/ReplicaState.java 
b/server/src/main/java/org/apache/kafka/server/ReplicaState.java
index 5bff94f616d..edf7b14dc40 100644
--- a/server/src/main/java/org/apache/kafka/server/ReplicaState.java
+++ b/server/src/main/java/org/apache/kafka/server/ReplicaState.java
@@ -34,5 +34,5 @@ public enum ReplicaState {
         public String toString() {
             return "Fetching";
         }
-    };
+    }
 }
\ No newline at end of file
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java
 
b/server/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java
index d9ed65c3121..5f36358b89f 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java
@@ -15,25 +15,27 @@
  * limitations under the License.
  */
 package org.apache.kafka.server.config;
-public class ClientQuotaManagerConfig {
-    public final int numQuotaSamples;
-    public final int quotaWindowSizeSeconds;
 
+/**
+ * Configuration settings for quota management
+ *
+ * @param numQuotaSamples         The number of samples to retain in memory
+ * @param quotaWindowSizeSeconds  The time span of each sample
+ */
+public record ClientQuotaManagerConfig(
+    int numQuotaSamples,
+    int quotaWindowSizeSeconds
+) {
     /**
-     * Configuration settings for quota management
-     *
-     * @param numQuotaSamples         The number of samples to retain in memory
-     * @param quotaWindowSizeSeconds  The time span of each sample
+     * Default constructor with default values
      */
-    public ClientQuotaManagerConfig(int numQuotaSamples, int 
quotaWindowSizeSeconds) {
-        this.numQuotaSamples = numQuotaSamples;
-        this.quotaWindowSizeSeconds = quotaWindowSizeSeconds;
-    }
-
     public ClientQuotaManagerConfig() {
         this(QuotaConfig.NUM_QUOTA_SAMPLES_DEFAULT, 
QuotaConfig.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT);
     }
 
+    /**
+     * Constructor with custom numQuotaSamples and default 
quotaWindowSizeSeconds
+     */
     public ClientQuotaManagerConfig(int numQuotaSamples) {
         this(numQuotaSamples, QuotaConfig.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT);
     }
diff --git 
a/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java
 
b/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java
index c5e47f2aca4..5dfd47be202 100644
--- 
a/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java
+++ 
b/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java
@@ -18,7 +18,6 @@ package org.apache.kafka.server.logger;
 
 import java.util.List;
 
-
 public interface LoggingControllerMBean {
     List<String> getLoggers();
     String getLogLevel(String logger);
diff --git 
a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedDeleteRecords.java
 
b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedDeleteRecords.java
index d894a5c6f86..08e85ec68b7 100644
--- 
a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedDeleteRecords.java
+++ 
b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedDeleteRecords.java
@@ -26,7 +26,6 @@ import com.yammer.metrics.core.Meter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
@@ -57,7 +56,7 @@ public class DelayedDeleteRecords extends DelayedOperation {
                                 Consumer<Map<TopicPartition, 
DeleteRecordsPartitionResult>> responseCallback) {
         super(delayMs);
         this.onAcksPending = onAcksPending;
-        this.deleteRecordsStatus = 
Collections.unmodifiableMap(deleteRecordsStatus);
+        this.deleteRecordsStatus = Map.copyOf(deleteRecordsStatus);
         this.responseCallback = responseCallback;
         // first update the acks pending variable according to the error code
         deleteRecordsStatus.forEach((topicPartition, status) -> {
diff --git 
a/server/src/main/java/org/apache/kafka/server/purgatory/DeleteRecordsPartitionStatus.java
 
b/server/src/main/java/org/apache/kafka/server/purgatory/DeleteRecordsPartitionStatus.java
index e0b5e607404..f26cce7fcd4 100644
--- 
a/server/src/main/java/org/apache/kafka/server/purgatory/DeleteRecordsPartitionStatus.java
+++ 
b/server/src/main/java/org/apache/kafka/server/purgatory/DeleteRecordsPartitionStatus.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.server.purgatory;
 
-
 import 
org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult;
 import org.apache.kafka.common.protocol.Errors;
 
diff --git 
a/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java 
b/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java
index 7c7f74874d3..c91f6bd299a 100644
--- a/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java
+++ b/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java
@@ -379,7 +379,7 @@ public class ClientQuotaManager {
         if (!quotasEnabled()) return Double.MAX_VALUE;
         var clientSensors = getOrCreateQuotaSensors(session, clientId);
         var limit = quotaCallback.quotaLimit(clientQuotaType, 
clientSensors.metricTags());
-        if (limit != null) return limit * (config.numQuotaSamples - 1) * 
config.quotaWindowSizeSeconds;
+        if (limit != null) return limit * (config.numQuotaSamples() - 1) * 
config.quotaWindowSizeSeconds();
         return Double.MAX_VALUE;
     }
 
@@ -495,8 +495,8 @@ public class ClientQuotaManager {
 
     private MetricConfig getQuotaMetricConfig(double quotaLimit) {
         return new MetricConfig()
-                .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
-                .samples(config.numQuotaSamples)
+                .timeWindow(config.quotaWindowSizeSeconds(), TimeUnit.SECONDS)
+                .samples(config.numQuotaSamples())
                 .quota(new Quota(quotaLimit, true));
     }
 
@@ -575,9 +575,8 @@ public class ClientQuotaManager {
             return;
         }
 
-        boolean isActive = (quotaCallback instanceof DefaultQuotaCallback 
defaultCallback)
-                ? 
defaultCallback.getActiveQuotasEntities().contains(quotaEntity)
-                : true;
+        boolean isActive = !(quotaCallback instanceof DefaultQuotaCallback 
defaultCallback) ||
+            defaultCallback.getActiveQuotasEntities().contains(quotaEntity);
 
         int activeQuotaType;
         if (quotaEntity.userEntity() != null && quotaEntity.clientIdEntity() 
!= null) {
diff --git 
a/server/src/main/java/org/apache/kafka/server/quota/ClientSensors.java 
b/server/src/main/java/org/apache/kafka/server/quota/ClientSensors.java
index 63ffc133052..9676e049e53 100644
--- a/server/src/main/java/org/apache/kafka/server/quota/ClientSensors.java
+++ b/server/src/main/java/org/apache/kafka/server/quota/ClientSensors.java
@@ -24,43 +24,14 @@ import java.util.Objects;
 
 /**
  * Represents the sensors aggregated per client
+ * @param metricTags         quota metric tags for the client
+ * @param quotaSensor        sensor that tracks the quota
+ * @param throttleTimeSensor sensor that tracks the throttle time
  */
-public final class ClientSensors {
-    private final Map<String, String> metricTags;
-    private final Sensor quotaSensor;
-    private final Sensor throttleTimeSensor;
-
-    /**
-     * @param metricTags quota metric tags for the client
-     * @param quotaSensor sensor that tracks the quota
-     * @param throttleTimeSensor sensor that tracks the throttle time
-     */
-    public ClientSensors(Map<String, String> metricTags,
-                         Sensor quotaSensor,
-                         Sensor throttleTimeSensor) {
-        this.metricTags = new LinkedHashMap<>(metricTags);
-        this.quotaSensor = Objects.requireNonNull(quotaSensor);
-        this.throttleTimeSensor = Objects.requireNonNull(throttleTimeSensor);
-    }
-
-    public Map<String, String> metricTags() {
-        return metricTags;
-    }
-
-    public Sensor quotaSensor() {
-        return quotaSensor;
-    }
-
-    public Sensor throttleTimeSensor() {
-        return throttleTimeSensor;
-    }
-
-    @Override
-    public String toString() {
-        return "ClientSensors{" +
-                "metricTags=" + metricTags +
-                ", quotaSensor=" + quotaSensor +
-                ", throttleTimeSensor=" + throttleTimeSensor +
-                '}';
+public record ClientSensors(Map<String, String> metricTags, Sensor 
quotaSensor, Sensor throttleTimeSensor) {
+    public ClientSensors {
+        metricTags = new LinkedHashMap<>(metricTags);
+        Objects.requireNonNull(quotaSensor);
+        Objects.requireNonNull(throttleTimeSensor);
     }
 }
\ No newline at end of file
diff --git 
a/server/src/main/java/org/apache/kafka/server/quota/ControllerMutationQuotaManager.java
 
b/server/src/main/java/org/apache/kafka/server/quota/ControllerMutationQuotaManager.java
index ec028e8e289..b7c8665480a 100644
--- 
a/server/src/main/java/org/apache/kafka/server/quota/ControllerMutationQuotaManager.java
+++ 
b/server/src/main/java/org/apache/kafka/server/quota/ControllerMutationQuotaManager.java
@@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 import java.util.Optional;
 
-
 /**
  * The ControllerMutationQuotaManager is a specialized ClientQuotaManager used 
in the context
  * of throttling controller's operations/mutations.
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java
 
b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java
index 8064bf92899..e3893314c50 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java
@@ -21,44 +21,9 @@ import org.apache.kafka.common.Uuid;
 
 import java.util.Objects;
 
-public class ShareSessionKey {
-    private final String groupId;
-    private final Uuid memberId;
-
-    public ShareSessionKey(String groupId, Uuid memberId) {
-        this.groupId = Objects.requireNonNull(groupId);
-        this.memberId = Objects.requireNonNull(memberId);
-    }
-
-    public String groupId() {
-        return groupId;
-    }
-
-    public Uuid memberId() {
-        return memberId;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(groupId, memberId);
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (this == obj)
-            return true;
-        else if (obj == null || getClass() != obj.getClass())
-            return false;
-        else {
-            ShareSessionKey that = (ShareSessionKey) obj;
-            return groupId.equals(that.groupId) && Objects.equals(memberId, 
that.memberId);
-        }
-    }
-
-    public String toString() {
-        return "ShareSessionKey(" +
-                " groupId=" + groupId +
-                ", memberId=" + memberId +
-                ")";
+public record ShareSessionKey(String groupId, Uuid memberId) {
+    public ShareSessionKey {
+        Objects.requireNonNull(groupId);
+        Objects.requireNonNull(memberId);
     }
 }
diff --git 
a/server/src/test/java/org/apache/kafka/network/SocketServerConfigsTest.java 
b/server/src/test/java/org/apache/kafka/network/SocketServerConfigsTest.java
index 6b0ca02e7cc..13b9d9c2fa8 100644
--- a/server/src/test/java/org/apache/kafka/network/SocketServerConfigsTest.java
+++ b/server/src/test/java/org/apache/kafka/network/SocketServerConfigsTest.java
@@ -32,13 +32,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 public class SocketServerConfigsTest {
     @Test
     public void testDefaultNameToSecurityProto() {
-        Map<ListenerName, SecurityProtocol> expected = Map.of(
-                new ListenerName("PLAINTEXT"), SecurityProtocol.PLAINTEXT,
-                new ListenerName("SSL"), SecurityProtocol.SSL,
-                new ListenerName("SASL_PLAINTEXT"), 
SecurityProtocol.SASL_PLAINTEXT,
-                new ListenerName("SASL_SSL"), SecurityProtocol.SASL_SSL
-        );
-        assertEquals(expected, 
SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO);
+        assertEquals(Map.of(
+            new ListenerName("PLAINTEXT"), SecurityProtocol.PLAINTEXT,
+            new ListenerName("SSL"), SecurityProtocol.SSL,
+            new ListenerName("SASL_PLAINTEXT"), 
SecurityProtocol.SASL_PLAINTEXT,
+            new ListenerName("SASL_SSL"), SecurityProtocol.SASL_SSL
+        ), SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO);
     }
 
     @Test
diff --git a/server/src/test/java/org/apache/kafka/server/AssignmentTest.java 
b/server/src/test/java/org/apache/kafka/server/AssignmentTest.java
index c538c7d3dd9..04009757d22 100644
--- a/server/src/test/java/org/apache/kafka/server/AssignmentTest.java
+++ b/server/src/test/java/org/apache/kafka/server/AssignmentTest.java
@@ -116,10 +116,10 @@ public class AssignmentTest {
 
     @Test
     public void testAssignmentToString() {
-        assertEquals("Assignment(topicIdPartition=rTudty6ITOCcO_ldVyzZYg:1, " +
+        assertEquals("Assignment[topicIdPartition=rTudty6ITOCcO_ldVyzZYg:1, " +
             "directoryId=rzRT8XZaSbKsP6j238zogg, " +
             "submissionTimeNs=123, " +
-            "successCallback=NoOpRunnable)",
+            "successCallback=NoOpRunnable]",
             new Assignment(new TopicIdPartition(TOPIC_ID, 1),
                 DIRECTORY_ID,
                 123,
diff --git 
a/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
 
b/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
index 03053a5dfb0..07b7848e02d 100644
--- 
a/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
@@ -70,7 +70,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import static 
org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG;
 import static 
org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
@@ -300,8 +299,8 @@ public class BootstrapControllersIntegrationTest {
 
     private static List<List<Integer>> 
translatePartitionInfoToNodeIdList(List<TopicPartitionInfo> partitions) {
         return partitions.stream()
-                .map(partition -> 
partition.replicas().stream().map(Node::id).collect(Collectors.toList()))
-                .collect(Collectors.toList());
+                .map(partition -> 
partition.replicas().stream().map(Node::id).toList())
+                .toList();
     }
 
     @ClusterTest(serverProperties = {
diff --git 
a/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java
 
b/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java
index 5f92d700f71..f6548446f56 100644
--- 
a/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java
@@ -712,9 +712,9 @@ public class ClientQuotasRequestTest {
                 .stream()
                 .map(configEntry -> new 
ClientQuotaAlteration.Op(configEntry.getKey(),
                     configEntry.getValue().orElse(null)))
-                .collect(Collectors.toList());
+                .toList();
             return new ClientQuotaAlteration(entity, ops);
-        }).collect(Collectors.toList());
+        }).toList();
 
         try (Admin admin = cluster.admin()) {
             Map<ClientQuotaEntity, KafkaFuture<Void>> result = 
admin.alterClientQuotas(entries,
diff --git 
a/server/src/test/java/org/apache/kafka/server/quota/ClientSensorsTest.java 
b/server/src/test/java/org/apache/kafka/server/quota/ClientSensorsTest.java
new file mode 100644
index 00000000000..6979660ac3a
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/quota/ClientSensorsTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.quota;
+
+import org.apache.kafka.common.metrics.Sensor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+class ClientSensorsTest {
+
+    @Test
+    void testConstructorWithValidParameters() {
+        Map<String, String> metricTags = Map.of("client-id", "test-client", 
"user", "test-user");
+        Sensor quotaSensor = mock(Sensor.class);
+        Sensor throttleTimeSensor = mock(Sensor.class);
+
+        ClientSensors clientSensors = new ClientSensors(metricTags, 
quotaSensor, throttleTimeSensor);
+
+        assertEquals(metricTags, clientSensors.metricTags());
+        assertEquals(quotaSensor, clientSensors.quotaSensor());
+        assertEquals(throttleTimeSensor, clientSensors.throttleTimeSensor());
+    }
+
+    @Test
+    void testConstructorPreservesInputOrder() {
+        LinkedHashMap<String, String> orderedTags = new LinkedHashMap<>();
+        orderedTags.put("first", "value1");
+        orderedTags.put("second", "value2");
+        orderedTags.put("third", "value3");
+        orderedTags.put("fourth", "value4");
+        Sensor quotaSensor = mock(Sensor.class);
+        Sensor throttleTimeSensor = mock(Sensor.class);
+
+        ClientSensors clientSensors = new ClientSensors(orderedTags, 
quotaSensor, throttleTimeSensor);
+
+        Map<String, String> resultTags = clientSensors.metricTags();
+        assertInstanceOf(LinkedHashMap.class, resultTags);
+        
+        // Convert to arrays to check order
+        String[] expectedKeys = {"first", "second", "third", "fourth"};
+        String[] actualKeys = resultTags.keySet().toArray(new String[0]);
+        
+        for (int i = 0; i < expectedKeys.length; i++) {
+            assertEquals(expectedKeys[i], actualKeys[i], 
+                "Key at position " + i + " should match expected order");
+        }
+    }
+
+    @Test
+    void testConstructorWithEmptyMap() {
+        Map<String, String> emptyTags = Map.of();
+        Sensor quotaSensor = mock(Sensor.class);
+        Sensor throttleTimeSensor = mock(Sensor.class);
+
+        ClientSensors clientSensors = new ClientSensors(emptyTags, 
quotaSensor, throttleTimeSensor);
+
+        assertTrue(clientSensors.metricTags().isEmpty());
+        assertEquals(quotaSensor, clientSensors.quotaSensor());
+        assertEquals(throttleTimeSensor, clientSensors.throttleTimeSensor());
+    }
+
+    @Test
+    void testConstructorThrowsExceptionWhenQuotaSensorIsNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ClientSensors(Map.of("client-id", "test-client"), null, 
mock(Sensor.class)));
+    }
+
+    @Test
+    void testConstructorThrowsExceptionWhenThrottleTimeSensorIsNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ClientSensors(Map.of("client-id", "test-client"), 
mock(Sensor.class), null));
+    }
+
+    @Test
+    void testConstructorThrowsExceptionWhenBothSensorsAreNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ClientSensors(Map.of("client-id", "test-client"), null, 
null));
+    }
+}
diff --git 
a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionKeyTest.java
 
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionKeyTest.java
new file mode 100644
index 00000000000..e0dbc281a9d
--- /dev/null
+++ 
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionKeyTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.session;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ShareSessionKeyTest {
+    @Test
+    public void testConstructorThrowsExceptionWhenGroupIdIsNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ShareSessionKey(null, Uuid.randomUuid()));
+    }
+
+    @Test
+    public void testConstructorThrowsExceptionWhenMemberIdIsNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ShareSessionKey("random", null));
+    }
+
+    @Test
+    public void 
testConstructorThrowsExceptionWhenBothGroupIdAndMemberIdIsNull() {
+        assertThrows(NullPointerException.class,
+            () -> new ShareSessionKey(null, null));
+    }
+}


Reply via email to