jeffkbkim commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1072528536


##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -619,6 +619,9 @@ object KafkaConfig {
   val PasswordEncoderKeyLengthProp =  "password.encoder.key.length"
   val PasswordEncoderIterationsProp =  "password.encoder.iterations"
 
+  /** Internal Configurations **/
+  val UnreleasedApisEnableProd = "unreleased.apis.enable"

Review Comment:
   should this be Prop?



##########
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java:
##########
@@ -372,7 +375,10 @@ public enum Errors {
     FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered 
inconsistent topic ID usage", FetchSessionTopicIdException::new),
     INELIGIBLE_REPLICA(107, "The new ISR contains at least one ineligible 
replica.", IneligibleReplicaException::new),
     NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated 
the partition state but the leader has changed.", 
NewLeaderElectedException::new),
-    OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to 
tiered storage.", OffsetMovedToTieredStorageException::new);
+    OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to 
tiered storage.", OffsetMovedToTieredStorageException::new),
+    FENCED_MEMBER_EPOCH(110, "The member epoch is fenced by the group 
coordinator. The member must abandon all its partitions and rejoins.", 
FencedMemberEpochException::new),
+    UNRELEASED_INSTANCE_ID(111, "The instance ID is still used by another 
member in the consumer group. That member must leave first.", 
UnreleasedInstanceIdException::new),
+    UNSUPPORTED_ASSIGNOR(112, "The assignor used by the member or its version 
range are not supported by the consumer group.", 
UnsupportedAssignorException::new);

Review Comment:
   nit: is not
   how's "The selected assignor or its version range is not supported by the 
consumer group."?



##########
core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala:
##########
@@ -68,14 +68,18 @@ abstract class AbstractApiVersionsRequestTest(cluster: 
ClusterInstance) {
     } finally socket.close()
   }
 
-  def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, 
listenerName: ListenerName = cluster.clientListener()): Unit = {
+  def validateApiVersionsResponse(
+    apiVersionsResponse: ApiVersionsResponse,
+    listenerName: ListenerName = cluster.clientListener(),
+    shouldIncludeUnreleasedApi: Boolean = false

Review Comment:
   nit: Apis



##########
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##########
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 68,
+  "type": "response",
+  "name": "ConsumerGroupHeartbeatResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED
+  // - NOT_COORDINATOR
+  // - COORDINATOR_NOT_AVAILABLE
+  // - COORDINATOR_LOAD_IN_PROGRESS
+  // - INVALID_REQUEST
+  // - UNKNOWN_MEMBER_ID
+  // - FENCED_MEMBER_EPOCH
+  // - UNSUPPORTED_ASSIGNOR
+  // - UNRELEASED_INSTANCE_ID
+  // - GROUP_MAX_SIZE_REACHED

Review Comment:
   i noticed this is missing in the KIP. should we include it?



##########
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##########
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 68,
+  "type": "response",
+  "name": "ConsumerGroupHeartbeatResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED
+  // - NOT_COORDINATOR
+  // - COORDINATOR_NOT_AVAILABLE
+  // - COORDINATOR_LOAD_IN_PROGRESS
+  // - INVALID_REQUEST
+  // - UNKNOWN_MEMBER_ID
+  // - FENCED_MEMBER_EPOCH
+  // - UNSUPPORTED_ASSIGNOR
+  // - UNRELEASED_INSTANCE_ID
+  // - GROUP_MAX_SIZE_REACHED
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+    { "name": "ErrorCode", "type": "int16", "versions": "0+",
+      "about": "The top-level error code, or 0 if there was no error" },
+    { "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "The top-level error message, or null if there was no error." },
+    { "name": "MemberId", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "The member id generated by the coordinator. Only provided when 
the member joins with MemberEpoch == 0." },
+    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
+      "about": "The member epoch." },
+    { "name": "ShouldComputeAssignment", "type": "bool", "versions": "0+",
+      "about": "True if the member should compute the assignment for the 
group." },
+    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
+      "about": "The heartbeat interval in milliseconds." },
+    { "name": "Assignment", "type": "Assignment", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "null if not provided; the assignment otherwise.", "fields": [
+      { "name": "Error", "type": "int8", "versions": "0+",
+        "about": "The assigned error." },
+      { "name": "AssignedTopicPartitions", "type": "[]TopicPartitions", 
"versions": "0+",
+        "about": "The partitions assigned to the member that can be used 
immediately." },
+      { "name": "PendingTopicPartitions", "type": "[]TopicPartitions", 
"versions": "0+",
+        "about": "The partitions assigned to the member that cannot be used 
because they are not released by their former owners yet." },
+      { "name": "MetadataVersion", "type": "int16", "versions": "0+",
+        "about": "The version of the metadata." },
+      { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
+        "about": "The assigned metadata." }
+    ]}
+  ],
+  "commonStructs": [

Review Comment:
   what's the difference between fields and commonStructs?



##########
clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json:
##########
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 68,
+  "type": "request",
+  "apiStability": "evolving",
+  "listeners": ["zkBroker", "broker"],
+  "name": "ConsumerGroupHeartbeatRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
+      "about": "The group identifier." },
+    { "name": "MemberId", "type": "string", "versions": "0+",
+      "about": "The member id generated by the coordinator. The member id must 
be kept during the entire lifetime of the member." },
+    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
+      "about": "The current member epoch; 0 to join the group; -1 to leave the 
group; -2 to indicate that the static member will rejoin." },
+    { "name": "InstanceId", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "null if not provided or if it didn't change since the last 
heartbeat; the instance Id otherwise." },
+    { "name": "RackId", "type": "string", "versions": "0+",  
"nullableVersions": "0+", "default": "null",
+      "about": "null if not provided or if it didn't change since the last 
heartbeat; the rack ID of consumer otherwise." },
+    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", 
"default": -1,
+      "about": "-1 if it didn't chance since the last heartbeat; the maximum 
time in milliseconds that the coordinator will wait on the member to revoke its 
partitions otherwise." },
+    { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "null if it didn't change since the last heartbeat; the 
subscribed topic names otherwise." },
+    { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "null if it didn't change since the last heartbeat; the 
subscribed topic regex otherwise" },
+    { "name": "ServerAssignor", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "null if not used or if it didn't change since the last 
heartbeat; the server side assignor to use otherwise." },
+    { "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "null if not used or if it didn't change since the last 
heartbeat; the list of client-side assignors otherwise.", "fields": [
+      { "name": "Name", "type": "string", "versions": "0+",
+        "about": "The name of the assignor." },
+      { "name": "MinimumVersion", "type": "int16", "versions": "0+",
+        "about": "The minimum supported version for the metadata." },
+      { "name": "MaximumVersion", "type": "int16", "versions": "0+",
+        "about": "The maximum supported version for the metadata." },
+      { "name": "Reason", "type": "int8", "versions": "0+",

Review Comment:
   can you provide an example of what we store in the byte?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to