cadonna commented on code in PR #19114:
URL: https://github.com/apache/kafka/pull/19114#discussion_r1983382584
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1348,6 +1381,34 @@ private void throwIfEmptyString(
}
}
+ /**
+ * Throws an InvalidRequestException if the value is null or non-empty.
+ *
+ * @param value The value.
+ * @param error The error message.
+ * @throws InvalidRequestException
+ */
+ private void throwIfNotEmptyCollection(
Review Comment:
All of those methods could be `static`. However, I am not sure whether it's worth changing them.
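Here is a minimal sketch of the `static` option, assuming the helper reads only its arguments. It follows the Javadoc in the diff (throw if the value is null or non-empty). `RequestValidation` is a hypothetical holder class, and `IllegalArgumentException` stands in for Kafka's `InvalidRequestException` so the snippet compiles on its own.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical holder class; in the PR the helpers live in GroupMetadataManager.
public final class RequestValidation {

    private RequestValidation() { }

    /**
     * Throws if the value is null or non-empty, mirroring the Javadoc in the diff.
     * IllegalArgumentException stands in for InvalidRequestException.
     */
    static void throwIfNotEmptyCollection(Collection<?> value, String error) {
        if (value == null || !value.isEmpty()) {
            throw new IllegalArgumentException(error);
        }
    }

    public static void main(String[] args) {
        // An empty collection passes.
        throwIfNotEmptyCollection(List.of(), "must be empty");
        // A non-empty collection is rejected.
        try {
            throwIfNotEmptyCollection(List.of(1), "must be empty");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the body never touches `this`, marking it `static` changes no behavior. It only makes that fact visible to readers.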
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1480,6 +1600,34 @@ private boolean isSubset(
return true;
}
+ /**
+ * Verifies that the tasks currently owned by the member (the ones set in the
+ * request) matches the ones that the member should own. It matches if the streams
+ * only owns tasks which are in the assigned tasks. It does not match if
+ * it owns any other tasks.
+ *
+ * @param ownedTasks The tasks provided by the streams in the request.
Review Comment:
See my comment above about "streams".
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1480,6 +1600,34 @@ private boolean isSubset(
return true;
}
+ /**
+ * Verifies that the tasks currently owned by the member (the ones set in the
+ * request) matches the ones that the member should own. It matches if the streams
+ * only owns tasks which are in the assigned tasks. It does not match if
Review Comment:
Did you mean "streams", "Streams client", or "Streams member"?
IMO, "streams" alone does not really mean much.
Maybe "Streams member" is the better option, since a Streams client can have multiple stream threads and each of them would be a Streams member.
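Here is a rough sketch of the client/member distinction, under stated assumptions. Each stream thread is modeled as its own member, and members of the same Streams client are assumed to share the `processId` that the heartbeat carries. The `Member` record and the `membersByClient` helper are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public final class StreamsMembersPerClient {

    // Hypothetical stand-in: one Streams member per stream thread,
    // tagged with the process ID of the Streams client it runs in.
    record Member(String memberId, String processId) { }

    // Groups member IDs by process ID, i.e. by the Streams client they belong to.
    static Map<String, List<String>> membersByClient(List<Member> members) {
        return members.stream().collect(Collectors.groupingBy(
            Member::processId,
            TreeMap::new,
            Collectors.mapping(Member::memberId, Collectors.toList())));
    }

    public static void main(String[] args) {
        // One client with two stream threads, one client with a single thread.
        List<Member> members = List.of(
            new Member("m1", "client-a"),
            new Member("m2", "client-a"),
            new Member("m3", "client-b"));
        System.out.println(membersByClient(members));
    }
}
```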
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1805,6 +1990,280 @@ private List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
.toList();
}
+ /**
+ * Handles a regular heartbeat from a Streams group member.
+ * It mainly consists of five parts:
+ * 1) Created or update the member.
+ * The group epoch is bumped if the member has been created or updated.
+ * 2) Initialized or update the topology. The group epoch is bumped if the topology
+ * has been created or updated.
+ * 3) Determine the partition metadata and any internal topics that need to be created.
+ * 4) Update the target assignment for the streams group if the group epoch
+ * is larger than the current target assignment epoch.
+ * 5) Reconcile the member's assignment with the target assignment.
+ *
+ * @param groupId The group id from the request.
Review Comment:
```suggestion
* @param groupId The group ID from the request.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -355,6 +355,32 @@ public StreamsGroupMember getOrMaybeCreateMember(
return new StreamsGroupMember.Builder(memberId).build();
}
+ /**
+ * Gets or creates a new member but without adding it to the group. Adding a member is done via the
+ * {@link StreamsGroup#updateMember(StreamsGroupMember)} method.
+ *
+ * @param memberId The member ID.
+ * @param createIfNotExists Booleans indicating whether the member must be created if it does not exist.
+ * @return A StreamsGroupMember.
+ */
+ public StreamsGroupMember getOrMaybeCreateDefaultMember(
Review Comment:
See my other comment regarding `getOrMaybeCreateStreamsGroup()`. The main issue I see is those boolean flags, which often make the code unnecessarily hard to understand.
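Here is one way the boolean flag could be split, sketched on a toy group. `Member`, `MemberLookup`, `getOrCreateDefaultMember`, and `getMemberOrThrow` are hypothetical names. `IllegalArgumentException` stands in for whatever error the coordinator would actually return for an unknown member. As the Javadoc above states, creating a default member does not add it to the group. That remains a separate `updateMember` step.

```java
import java.util.HashMap;
import java.util.Map;

public final class MemberLookup {

    // Hypothetical stand-in for StreamsGroupMember.
    record Member(String memberId) { }

    private final Map<String, Member> members = new HashMap<>();

    /** Returns the existing member, or a new default one that is NOT added to the group. */
    Member getOrCreateDefaultMember(String memberId) {
        Member existing = members.get(memberId);
        return existing != null ? existing : new Member(memberId);
    }

    /** Returns the existing member or throws; no boolean flag is needed. */
    Member getMemberOrThrow(String memberId) {
        Member existing = members.get(memberId);
        if (existing == null) {
            throw new IllegalArgumentException("Member " + memberId + " is not a member of the group.");
        }
        return existing;
    }

    /** Adding a member stays an explicit, separate step. */
    void updateMember(Member member) {
        members.put(member.memberId(), member);
    }

    public static void main(String[] args) {
        MemberLookup group = new MemberLookup();
        Member fresh = group.getOrCreateDefaultMember("m1");
        // The default member has not been added yet.
        System.out.println(group.members.containsKey("m1"));
        group.updateMember(fresh);
        System.out.println(group.getMemberOrThrow("m1"));
    }
}
```

A caller could then choose the variant explicitly, for example `memberEpoch == 0 ? group.getOrCreateDefaultMember(memberId) : group.getMemberOrThrow(memberId)`, in the same spirit as the split suggested for `getOrMaybeCreateStreamsGroup()`.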
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTuple.java:
##########
@@ -30,11 +34,11 @@
* An immutable tuple containing active, standby and warm-up tasks.
*
* @param activeTasks Active tasks.
- * The key of the map is the subtopology ID and the value is the set of partition IDs.
+ * The key of the map is the subtopology ID, and the value is the set of partition IDs.
Review Comment:
Why this comma?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1480,6 +1600,34 @@ private boolean isSubset(
return true;
}
+ /**
+ * Verifies that the tasks currently owned by the member (the ones set in the
+ * request) matches the ones that the member should own. It matches if the streams
+ * only owns tasks which are in the assigned tasks. It does not match if
+ * it owns any other tasks.
+ *
+ * @param ownedTasks The tasks provided by the streams in the request.
+ * @param target The tasks that the member should have.
+ *
+ * @return A boolean indicating whether the owned partitions are a subset or not.
+ */
+ private boolean isTaskSubset(
+ List<StreamsGroupHeartbeatRequestData.TaskIds> ownedTasks,
+ Map<String, Set<Integer>> target
Review Comment:
IMO, a better name for this parameter would be `assignedTasks`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1480,6 +1600,34 @@ private boolean isSubset(
return true;
}
+ /**
+ * Verifies that the tasks currently owned by the member (the ones set in the
+ * request) matches the ones that the member should own. It matches if the streams
+ * only owns tasks which are in the assigned tasks. It does not match if
+ * it owns any other tasks.
+ *
+ * @param ownedTasks The tasks provided by the streams in the request.
+ * @param target The tasks that the member should have.
+ *
+ * @return A boolean indicating whether the owned partitions are a subset or not.
+ */
+ private boolean isTaskSubset(
+ List<StreamsGroupHeartbeatRequestData.TaskIds> ownedTasks,
+ Map<String, Set<Integer>> target
+ ) {
+ if (ownedTasks == null) return false;
+
+ for (StreamsGroupHeartbeatRequestData.TaskIds topicPartitions : ownedTasks) {
Review Comment:
`topicPartitions` -> `ownedTasksOfSubtopology`
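For illustration, here is a sketch of how `isTaskSubset` could read with both suggested names (`assignedTasks` and `ownedTasksOfSubtopology`). Only the null check and the loop header appear in the diff, so the loop body is an assumption modeled on the usual subset check. `TaskIds` is a hypothetical stand-in for `StreamsGroupHeartbeatRequestData.TaskIds`, holding a subtopology ID and a list of partition IDs.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

public final class TaskSubsetCheck {

    // Hypothetical stand-in for StreamsGroupHeartbeatRequestData.TaskIds.
    record TaskIds(String subtopologyId, List<Integer> partitions) { }

    /**
     * Returns true if every task the member reports as owned is also in its assigned tasks.
     * A null list of owned tasks is treated as "not a subset", as in the diff.
     */
    static boolean isTaskSubset(List<TaskIds> ownedTasks, Map<String, Set<Integer>> assignedTasks) {
        if (ownedTasks == null) return false;

        for (TaskIds ownedTasksOfSubtopology : ownedTasks) {
            Set<Integer> assignedPartitions = assignedTasks.get(ownedTasksOfSubtopology.subtopologyId());
            // Owning any task of a subtopology that is not assigned at all breaks the subset property.
            if (assignedPartitions == null) return false;
            for (Integer partitionId : ownedTasksOfSubtopology.partitions()) {
                if (!assignedPartitions.contains(partitionId)) return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> assigned = Map.of("sub-1", Set.of(0, 1));
        System.out.println(isTaskSubset(List.of(new TaskIds("sub-1", List.of(0))), assigned));
        System.out.println(isTaskSubset(List.of(new TaskIds("sub-1", List.of(2))), assigned));
        System.out.println(isTaskSubset(List.of(new TaskIds("sub-2", List.of(0))), assigned));
    }
}
```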
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1805,6 +1990,280 @@ private List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
.toList();
}
+ /**
+ * Handles a regular heartbeat from a Streams group member.
+ * It mainly consists of five parts:
+ * 1) Created or update the member.
+ * The group epoch is bumped if the member has been created or updated.
+ * 2) Initialized or update the topology. The group epoch is bumped if the topology
+ * has been created or updated.
+ * 3) Determine the partition metadata and any internal topics that need to be created.
+ * 4) Update the target assignment for the streams group if the group epoch
+ * is larger than the current target assignment epoch.
+ * 5) Reconcile the member's assignment with the target assignment.
+ *
+ * @param groupId The group id from the request.
+ * @param memberId The member ID from the request.
+ * @param memberEpoch The member epoch from the request.
+ * @param instanceId The instance ID from the request or null.
+ * @param rackId The rack ID from the request or null.
+ * @param rebalanceTimeoutMs The rebalance timeout from the request or -1.
+ * @param clientId The client ID.
+ * @param clientHost The client host.
+ * @param topology The topology from the request or null.
+ * @param ownedActiveTasks The list of owned active tasks from the request or null.
+ * @param ownedStandbyTasks The list of owned standby tasks from the request or null.
+ * @param ownedWarmupTasks The list of owned warmup tasks from the request or null.
+ * @param userEndpoint User-defined endpoint for Interactive Queries, or null.
+ * @param clientTags Used for rack-aware assignment algorithm, or null.
+ * @param taskEndOffsets Cumulative changelog offsets for tasks, or null.
+ * @param taskOffsets Cumulative changelog end-offsets for tasks, or null.
+ * @param shutdownApplication Whether all Streams clients in the group should shut down.
+ * @return A Result containing the StreamsGroupHeartbeat response and a list of records to update the state machine.
+ */
+ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
+ String groupId,
+ String memberId,
+ int memberEpoch,
+ String instanceId,
+ String rackId,
+ int rebalanceTimeoutMs,
+ String clientId,
+ String clientHost,
+ final StreamsGroupHeartbeatRequestData.Topology topology,
Review Comment:
Why are some parameters `final` and others not?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1805,6 +1990,280 @@ private List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
.toList();
}
+ /**
+ * Handles a regular heartbeat from a Streams group member.
+ * It mainly consists of five parts:
+ * 1) Created or update the member.
+ * The group epoch is bumped if the member has been created or updated.
+ * 2) Initialized or update the topology. The group epoch is bumped if the topology
+ * has been created or updated.
+ * 3) Determine the partition metadata and any internal topics that need to be created.
+ * 4) Update the target assignment for the streams group if the group epoch
+ * is larger than the current target assignment epoch.
+ * 5) Reconcile the member's assignment with the target assignment.
+ *
+ * @param groupId The group id from the request.
+ * @param memberId The member ID from the request.
+ * @param memberEpoch The member epoch from the request.
+ * @param instanceId The instance ID from the request or null.
+ * @param rackId The rack ID from the request or null.
+ * @param rebalanceTimeoutMs The rebalance timeout from the request or -1.
+ * @param clientId The client ID.
+ * @param clientHost The client host.
+ * @param topology The topology from the request or null.
+ * @param ownedActiveTasks The list of owned active tasks from the request or null.
+ * @param ownedStandbyTasks The list of owned standby tasks from the request or null.
+ * @param ownedWarmupTasks The list of owned warmup tasks from the request or null.
+ * @param userEndpoint User-defined endpoint for Interactive Queries, or null.
+ * @param clientTags Used for rack-aware assignment algorithm, or null.
+ * @param taskEndOffsets Cumulative changelog offsets for tasks, or null.
+ * @param taskOffsets Cumulative changelog end-offsets for tasks, or null.
+ * @param shutdownApplication Whether all Streams clients in the group should shut down.
+ * @return A Result containing the StreamsGroupHeartbeat response and a list of records to update the state machine.
+ */
+ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
+ String groupId,
+ String memberId,
+ int memberEpoch,
+ String instanceId,
+ String rackId,
+ int rebalanceTimeoutMs,
+ String clientId,
+ String clientHost,
+ final StreamsGroupHeartbeatRequestData.Topology topology,
+ List<TaskIds> ownedActiveTasks,
+ List<TaskIds> ownedStandbyTasks,
+ List<TaskIds> ownedWarmupTasks,
+ final String processId,
+ final Endpoint userEndpoint,
+ final List<KeyValue> clientTags,
+ final List<TaskOffset> taskOffsets,
+ final List<TaskOffset> taskEndOffsets,
+ final boolean shutdownApplication
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ final List<CoordinatorRecord> records = new ArrayList<>();
+ final List<StreamsGroupHeartbeatResponseData.Status> returnedStatus = new ArrayList<>();
+
+ // Get or create the streams group.
+ boolean createIfNotExists = memberEpoch == 0;
+ final StreamsGroup group = getOrMaybeCreateStreamsGroup(groupId, createIfNotExists);
+
+ // Get or create the member.
+ StreamsGroupMember member;
+ if (instanceId == null) {
+ member = getOrMaybeSubscribeDynamicStreamsGroupMember(
Review Comment:
Why `Subscribe`?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1805,6 +1990,280 @@ private List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
.toList();
}
+ /**
+ * Handles a regular heartbeat from a Streams group member.
+ * It mainly consists of five parts:
+ * 1) Created or update the member.
+ * The group epoch is bumped if the member has been created or updated.
+ * 2) Initialized or update the topology. The group epoch is bumped if the topology
+ * has been created or updated.
+ * 3) Determine the partition metadata and any internal topics that need to be created.
+ * 4) Update the target assignment for the streams group if the group epoch
+ * is larger than the current target assignment epoch.
+ * 5) Reconcile the member's assignment with the target assignment.
+ *
+ * @param groupId The group id from the request.
+ * @param memberId The member ID from the request.
+ * @param memberEpoch The member epoch from the request.
+ * @param instanceId The instance ID from the request or null.
+ * @param rackId The rack ID from the request or null.
+ * @param rebalanceTimeoutMs The rebalance timeout from the request or -1.
+ * @param clientId The client ID.
+ * @param clientHost The client host.
+ * @param topology The topology from the request or null.
+ * @param ownedActiveTasks The list of owned active tasks from the request or null.
+ * @param ownedStandbyTasks The list of owned standby tasks from the request or null.
+ * @param ownedWarmupTasks The list of owned warmup tasks from the request or null.
+ * @param userEndpoint User-defined endpoint for Interactive Queries, or null.
+ * @param clientTags Used for rack-aware assignment algorithm, or null.
+ * @param taskEndOffsets Cumulative changelog offsets for tasks, or null.
+ * @param taskOffsets Cumulative changelog end-offsets for tasks, or null.
+ * @param shutdownApplication Whether all Streams clients in the group should shut down.
+ * @return A Result containing the StreamsGroupHeartbeat response and a list of records to update the state machine.
Review Comment:
```suggestion
* @return A result containing the StreamsGroupHeartbeat response and a list of records to update the state machine.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1805,6 +1990,280 @@ private List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
.toList();
}
+ /**
+ * Handles a regular heartbeat from a Streams group member.
+ * It mainly consists of five parts:
+ * 1) Created or update the member.
+ * The group epoch is bumped if the member has been created or updated.
+ * 2) Initialized or update the topology. The group epoch is bumped if the topology
Review Comment:
```suggestion
* 2) Initialize or update the topology. The group epoch is bumped if the topology
```
What do you think about adding a newline after the first sentence, as you did in 1)?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1805,6 +1990,280 @@ private List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
.toList();
}
+ /**
+ * Handles a regular heartbeat from a Streams group member.
+ * It mainly consists of five parts:
+ * 1) Created or update the member.
Review Comment:
```suggestion
* 1) Create or update the member.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1805,6 +1990,277 @@ private List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
.toList();
}
+ /**
+ * Handles a regular heartbeat from a Streams group member.
+ * It mainly consists of five parts:
+ * 1) Created or update the member.
+ * The group epoch is bumped if the member has been created or updated.
+ * 2) Initialized or update the topology. The group epoch is bumped if the topology
+ * has been created or updated.
+ * 3) Determine the partition metadata and any internal topics that need to be created.
+ * 4) Update the target assignment for the streams group if the group epoch
+ * is larger than the current target assignment epoch.
+ * 5) Reconcile the member's assignment with the target assignment.
+ *
+ * @param groupId The group id from the request.
+ * @param memberId The member ID from the request.
+ * @param memberEpoch The member epoch from the request.
+ * @param instanceId The instance ID from the request or null.
+ * @param rackId The rack ID from the request or null.
+ * @param rebalanceTimeoutMs The rebalance timeout from the request or -1.
+ * @param clientId The client ID.
+ * @param clientHost The client host.
+ * @param topology The topology from the request or null.
+ * @param ownedActiveTasks The list of owned active tasks from the request or null.
+ * @param ownedStandbyTasks The list of owned standby tasks from the request or null.
+ * @param ownedWarmupTasks The list of owned warmup tasks from the request or null.
+ * @param userEndpoint User-defined endpoint for Interactive Queries, or null.
+ * @param clientTags Used for rack-aware assignment algorithm, or null.
+ * @param taskEndOffsets Cumulative changelog offsets for tasks, or null.
+ * @param taskOffsets Cumulative changelog end-offsets for tasks, or null.
+ * @param shutdownApplication Whether all Streams clients in the group should shut down.
+ * @return A Result containing the StreamsGroupHeartbeat response and a list of records to update the state machine.
+ */
+ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
+ String groupId,
+ String memberId,
+ int memberEpoch,
+ String instanceId,
+ String rackId,
+ int rebalanceTimeoutMs,
+ String clientId,
+ String clientHost,
+ final StreamsGroupHeartbeatRequestData.Topology topology,
+ List<TaskIds> ownedActiveTasks,
+ List<TaskIds> ownedStandbyTasks,
+ List<TaskIds> ownedWarmupTasks,
+ final String processId,
+ final Endpoint userEndpoint,
+ final List<KeyValue> clientTags,
+ final List<TaskOffset> taskOffsets,
+ final List<TaskOffset> taskEndOffsets,
+ final boolean shutdownApplication
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ final List<CoordinatorRecord> records = new ArrayList<>();
+ final List<StreamsGroupHeartbeatResponseData.Status> returnedStatus = new ArrayList<>();
+
+ // Get or create the streams group.
+ boolean createIfNotExists = memberEpoch == 0;
+ final StreamsGroup group = getOrMaybeCreateStreamsGroup(groupId, createIfNotExists);
+
+ // Get or create the member.
+ StreamsGroupMember member;
+ if (instanceId == null) {
+ member = getOrMaybeSubscribeDynamicStreamsGroupMember(
+ group,
+ memberId,
+ memberEpoch,
+ ownedActiveTasks,
+ ownedStandbyTasks,
+ ownedWarmupTasks,
+ createIfNotExists
+ );
+ } else {
+ throw new UnsupportedOperationException("Static members are not supported yet.");
+ }
+
+ // 1. Create or update the member.
+ // If the member is new or has changed, a StreamsMemberMetadataValue record is written to the __consumer_offsets partition to
+ // persist the change. If the subscriptions have changed, the subscription metadata is updated and persisted by writing a
+ // StreamsPartitionMetadataValue
+ // record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have
+ // changed, and persisted by writing a StreamsMetadataValue record to the partition.
+ StreamsGroupMember updatedMember = new StreamsGroupMember.Builder(member)
+ .maybeUpdateInstanceId(Optional.empty())
+ .maybeUpdateRackId(Optional.ofNullable(rackId))
+ .maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
+ .maybeUpdateTopologyEpoch(topology != null ? OptionalInt.of(topology.epoch()) : OptionalInt.empty())
+ .setClientId(clientId)
+ .setClientHost(clientHost)
+ .maybeUpdateProcessId(Optional.ofNullable(processId))
+ .maybeUpdateClientTags(Optional.ofNullable(clientTags).map(x -> x.stream().collect(Collectors.toMap(KeyValue::key, KeyValue::value))))
+ .maybeUpdateUserEndpoint(Optional.ofNullable(userEndpoint).map(x -> new StreamsGroupMemberMetadataValue.Endpoint().setHost(x.host()).setPort(x.port())))
+ .build();
+
+ // If the group is newly created, we must ensure that it moves away from
+ // epoch 0 and that it is fully initialized.
+ int groupEpoch = group.groupEpoch();
+ boolean bumpGroupEpoch = false;
+
+ bumpGroupEpoch |= hasStreamsMemberMetadataChanged(groupId, member, updatedMember, records);
+
+ // 2. Initialize/Update the group topology.
+ // If the member is new or has changed, a StreamsGroupTopologyValue record is written to the __consumer_offsets partition to persist
+ // the change. The group epoch is bumped if the topology has changed.
+ StreamsTopology updatedTopology;
+ boolean reconfigureTopology = false;
+ if (topology != null) {
+ StreamsGroupTopologyValue recordValue = convertToStreamsGroupTopologyRecord(topology);
+
+ updatedTopology = StreamsTopology.fromRequest(topology);
+
+ if (group.topology().isEmpty()) {
+ log.info("[GroupId {}][MemberId {}] Member initialized the topology with epoch {}", groupId, memberId, topology.epoch());
+
+ records.add(newStreamsGroupTopologyRecord(groupId, recordValue));
+
+ reconfigureTopology = true;
+ bumpGroupEpoch = true;
+ } else if (!updatedTopology.equals(group.topology().get())) {
+ throw new InvalidRequestException("Topology updates are not supported yet.");
+ }
+ } else if (group.topology().isPresent()) {
+ updatedTopology = group.topology().get();
+ } else {
+ throw new IllegalStateException("The topology is null and the group topology is also null.");
+ }
+
+ if (group.topology().isPresent() && updatedMember.topologyEpoch() < group.topology().get().topologyEpoch()) {
+ returnedStatus.add(
+ new StreamsGroupHeartbeatResponseData.Status()
+ .setStatusCode(StreamsGroupHeartbeatResponse.Status.STALE_TOPOLOGY.code())
+ .setStatusDetail(
+ String.format(
+ "The member's topology epoch %d is behind the group's topology epoch %d.",
+ updatedMember.topologyEpoch(),
+ group.topology().get().topologyEpoch()
+ )
+ )
+ );
+ }
+
+ // 3. Determine the partition metadata and any internal topics if needed.
+ ConfiguredTopology updatedConfiguredTopology;
+ Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> updatedPartitionMetadata;
+ if (reconfigureTopology || group.hasMetadataExpired(currentTimeMs)) {
+
+ // The partition metadata is updated when the refresh deadline has been reached.
Review Comment:
Is this comment really needed?
IMO, it is not even accurate, since the partition metadata is also updated if the topology needs to be reconfigured.
All of this is clear from the condition in the `if`-clause.
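To show why the comment is inaccurate, here is a small sketch that spells out both triggers of the refresh. The helper name and the deadline parameter are hypothetical. Treating `hasMetadataExpired(currentTimeMs)` as a `currentTimeMs >= deadline` comparison is an assumption.

```java
// Hypothetical helper that spells out both triggers of the partition metadata refresh.
public final class MetadataRefreshCondition {

    /**
     * Refresh if the topology was just (re)configured, or if the refresh deadline
     * has been reached (assumed semantics of hasMetadataExpired).
     */
    static boolean shouldRefreshPartitionMetadata(
        boolean reconfigureTopology,
        long metadataRefreshDeadlineMs,
        long currentTimeMs
    ) {
        boolean metadataExpired = currentTimeMs >= metadataRefreshDeadlineMs;
        return reconfigureTopology || metadataExpired;
    }

    public static void main(String[] args) {
        // Deadline not reached, but the topology was just initialized: refresh anyway.
        System.out.println(shouldRefreshPartitionMetadata(true, 1_000L, 0L));
        // Deadline not reached and no reconfiguration: no refresh.
        System.out.println(shouldRefreshPartitionMetadata(false, 1_000L, 0L));
        // Deadline reached: refresh.
        System.out.println(shouldRefreshPartitionMetadata(false, 1_000L, 1_000L));
    }
}
```

The first case is exactly the one that the inline comment leaves out.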
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1805,6 +1990,280 @@ private List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
.toList();
}
+ /**
+ * Handles a regular heartbeat from a Streams group member.
+ * It mainly consists of five parts:
+ * 1) Created or update the member.
+ * The group epoch is bumped if the member has been created or updated.
+ * 2) Initialized or update the topology. The group epoch is bumped if the topology
+ * has been created or updated.
+ * 3) Determine the partition metadata and any internal topics that need to be created.
+ * 4) Update the target assignment for the streams group if the group epoch
+ * is larger than the current target assignment epoch.
+ * 5) Reconcile the member's assignment with the target assignment.
+ *
+ * @param groupId The group id from the request.
+ * @param memberId The member ID from the request.
+ * @param memberEpoch The member epoch from the request.
+ * @param instanceId The instance ID from the request or null.
+ * @param rackId The rack ID from the request or null.
+ * @param rebalanceTimeoutMs The rebalance timeout from the request or -1.
+ * @param clientId The client ID.
+ * @param clientHost The client host.
+ * @param topology The topology from the request or null.
+ * @param ownedActiveTasks The list of owned active tasks from the request or null.
+ * @param ownedStandbyTasks The list of owned standby tasks from the request or null.
+ * @param ownedWarmupTasks The list of owned warmup tasks from the request or null.
+ * @param userEndpoint User-defined endpoint for Interactive Queries, or null.
+ * @param clientTags Used for rack-aware assignment algorithm, or null.
+ * @param taskEndOffsets Cumulative changelog offsets for tasks, or null.
+ * @param taskOffsets Cumulative changelog end-offsets for tasks, or null.
+ * @param shutdownApplication Whether all Streams clients in the group should shut down.
+ * @return A Result containing the StreamsGroupHeartbeat response and a list of records to update the state machine.
+ */
+ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
+ String groupId,
+ String memberId,
+ int memberEpoch,
+ String instanceId,
+ String rackId,
+ int rebalanceTimeoutMs,
+ String clientId,
+ String clientHost,
+ final StreamsGroupHeartbeatRequestData.Topology topology,
+ List<TaskIds> ownedActiveTasks,
+ List<TaskIds> ownedStandbyTasks,
+ List<TaskIds> ownedWarmupTasks,
+ final String processId,
+ final Endpoint userEndpoint,
+ final List<KeyValue> clientTags,
+ final List<TaskOffset> taskOffsets,
+ final List<TaskOffset> taskEndOffsets,
+ final boolean shutdownApplication
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ final List<CoordinatorRecord> records = new ArrayList<>();
+ final List<StreamsGroupHeartbeatResponseData.Status> returnedStatus = new ArrayList<>();
+
+ // Get or create the streams group.
+ boolean createIfNotExists = memberEpoch == 0;
+ final StreamsGroup group = getOrMaybeCreateStreamsGroup(groupId, createIfNotExists);
+
+ // Get or create the member.
+ StreamsGroupMember member;
+ if (instanceId == null) {
+ member = getOrMaybeSubscribeDynamicStreamsGroupMember(
+ group,
+ memberId,
+ memberEpoch,
+ ownedActiveTasks,
+ ownedStandbyTasks,
+ ownedWarmupTasks,
+ createIfNotExists
+ );
+ } else {
+ throw new UnsupportedOperationException("Static members are not supported yet.");
+ }
+
+ // 1. Create or update the member.
+ // If the member is new or has changed, a StreamsMemberMetadataValue record is written to the __consumer_offsets partition to
+ // persist the change. If the subscriptions have changed, the subscription metadata is updated and persisted by writing a
+ // StreamsPartitionMetadataValue
+ // record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have
+ // changed, and persisted by writing a StreamsMetadataValue record to the partition.
Review Comment:
This comment does not really match the code. It describes actions that are not done in step 1. For example, the subscriptions are checked in step 3. Also, the names of the records are not correct; for example, we renamed `StreamsPartitionMetadataValue` to `StreamsGroupPartitionMetadataValue` quite some time ago.
Is this maybe a case of lying inline comments? And are those inline comments lying because they are hard to maintain? 🙂
In general, this method is quite long. I am wondering whether it would be better to extract each step into its own method (with its own Javadoc).
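Here is a minimal sketch of what extracting the steps might look like. It assumes heavily simplified state: records are plain strings, the member and the topology are single strings, and steps 3 and 5 are empty placeholders. All names are hypothetical, and `IllegalArgumentException` stands in for `InvalidRequestException`. The point is that each step becomes a small method that carries its own Javadoc, while the top-level method reads like the numbered list in the method Javadoc.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of the heartbeat flow; not the PR's types.
public final class HeartbeatSteps {

    private int groupEpoch = 0;
    private int targetAssignmentEpoch = 0;
    private String memberMetadata = null;
    private String topology = null;

    /** Runs the steps in order and returns the records to write. */
    List<String> heartbeat(String newMemberMetadata, String requestTopology) {
        List<String> records = new ArrayList<>();
        boolean bumpGroupEpoch = createOrUpdateMember(newMemberMetadata, records);
        bumpGroupEpoch |= initializeOrUpdateTopology(requestTopology, records);
        bumpGroupEpoch |= updatePartitionMetadata(records);
        if (bumpGroupEpoch) {
            groupEpoch++;
            records.add("GroupEpoch=" + groupEpoch);
        }
        maybeUpdateTargetAssignment(records);
        reconcileMemberAssignment(records);
        return records;
    }

    /** Step 1: creates or updates the member; returns true if the group epoch must be bumped. */
    private boolean createOrUpdateMember(String newMemberMetadata, List<String> records) {
        if (newMemberMetadata.equals(memberMetadata)) {
            return false;
        }
        memberMetadata = newMemberMetadata;
        records.add("MemberMetadata=" + newMemberMetadata);
        return true;
    }

    /** Step 2: initializes the topology; changing an existing topology is rejected. */
    private boolean initializeOrUpdateTopology(String requestTopology, List<String> records) {
        if (requestTopology == null) {
            if (topology == null) {
                throw new IllegalStateException("The topology is null and the group topology is also null.");
            }
            return false;
        }
        if (topology == null) {
            topology = requestTopology;
            records.add("Topology=" + requestTopology);
            return true;
        }
        if (!topology.equals(requestTopology)) {
            throw new IllegalArgumentException("Topology updates are not supported yet.");
        }
        return false;
    }

    /** Step 3: placeholder for partition metadata and internal topics; never bumps here. */
    private boolean updatePartitionMetadata(List<String> records) {
        return false;
    }

    /** Step 4: updates the target assignment if the group epoch is ahead of it. */
    private void maybeUpdateTargetAssignment(List<String> records) {
        if (groupEpoch > targetAssignmentEpoch) {
            targetAssignmentEpoch = groupEpoch;
            records.add("TargetAssignmentEpoch=" + targetAssignmentEpoch);
        }
    }

    /** Step 5: placeholder for reconciling the member's assignment with the target assignment. */
    private void reconcileMemberAssignment(List<String> records) {
    }

    public static void main(String[] args) {
        HeartbeatSteps coordinator = new HeartbeatSteps();
        // First heartbeat: member and topology are new, so the group epoch is bumped.
        System.out.println(coordinator.heartbeat("rack=a", "topology-v1"));
        // Second heartbeat: nothing changed, so no records are produced.
        System.out.println(coordinator.heartbeat("rack=a", null));
    }
}
```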
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1805,6 +1990,280 @@ private List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
.toList();
}
+ /**
+ * Handles a regular heartbeat from a Streams group member.
+ * It mainly consists of five parts:
+ * 1) Created or update the member.
+ * The group epoch is bumped if the member has been created or updated.
+ * 2) Initialized or update the topology. The group epoch is bumped if the topology
+ * has been created or updated.
+ * 3) Determine the partition metadata and any internal topics that need to be created.
+ * 4) Update the target assignment for the streams group if the group epoch
+ * is larger than the current target assignment epoch.
+ * 5) Reconcile the member's assignment with the target assignment.
+ *
+ * @param groupId The group id from the request.
+ * @param memberId The member ID from the request.
+ * @param memberEpoch The member epoch from the request.
+ * @param instanceId The instance ID from the request or null.
+ * @param rackId The rack ID from the request or null.
+ * @param rebalanceTimeoutMs The rebalance timeout from the request or -1.
+ * @param clientId The client ID.
+ * @param clientHost The client host.
+ * @param topology The topology from the request or null.
+ * @param ownedActiveTasks The list of owned active tasks from the request or null.
+ * @param ownedStandbyTasks The list of owned standby tasks from the request or null.
+ * @param ownedWarmupTasks The list of owned warmup tasks from the request or null.
+ * @param userEndpoint User-defined endpoint for Interactive Queries, or null.
+ * @param clientTags Used for rack-aware assignment algorithm, or null.
+ * @param taskEndOffsets Cumulative changelog offsets for tasks, or null.
+ * @param taskOffsets Cumulative changelog end-offsets for tasks, or null.
+ * @param shutdownApplication Whether all Streams clients in the group should shut down.
+ * @return A Result containing the StreamsGroupHeartbeat response and a list of records to update the state machine.
+ */
+ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
+ String groupId,
+ String memberId,
+ int memberEpoch,
+ String instanceId,
+ String rackId,
+ int rebalanceTimeoutMs,
+ String clientId,
+ String clientHost,
+ final StreamsGroupHeartbeatRequestData.Topology topology,
+ List<TaskIds> ownedActiveTasks,
+ List<TaskIds> ownedStandbyTasks,
+ List<TaskIds> ownedWarmupTasks,
+ final String processId,
+ final Endpoint userEndpoint,
+ final List<KeyValue> clientTags,
+ final List<TaskOffset> taskOffsets,
+ final List<TaskOffset> taskEndOffsets,
+ final boolean shutdownApplication
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ final List<CoordinatorRecord> records = new ArrayList<>();
+ final List<StreamsGroupHeartbeatResponseData.Status> returnedStatus = new ArrayList<>();
+
+ // Get or create the streams group.
+ boolean createIfNotExists = memberEpoch == 0;
+ final StreamsGroup group = getOrMaybeCreateStreamsGroup(groupId, createIfNotExists);
Review Comment:
Not directly related to the changes in this PR: I still do not like this method. What do you think about the following:
```java
StreamsGroup getOrCreateStreamsGroup(
    String groupId
) throws GroupIdNotFoundException {
    Group group = groups.get(groupId);
    if (group == null) {
        return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
    } else {
        return castToStreamsGroup(group);
    }
}

StreamsGroup getStreamsGroupOrThrow(
    String groupId
) throws GroupIdNotFoundException {
    Group group = groups.get(groupId);
    if (group == null) {
        throw new GroupIdNotFoundException(String.format("Streams group %s not found.", groupId));
    } else {
        return castToStreamsGroup(group);
    }
}

private StreamsGroup castToStreamsGroup(final Group group) {
    if (group.type() == STREAMS) {
        return (StreamsGroup) group;
    } else {
        throw new GroupIdNotFoundException(String.format("Group %s is not a streams group.", group.groupId()));
    }
}
```
I think splitting this method would make the code more readable.
```java
final StreamsGroup group = memberEpoch == 0 ?
    getOrCreateStreamsGroup(groupId) : getStreamsGroupOrThrow(groupId);
```
Or with a conventional `if`-clause if you prefer.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1805,6 +1990,277 @@ private List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
.toList();
}
+ /**
+ * Handles a regular heartbeat from a Streams group member.
+ * It mainly consists of five parts:
+ * 1) Create or update the member.
+ * The group epoch is bumped if the member has been created or updated.
+ * 2) Initialize or update the topology. The group epoch is bumped if the topology
+ * has been created or updated.
+ * 3) Determine the partition metadata and any internal topics that need to be created.
+ * 4) Update the target assignment for the streams group if the group epoch
+ * is larger than the current target assignment epoch.
+ * 5) Reconcile the member's assignment with the target assignment.
+ *
+ * @param groupId The group id from the request.
+ * @param memberId The member ID from the request.
+ * @param memberEpoch The member epoch from the request.
+ * @param instanceId The instance ID from the request or null.
+ * @param rackId The rack ID from the request or null.
+ * @param rebalanceTimeoutMs The rebalance timeout from the request or -1.
+ * @param clientId The client ID.
+ * @param clientHost The client host.
+ * @param topology The topology from the request or null.
+ * @param ownedActiveTasks The list of owned active tasks from the request or null.
+ * @param ownedStandbyTasks The list of owned standby tasks from the request or null.
+ * @param ownedWarmupTasks The list of owned warmup tasks from the request or null.
+ * @param userEndpoint User-defined endpoint for Interactive Queries, or null.
+ * @param clientTags Used for rack-aware assignment algorithm, or null.
+ * @param taskEndOffsets Cumulative changelog end-offsets for tasks, or null.
+ * @param taskOffsets Cumulative changelog offsets for tasks, or null.
+ * @param shutdownApplication Whether all Streams clients in the group should shut down.
+ * @return A Result containing the StreamsGroupHeartbeat response and a list of records to update the state machine.
+ */
+ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
+ String groupId,
+ String memberId,
+ int memberEpoch,
+ String instanceId,
+ String rackId,
+ int rebalanceTimeoutMs,
+ String clientId,
+ String clientHost,
+ final StreamsGroupHeartbeatRequestData.Topology topology,
+ List<TaskIds> ownedActiveTasks,
+ List<TaskIds> ownedStandbyTasks,
+ List<TaskIds> ownedWarmupTasks,
+ final String processId,
+ final Endpoint userEndpoint,
+ final List<KeyValue> clientTags,
+ final List<TaskOffset> taskOffsets,
+ final List<TaskOffset> taskEndOffsets,
+ final boolean shutdownApplication
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ final List<CoordinatorRecord> records = new ArrayList<>();
+ final List<StreamsGroupHeartbeatResponseData.Status> returnedStatus = new ArrayList<>();
+
+ // Get or create the streams group.
+ boolean createIfNotExists = memberEpoch == 0;
+ final StreamsGroup group = getOrMaybeCreateStreamsGroup(groupId, createIfNotExists);
+
+ // Get or create the member.
+ StreamsGroupMember member;
+ if (instanceId == null) {
+ member = getOrMaybeSubscribeDynamicStreamsGroupMember(
+ group,
+ memberId,
+ memberEpoch,
+ ownedActiveTasks,
+ ownedStandbyTasks,
+ ownedWarmupTasks,
+ createIfNotExists
+ );
+ } else {
+ throw new UnsupportedOperationException("Static members are not supported yet.");
+ }
+
+ // 1. Create or update the member.
+ // If the member is new or has changed, a StreamsMemberMetadataValue record is written to the __consumer_offsets partition to
+ // persist the change. If the subscriptions have changed, the subscription metadata is updated and persisted by writing a
+ // StreamsPartitionMetadataValue
+ // record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have
+ // changed, and persisted by writing a StreamsMetadataValue record to the partition.
+ StreamsGroupMember updatedMember = new StreamsGroupMember.Builder(member)
+ .maybeUpdateInstanceId(Optional.empty())
+ .maybeUpdateRackId(Optional.ofNullable(rackId))
+ .maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
+ .maybeUpdateTopologyEpoch(topology != null ? OptionalInt.of(topology.epoch()) : OptionalInt.empty())
+ .setClientId(clientId)
+ .setClientHost(clientHost)
+ .maybeUpdateProcessId(Optional.ofNullable(processId))
+ .maybeUpdateClientTags(Optional.ofNullable(clientTags).map(x -> x.stream().collect(Collectors.toMap(KeyValue::key, KeyValue::value))))
+ .maybeUpdateUserEndpoint(Optional.ofNullable(userEndpoint).map(x -> new StreamsGroupMemberMetadataValue.Endpoint().setHost(x.host()).setPort(x.port())))
+ .build();
+
+ // If the group is newly created, we must ensure that it moves away from
+ // epoch 0 and that it is fully initialized.
+ int groupEpoch = group.groupEpoch();
+ boolean bumpGroupEpoch = false;
+
+ bumpGroupEpoch |= hasStreamsMemberMetadataChanged(groupId, member, updatedMember, records);
+
+ // 2. Initialize/Update the group topology.
+ // If the member is new or has changed, a StreamsGroupTopologyValue record is written to the __consumer_offsets partition to persist
+ // the change. The group epoch is bumped if the topology has changed.
+ StreamsTopology updatedTopology;
+ boolean reconfigureTopology = false;
+ if (topology != null) {
+ StreamsGroupTopologyValue recordValue = convertToStreamsGroupTopologyRecord(topology);
+
+ updatedTopology = StreamsTopology.fromRequest(topology);
Review Comment:
Should we rename `fromRequest()` to `fromHeartbeatRequest()` or rename
`fromHeartbeatRequest()` in `TasksTuple` to `fromRequest()` to be consistent?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1805,6 +1990,277 @@ private List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
.toList();
}
+ /**
+ * Handles a regular heartbeat from a Streams group member.
+ * It mainly consists of five parts:
+ * 1) Create or update the member.
+ * The group epoch is bumped if the member has been created or updated.
+ * 2) Initialize or update the topology. The group epoch is bumped if the topology
+ * has been created or updated.
+ * 3) Determine the partition metadata and any internal topics that need to be created.
+ * 4) Update the target assignment for the streams group if the group epoch
+ * is larger than the current target assignment epoch.
+ * 5) Reconcile the member's assignment with the target assignment.
+ *
+ * @param groupId The group id from the request.
+ * @param memberId The member ID from the request.
+ * @param memberEpoch The member epoch from the request.
+ * @param instanceId The instance ID from the request or null.
+ * @param rackId The rack ID from the request or null.
+ * @param rebalanceTimeoutMs The rebalance timeout from the request or -1.
+ * @param clientId The client ID.
+ * @param clientHost The client host.
+ * @param topology The topology from the request or null.
+ * @param ownedActiveTasks The list of owned active tasks from the request or null.
+ * @param ownedStandbyTasks The list of owned standby tasks from the request or null.
+ * @param ownedWarmupTasks The list of owned warmup tasks from the request or null.
+ * @param userEndpoint User-defined endpoint for Interactive Queries, or null.
+ * @param clientTags Used for rack-aware assignment algorithm, or null.
+ * @param taskEndOffsets Cumulative changelog end-offsets for tasks, or null.
+ * @param taskOffsets Cumulative changelog offsets for tasks, or null.
+ * @param shutdownApplication Whether all Streams clients in the group should shut down.
+ * @return A Result containing the StreamsGroupHeartbeat response and a list of records to update the state machine.
+ */
+ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
+ String groupId,
+ String memberId,
+ int memberEpoch,
+ String instanceId,
+ String rackId,
+ int rebalanceTimeoutMs,
+ String clientId,
+ String clientHost,
+ final StreamsGroupHeartbeatRequestData.Topology topology,
+ List<TaskIds> ownedActiveTasks,
+ List<TaskIds> ownedStandbyTasks,
+ List<TaskIds> ownedWarmupTasks,
+ final String processId,
+ final Endpoint userEndpoint,
+ final List<KeyValue> clientTags,
+ final List<TaskOffset> taskOffsets,
+ final List<TaskOffset> taskEndOffsets,
+ final boolean shutdownApplication
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ final List<CoordinatorRecord> records = new ArrayList<>();
+ final List<StreamsGroupHeartbeatResponseData.Status> returnedStatus = new ArrayList<>();
+
+ // Get or create the streams group.
+ boolean createIfNotExists = memberEpoch == 0;
+ final StreamsGroup group = getOrMaybeCreateStreamsGroup(groupId, createIfNotExists);
+
+ // Get or create the member.
+ StreamsGroupMember member;
+ if (instanceId == null) {
+ member = getOrMaybeSubscribeDynamicStreamsGroupMember(
+ group,
+ memberId,
+ memberEpoch,
+ ownedActiveTasks,
+ ownedStandbyTasks,
+ ownedWarmupTasks,
+ createIfNotExists
+ );
+ } else {
+ throw new UnsupportedOperationException("Static members are not supported yet.");
+ }
+
+ // 1. Create or update the member.
+ // If the member is new or has changed, a StreamsMemberMetadataValue record is written to the __consumer_offsets partition to
+ // persist the change. If the subscriptions have changed, the subscription metadata is updated and persisted by writing a
+ // StreamsPartitionMetadataValue
+ // record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have
+ // changed, and persisted by writing a StreamsMetadataValue record to the partition.
+ StreamsGroupMember updatedMember = new StreamsGroupMember.Builder(member)
+ .maybeUpdateInstanceId(Optional.empty())
+ .maybeUpdateRackId(Optional.ofNullable(rackId))
+ .maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
+ .maybeUpdateTopologyEpoch(topology != null ? OptionalInt.of(topology.epoch()) : OptionalInt.empty())
+ .setClientId(clientId)
+ .setClientHost(clientHost)
+ .maybeUpdateProcessId(Optional.ofNullable(processId))
+ .maybeUpdateClientTags(Optional.ofNullable(clientTags).map(x -> x.stream().collect(Collectors.toMap(KeyValue::key, KeyValue::value))))
+ .maybeUpdateUserEndpoint(Optional.ofNullable(userEndpoint).map(x -> new StreamsGroupMemberMetadataValue.Endpoint().setHost(x.host()).setPort(x.port())))
+ .build();
+
+ // If the group is newly created, we must ensure that it moves away from
+ // epoch 0 and that it is fully initialized.
+ int groupEpoch = group.groupEpoch();
+ boolean bumpGroupEpoch = false;
+
+ bumpGroupEpoch |= hasStreamsMemberMetadataChanged(groupId, member, updatedMember, records);
+
+ // 2. Initialize/Update the group topology.
+ // If the member is new or has changed, a StreamsGroupTopologyValue record is written to the __consumer_offsets partition to persist
+ // the change. The group epoch is bumped if the topology has changed.
+ StreamsTopology updatedTopology;
+ boolean reconfigureTopology = false;
+ if (topology != null) {
+ StreamsGroupTopologyValue recordValue = convertToStreamsGroupTopologyRecord(topology);
+
+ updatedTopology = StreamsTopology.fromRequest(topology);
+
+ if (group.topology().isEmpty()) {
+ log.info("[GroupId {}][MemberId {}] Member initialized the topology with epoch {}", groupId, memberId, topology.epoch());
+
+ records.add(newStreamsGroupTopologyRecord(groupId, recordValue));
+
+ reconfigureTopology = true;
+ bumpGroupEpoch = true;
+ } else if (!updatedTopology.equals(group.topology().get())) {
+ throw new InvalidRequestException("Topology updates are not supported yet.");
+ }
+ } else if (group.topology().isPresent()) {
+ updatedTopology = group.topology().get();
+ } else {
+ throw new IllegalStateException("The topology is null and the group topology is also null.");
+ }
+
+ if (group.topology().isPresent() && updatedMember.topologyEpoch() < group.topology().get().topologyEpoch()) {
+ returnedStatus.add(
+ new StreamsGroupHeartbeatResponseData.Status()
+ .setStatusCode(StreamsGroupHeartbeatResponse.Status.STALE_TOPOLOGY.code())
+ .setStatusDetail(
+ String.format(
+ "The member's topology epoch %d is behind the group's topology epoch %d.",
+ updatedMember.topologyEpoch(),
+ group.topology().get().topologyEpoch()
+ )
+ )
+ );
+ }
+
+ // 3. Determine the partition metadata and any internal topics if needed.
+ ConfiguredTopology updatedConfiguredTopology;
+ Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> updatedPartitionMetadata;
+ if (reconfigureTopology || group.hasMetadataExpired(currentTimeMs)) {
+
+ // The partition metadata is updated when the refresh deadline has been reached.
+ updatedPartitionMetadata = group.computePartitionMetadata(
+ metadataImage.topics(),
+ updatedTopology
+ );
+
+ if (!updatedPartitionMetadata.equals(group.partitionMetadata())) {
+ log.info("[GroupId {}][MemberId {}] Computed new partition metadata: {}.",
+ groupId, memberId, updatedPartitionMetadata);
+ bumpGroupEpoch = true;
+ reconfigureTopology = true;
+ records.add(newStreamsGroupPartitionMetadataRecord(groupId, updatedPartitionMetadata));
+ group.setPartitionMetadata(updatedPartitionMetadata);
+ }
+
+ if (reconfigureTopology || group.configuredTopology().isEmpty()) {
+ log.info("[GroupId {}][MemberId {}] Configuring the topology {}", groupId, memberId, updatedTopology);
+ updatedConfiguredTopology = InternalTopicManager.configureTopics(logContext, updatedTopology, updatedPartitionMetadata);
+ } else {
+ updatedConfiguredTopology = group.configuredTopology().get();
+ }
+ } else {
+ updatedConfiguredTopology = group.configuredTopology().get();
+ updatedPartitionMetadata = group.partitionMetadata();
+ }
+
+ // Actually bump the group epoch
+ if (bumpGroupEpoch) {
+ groupEpoch += 1;
+ records.add(newStreamsGroupEpochRecord(groupId, groupEpoch));
+ log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {}.", groupId, memberId, groupEpoch);
+ metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
+ group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
+ }
+
+ // 4. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
+ // replaces an existing static member.
+ // The delta between the existing and the new target assignment is persisted to the partition.
+ int targetAssignmentEpoch;
+ TasksTuple targetAssignment;
+ if (groupEpoch > group.assignmentEpoch()) {
+ targetAssignment = updateStreamsTargetAssignment(
+ group,
+ groupEpoch,
+ updatedMember,
+ updatedConfiguredTopology,
+ updatedPartitionMetadata,
+ records
+ );
+ targetAssignmentEpoch = groupEpoch;
+ } else {
+ targetAssignmentEpoch = group.assignmentEpoch();
+ targetAssignment = group.targetAssignment(updatedMember.memberId());
+ }
+
+ // 5. Reconcile the member's assignment with the target assignment if the member is not
+ // fully reconciled yet.
+ updatedMember = maybeReconcile(
+ groupId,
+ updatedMember,
+ group::currentActiveTaskProcessId,
+ group::currentStandbyTaskProcessIds,
+ group::currentWarmupTaskProcessIds,
+ targetAssignmentEpoch,
+ targetAssignment,
+ ownedActiveTasks,
+ ownedStandbyTasks,
+ ownedWarmupTasks,
+ records
+ );
+
+ scheduleStreamsGroupSessionTimeout(groupId, memberId);
+
+ // Prepare the response.
+ StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
+ .setMemberId(updatedMember.memberId())
+ .setMemberEpoch(updatedMember.memberEpoch())
+ .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+
+ // The assignment is only provided in the following cases:
+ // 1. The member sent a full request.
+ // It does so when joining or rejoining the group with zero
+ // as the member epoch; or on any errors (e.g., timeout).
+ // We use all the non-optional fields to detect a full request as those must be set in a full request.
+ // 2. The member's assignment has been updated.
+ boolean isFullRequest =
+ rebalanceTimeoutMs != -1
+ && ownedActiveTasks != null
+ && ownedStandbyTasks != null
+ && ownedWarmupTasks != null
+ && clientTags != null
+ && processId != null;
Review Comment:
Are there errors for which the member epoch would not be reset to 0?
I am asking because if there are no such errors, we could just check the
member epoch, right?
Alternatively, would checking for the existence of the topology also
guarantee a full request?
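To make the question concrete, the quoted condition can be isolated into a predicate. This is a sketch only: `HeartbeatFields` is a hypothetical record bundling values that `streamsGroupHeartbeat()` receives as separate parameters, and the list element types are left as wildcards instead of `TaskIds` and `KeyValue`. Whether `memberEpoch == 0` or a non-null topology would be an equivalent test is exactly the open question above, so the sketch keeps the condition as quoted.

```java
import java.util.List;

// Sketch only: HeartbeatFields is a hypothetical bundle of request values.
final class FullRequestSketch {

    record HeartbeatFields(
        int rebalanceTimeoutMs,
        List<?> ownedActiveTasks,
        List<?> ownedStandbyTasks,
        List<?> ownedWarmupTasks,
        List<?> clientTags,
        String processId
    ) { }

    // Same condition as the quoted isFullRequest: -1 and null mark
    // non-optional fields that the member did not send.
    static boolean isFullRequest(HeartbeatFields f) {
        return f.rebalanceTimeoutMs() != -1
            && f.ownedActiveTasks() != null
            && f.ownedStandbyTasks() != null
            && f.ownedWarmupTasks() != null
            && f.clientTags() != null
            && f.processId() != null;
    }
}
```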
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTuple.java:
##########
@@ -30,11 +34,11 @@
* An immutable tuple containing active, standby and warm-up tasks.
*
* @param activeTasks Active tasks.
- * The key of the map is the subtopology ID and the value is the set of partition IDs.
+ * The key of the map is the subtopology ID, and the value is the set of partition IDs.
Review Comment:
Why this comma?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1480,6 +1600,34 @@ private boolean isSubset(
return true;
}
+ /**
+ * Verifies that the tasks currently owned by the member (the ones set in the
+ * request) matches the ones that the member should own. It matches if the streams
+ * only owns tasks which are in the assigned tasks. It does not match if
+ * it owns any other tasks.
+ *
+ * @param ownedTasks The tasks provided by the streams in the request.
+ * @param target The tasks that the member should have.
+ *
+ * @return A boolean indicating whether the owned partitions are a subset or not.
+ */
+ private boolean isTaskSubset(
Review Comment:
A better name would be `areOwnedTasksContainedInAssignedTasks()`.
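A sketch of what the renamed check could look like, assuming it follows the same pattern as the consumer-group `isSubset()` whose tail is quoted above. `TaskIds` here is a hypothetical record standing in for the request's task entries, the target is assumed to map subtopology IDs to assigned partition IDs (as described for `TasksTuple`), and returning `false` for a null owned list is an assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch only: TaskIds is a hypothetical stand-in for the request type.
final class OwnedTasksCheckSketch {

    // One subtopology ID plus the partition IDs of the tasks owned in it.
    record TaskIds(String subtopologyId, List<Integer> partitions) { }

    // True if every owned (subtopology ID, partition ID) pair also appears in
    // the target, which maps subtopology ID -> assigned partition IDs.
    static boolean areOwnedTasksContainedInAssignedTasks(
        List<TaskIds> ownedTasks,
        Map<String, Set<Integer>> target
    ) {
        if (ownedTasks == null) {
            return false; // assumption: a missing owned list never matches
        }
        for (TaskIds taskIds : ownedTasks) {
            Set<Integer> assignedPartitions = target.get(taskIds.subtopologyId());
            if (assignedPartitions == null) {
                return false; // owns tasks of a subtopology it was not assigned
            }
            for (Integer partitionId : taskIds.partitions()) {
                if (!assignedPartitions.contains(partitionId)) {
                    return false; // owns a task outside its assignment
                }
            }
        }
        return true;
    }
}
```

The name states the direction of the containment explicitly, which `isTaskSubset` leaves to the reader.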
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1805,6 +1990,280 @@ private List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
.toList();
}
+ /**
+ * Handles a regular heartbeat from a Streams group member.
+ * It mainly consists of five parts:
+ * 1) Create or update the member.
+ * The group epoch is bumped if the member has been created or updated.
+ * 2) Initialize or update the topology. The group epoch is bumped if the topology
+ * has been created or updated.
+ * 3) Determine the partition metadata and any internal topics that need to be created.
+ * 4) Update the target assignment for the streams group if the group epoch
+ * is larger than the current target assignment epoch.
+ * 5) Reconcile the member's assignment with the target assignment.
+ *
+ * @param groupId The group id from the request.
+ * @param memberId The member ID from the request.
+ * @param memberEpoch The member epoch from the request.
+ * @param instanceId The instance ID from the request or null.
+ * @param rackId The rack ID from the request or null.
+ * @param rebalanceTimeoutMs The rebalance timeout from the request or -1.
+ * @param clientId The client ID.
+ * @param clientHost The client host.
+ * @param topology The topology from the request or null.
+ * @param ownedActiveTasks The list of owned active tasks from the request or null.
+ * @param ownedStandbyTasks The list of owned standby tasks from the request or null.
+ * @param ownedWarmupTasks The list of owned warmup tasks from the request or null.
+ * @param userEndpoint User-defined endpoint for Interactive Queries, or null.
+ * @param clientTags Used for rack-aware assignment algorithm, or null.
+ * @param taskEndOffsets Cumulative changelog end-offsets for tasks, or null.
+ * @param taskOffsets Cumulative changelog offsets for tasks, or null.
+ * @param shutdownApplication Whether all Streams clients in the group should shut down.
+ * @return A Result containing the StreamsGroupHeartbeat response and a list of records to update the state machine.
+ */
+ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
+ String groupId,
+ String memberId,
+ int memberEpoch,
+ String instanceId,
+ String rackId,
+ int rebalanceTimeoutMs,
+ String clientId,
+ String clientHost,
+ final StreamsGroupHeartbeatRequestData.Topology topology,
+ List<TaskIds> ownedActiveTasks,
+ List<TaskIds> ownedStandbyTasks,
+ List<TaskIds> ownedWarmupTasks,
+ final String processId,
+ final Endpoint userEndpoint,
+ final List<KeyValue> clientTags,
+ final List<TaskOffset> taskOffsets,
+ final List<TaskOffset> taskEndOffsets,
+ final boolean shutdownApplication
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ final List<CoordinatorRecord> records = new ArrayList<>();
+ final List<StreamsGroupHeartbeatResponseData.Status> returnedStatus = new ArrayList<>();
+
+ // Get or create the streams group.
+ boolean createIfNotExists = memberEpoch == 0;
+ final StreamsGroup group = getOrMaybeCreateStreamsGroup(groupId, createIfNotExists);
+
+ // Get or create the member.
+ StreamsGroupMember member;
+ if (instanceId == null) {
+ member = getOrMaybeSubscribeDynamicStreamsGroupMember(
+ group,
+ memberId,
+ memberEpoch,
+ ownedActiveTasks,
+ ownedStandbyTasks,
+ ownedWarmupTasks,
+ createIfNotExists
+ );
+ } else {
+ throw new UnsupportedOperationException("Static members are not supported yet.");
+ }
+
+ // 1. Create or update the member.
+ // If the member is new or has changed, a StreamsMemberMetadataValue record is written to the __consumer_offsets partition to
+ // persist the change. If the subscriptions have changed, the subscription metadata is updated and persisted by writing a
+ // StreamsPartitionMetadataValue
+ // record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have
+ // changed, and persisted by writing a StreamsMetadataValue record to the partition.
+ StreamsGroupMember updatedMember = new StreamsGroupMember.Builder(member)
+ .maybeUpdateInstanceId(Optional.empty())
+ .maybeUpdateRackId(Optional.ofNullable(rackId))
+ .maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
+ .maybeUpdateTopologyEpoch(topology != null ? OptionalInt.of(topology.epoch()) : OptionalInt.empty())
+ .setClientId(clientId)
+ .setClientHost(clientHost)
+ .maybeUpdateProcessId(Optional.ofNullable(processId))
+ .maybeUpdateClientTags(Optional.ofNullable(clientTags).map(x -> x.stream().collect(Collectors.toMap(KeyValue::key, KeyValue::value))))
+ .maybeUpdateUserEndpoint(Optional.ofNullable(userEndpoint).map(x -> new StreamsGroupMemberMetadataValue.Endpoint().setHost(x.host()).setPort(x.port())))
+ .build();
+
+ // If the group is newly created, we must ensure that it moves away from
+ // epoch 0 and that it is fully initialized.
+ int groupEpoch = group.groupEpoch();
+ boolean bumpGroupEpoch = false;
+
+ bumpGroupEpoch |= hasStreamsMemberMetadataChanged(groupId, member, updatedMember, records);
Review Comment:
Why not:
```java
boolean bumpGroupEpoch = hasStreamsMemberMetadataChanged(groupId, member, updatedMember, records);
```
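For context on how the flag is consumed further down in the quoted method, the epoch handling can be condensed into the following sketch. The boolean parameters are hypothetical stand-ins for the member-metadata, topology-initialization and partition-metadata checks, and record writing, logging and metrics are omitted. It only shows that the flag can start from the first check, and that the target assignment is recomputed when the (possibly bumped) group epoch is ahead of the assignment epoch.

```java
// Sketch only: the boolean inputs are hypothetical stand-ins for the real checks.
final class GroupEpochSketch {

    record EpochDecision(int groupEpoch, boolean recomputeTargetAssignment) { }

    static EpochDecision decide(
        int currentGroupEpoch,
        int assignmentEpoch,
        boolean memberMetadataChanged,
        boolean topologyInitialized,
        boolean partitionMetadataChanged
    ) {
        // Initialized directly from the first check, as suggested above.
        boolean bumpGroupEpoch = memberMetadataChanged;
        if (topologyInitialized) {
            bumpGroupEpoch = true;
        }
        if (partitionMetadataChanged) {
            bumpGroupEpoch = true;
        }
        int groupEpoch = currentGroupEpoch;
        if (bumpGroupEpoch) {
            // The quoted code also writes a group epoch record at this point.
            groupEpoch += 1;
        }
        // Step 4: only recompute the target assignment when the group epoch
        // is ahead of the target assignment epoch.
        return new EpochDecision(groupEpoch, groupEpoch > assignmentEpoch);
    }
}
```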
--
This is an automated message from the Apache Git Service.