This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 03e6aa43988 KAFKA-20266: Add tracking of assignment timestamps (#21652)
03e6aa43988 is described below
commit 03e6aa4398869ae20dc58976fa6c4215f9a24bc7
Author: Sean Quah <[email protected]>
AuthorDate: Sat Mar 7 11:10:16 2026 +0000
KAFKA-20266: Add tracking of assignment timestamps (#21652)
Add Timestamp fields to TargetAssignmentMetadata records which default
to 0 when absent. The new timestamp field records when the last target
assignment calculation finished.
When upgrading a classic group, the new timestamp field is initialized
to 0.
Reviewers: khilesh Chaganti <[email protected]>,
Chia-Ping Tsai <[email protected]>, David Jacot <[email protected]>
---
.../group/GroupCoordinatorRecordHelpers.java | 26 +--
.../coordinator/group/GroupMetadataManager.java | 15 +-
.../group/TargetAssignmentMetadata.java | 42 +++++
.../coordinator/group/modern/ModernGroup.java | 25 ++-
.../group/modern/TargetAssignmentBuilder.java | 46 +++++-
.../group/modern/consumer/ConsumerGroup.java | 10 +-
.../coordinator/group/modern/share/ShareGroup.java | 2 +-
.../streams/StreamsCoordinatorRecordHelpers.java | 17 +-
.../coordinator/group/streams/StreamsGroup.java | 30 ++--
.../group/streams/TargetAssignmentBuilder.java | 24 ++-
...ConsumerGroupTargetAssignmentMetadataValue.json | 5 +-
.../ShareGroupTargetAssignmentMetadataValue.json | 5 +-
.../StreamsGroupTargetAssignmentMetadataValue.json | 5 +-
.../group/GroupCoordinatorRecordHelpersTest.java | 10 +-
.../group/GroupMetadataManagerTest.java | 184 ++++++++++++++-------
.../group/GroupMetadataManagerTestContext.java | 2 +-
.../group/classic/ClassicGroupTest.java | 4 +-
.../group/modern/TargetAssignmentBuilderTest.java | 81 +++++----
.../modern/consumer/ConsumerGroupBuilder.java | 9 +-
.../group/modern/consumer/ConsumerGroupTest.java | 8 +-
.../group/modern/share/ShareGroupBuilder.java | 9 +-
.../group/modern/share/ShareGroupTest.java | 2 +-
.../StreamsCoordinatorRecordHelpersTest.java | 7 +-
.../group/streams/StreamsGroupBuilder.java | 11 +-
.../group/streams/StreamsGroupTest.java | 14 +-
.../group/streams/TargetAssignmentBuilderTest.java | 79 +++++----
26 files changed, 467 insertions(+), 205 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
index f77a092d2ab..0700b11858c 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
@@ -235,20 +235,23 @@ public class GroupCoordinatorRecordHelpers {
/**
* Creates a ConsumerGroupTargetAssignmentMetadata record.
*
- * @param groupId The consumer group id.
- * @param assignmentEpoch The consumer group epoch.
+ * @param groupId The consumer group id.
+ * @param assignmentEpoch The consumer group epoch.
+ * @param assignmentTimestamp The time at which the target assignment
calculation finished.
* @return The record.
*/
- public static CoordinatorRecord
newConsumerGroupTargetAssignmentEpochRecord(
+ public static CoordinatorRecord
newConsumerGroupTargetAssignmentMetadataRecord(
String groupId,
- int assignmentEpoch
+ int assignmentEpoch,
+ long assignmentTimestamp
) {
return CoordinatorRecord.record(
new ConsumerGroupTargetAssignmentMetadataKey()
.setGroupId(groupId),
new ApiMessageAndVersion(
new ConsumerGroupTargetAssignmentMetadataValue()
- .setAssignmentEpoch(assignmentEpoch),
+ .setAssignmentEpoch(assignmentEpoch)
+ .setAssignmentTimestamp(assignmentTimestamp),
(short) 0
)
);
@@ -663,20 +666,23 @@ public class GroupCoordinatorRecordHelpers {
/**
* Creates a ShareGroupTargetAssignmentMetadata record.
*
- * @param groupId The group id.
- * @param assignmentEpoch The group epoch.
+ * @param groupId The group id.
+ * @param assignmentEpoch The group epoch.
+ * @param assignmentTimestamp The time at which the target assignment
calculation finished.
* @return The record.
*/
- public static CoordinatorRecord newShareGroupTargetAssignmentEpochRecord(
+ public static CoordinatorRecord
newShareGroupTargetAssignmentMetadataRecord(
String groupId,
- int assignmentEpoch
+ int assignmentEpoch,
+ long assignmentTimestamp
) {
return CoordinatorRecord.record(
new ShareGroupTargetAssignmentMetadataKey()
.setGroupId(groupId),
new ApiMessageAndVersion(
new ShareGroupTargetAssignmentMetadataValue()
- .setAssignmentEpoch(assignmentEpoch),
+ .setAssignmentEpoch(assignmentEpoch)
+ .setAssignmentTimestamp(assignmentTimestamp),
(short) 0
)
);
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index fa45d1c6e8a..2033e24286a 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -3821,6 +3821,7 @@ public class GroupMetadataManager {
try {
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder
assignmentResultBuilder =
new
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(group.groupId(),
groupEpoch, consumerGroupAssignors.get(preferredServerAssignor))
+ .withTime(time)
.withMembers(group.members())
.withStaticMembers(group.staticMembers())
.withSubscriptionType(subscriptionType)
@@ -3890,6 +3891,7 @@ public class GroupMetadataManager {
TargetAssignmentBuilder.ShareTargetAssignmentBuilder
assignmentResultBuilder =
new
TargetAssignmentBuilder.ShareTargetAssignmentBuilder(group.groupId(),
groupEpoch, shareGroupAssignor)
+ .withTime(time)
.withMembers(group.members())
.withSubscriptionType(subscriptionType)
.withTargetAssignment(group.targetAssignment())
@@ -3955,6 +3957,7 @@ public class GroupMetadataManager {
assignor,
assignmentConfigs
)
+ .withTime(time)
.withMembers(group.members())
.withTopology(configuredTopology)
.withStaticMembers(group.staticMembers())
@@ -5337,7 +5340,7 @@ public class GroupMetadataManager {
if (value != null) {
ConsumerGroup group =
getOrMaybeCreatePersistedConsumerGroup(groupId, true);
- group.setTargetAssignmentEpoch(value.assignmentEpoch());
+ group.setTargetAssignmentMetadata(value.assignmentEpoch(),
value.assignmentTimestamp());
} else {
ConsumerGroup group;
try {
@@ -5350,7 +5353,7 @@ public class GroupMetadataManager {
throw new IllegalStateException("Received a tombstone record
to delete target assignment of " + groupId
+ " but the assignment still has " +
group.targetAssignment().size() + " members.");
}
- group.setTargetAssignmentEpoch(-1);
+ group.setTargetAssignmentMetadata(-1, 0L);
}
}
@@ -5653,7 +5656,7 @@ public class GroupMetadataManager {
if (value != null) {
StreamsGroup streamsGroup =
getOrMaybeCreatePersistedStreamsGroup(groupId, true);
- streamsGroup.setTargetAssignmentEpoch(value.assignmentEpoch());
+ streamsGroup.setTargetAssignmentMetadata(value.assignmentEpoch(),
value.assignmentTimestamp());
} else {
StreamsGroup streamsGroup;
try {
@@ -5666,7 +5669,7 @@ public class GroupMetadataManager {
throw new IllegalStateException("Received a tombstone record
to delete target assignment of " + groupId
+ " but the assignment still has " +
streamsGroup.targetAssignment().size() + " members.");
}
- streamsGroup.setTargetAssignmentEpoch(-1);
+ streamsGroup.setTargetAssignmentMetadata(-1, 0L);
}
}
@@ -5801,13 +5804,13 @@ public class GroupMetadataManager {
}
if (value != null) {
- group.setTargetAssignmentEpoch(value.assignmentEpoch());
+ group.setTargetAssignmentMetadata(value.assignmentEpoch(),
value.assignmentTimestamp());
} else {
if (!group.targetAssignment().isEmpty()) {
throw new IllegalStateException("Received a tombstone record
to delete target assignment of " + groupId
+ " but the assignment still has " +
group.targetAssignment().size() + " members.");
}
- group.setTargetAssignmentEpoch(-1);
+ group.setTargetAssignmentMetadata(-1, 0L);
}
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/TargetAssignmentMetadata.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/TargetAssignmentMetadata.java
new file mode 100644
index 00000000000..ce9d6b30843
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/TargetAssignmentMetadata.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+/**
+ * The target assignment metadata.
+ *
+ * @param assignmentEpoch The target assignment epoch. An assignment epoch
smaller than the
+ * group epoch means that a new assignment is
required. The
+ * assignment epoch is updated when a new
assignment is installed.
+ * @param assignmentTimestamp The time at which the target assignment
calculation finished.
+ */
+public record TargetAssignmentMetadata(int assignmentEpoch, long
assignmentTimestamp) {
+ /**
+ * The initial target assignment metadata for groups.
+ * This is different to tombstoned assignment metadata which has an
assignment epoch of -1.
+ */
+ public static final TargetAssignmentMetadata ZERO = new
TargetAssignmentMetadata(0, 0L);
+
+ public TargetAssignmentMetadata {
+ if (assignmentEpoch < 0 && assignmentEpoch != -1) {
+ throw new IllegalArgumentException("The assignment epoch must be
non-negative or -1.");
+ }
+ if (assignmentTimestamp < 0) {
+ throw new IllegalArgumentException("The assignment timestamp must
be non-negative.");
+ }
+ }
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
index 1fa7ef24e95..6e882b29d09 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
@@ -21,6 +21,7 @@ import
org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.TargetAssignmentMetadata;
import org.apache.kafka.coordinator.group.Utils;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -96,11 +97,9 @@ public abstract class ModernGroup<T extends
ModernGroupMember> implements Group
protected final TimelineObject<SubscriptionType> subscriptionType;
/**
- * The target assignment epoch. An assignment epoch smaller than the group
epoch
- * means that a new assignment is required. The assignment epoch is
updated when
- * a new assignment is installed.
+ * The target assignment metadata.
*/
- protected final TimelineInteger targetAssignmentEpoch;
+ protected final TimelineObject<TargetAssignmentMetadata>
targetAssignmentMetadata;
/**
* The target assignment per member id.
@@ -136,7 +135,7 @@ public abstract class ModernGroup<T extends
ModernGroupMember> implements Group
this.subscribedTopicNames = new TimelineHashMap<>(snapshotRegistry, 0);
this.metadataHash = new TimelineLong(snapshotRegistry);
this.subscriptionType = new TimelineObject<>(snapshotRegistry,
HOMOGENEOUS);
- this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
+ this.targetAssignmentMetadata = new TimelineObject<>(snapshotRegistry,
TargetAssignmentMetadata.ZERO);
this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
this.invertedTargetAssignment = new
TimelineHashMap<>(snapshotRegistry, 0);
}
@@ -181,16 +180,24 @@ public abstract class ModernGroup<T extends
ModernGroupMember> implements Group
* @return The target assignment epoch.
*/
public int assignmentEpoch() {
- return targetAssignmentEpoch.get();
+ return targetAssignmentMetadata.get().assignmentEpoch();
}
/**
- * Sets the assignment epoch.
+ * @return The time at which the target assignment calculation finished.
+ */
+ public long assignmentTimestamp() {
+ return targetAssignmentMetadata.get().assignmentTimestamp();
+ }
+
+ /**
+ * Sets the assignment metadata.
*
* @param targetAssignmentEpoch The new assignment epoch.
+ * @param targetAssignmentTimestamp The time at which the assignment
calculation finished.
*/
- public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
- this.targetAssignmentEpoch.set(targetAssignmentEpoch);
+ public void setTargetAssignmentMetadata(int targetAssignmentEpoch, long
targetAssignmentTimestamp) {
+ this.targetAssignmentMetadata.set(new
TargetAssignmentMetadata(targetAssignmentEpoch, targetAssignmentTimestamp));
maybeUpdateGroupState();
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
index d08c0342ce2..86c64e5c22d 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.modern;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
@@ -139,10 +140,15 @@ public abstract class TargetAssignmentBuilder<T extends
ModernGroupMember, U ext
}
@Override
- protected CoordinatorRecord newTargetAssignmentEpochRecord(String
groupId, int assignmentEpoch) {
- return
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(
+ protected CoordinatorRecord newTargetAssignmentMetadataRecord(
+ String groupId,
+ int assignmentEpoch,
+ long assignmentTimestamp
+ ) {
+ return
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(
groupId,
- assignmentEpoch
+ assignmentEpoch,
+ assignmentTimestamp
);
}
@@ -208,10 +214,15 @@ public abstract class TargetAssignmentBuilder<T extends
ModernGroupMember, U ext
}
@Override
- protected CoordinatorRecord newTargetAssignmentEpochRecord(String
groupId, int assignmentEpoch) {
- return
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(
+ protected CoordinatorRecord newTargetAssignmentMetadataRecord(
+ String groupId,
+ int assignmentEpoch,
+ long assignmentTimestamp
+ ) {
+ return
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(
groupId,
- assignmentEpoch
+ assignmentEpoch,
+ assignmentTimestamp
);
}
@@ -230,6 +241,11 @@ public abstract class TargetAssignmentBuilder<T extends
ModernGroupMember, U ext
}
}
+ /**
+ * The time.
+ */
+ private Time time;
+
/**
* The group id.
*/
@@ -304,6 +320,17 @@ public abstract class TargetAssignmentBuilder<T extends
ModernGroupMember, U ext
this.assignor = Objects.requireNonNull(assignor);
}
+ /**
+ * Sets the time.
+ *
+ * @param time The time.
+ * @return This object.
+ */
+ public U withTime(Time time) {
+ this.time = time;
+ return self();
+ }
+
/**
* Adds all the existing members.
*
@@ -491,7 +518,7 @@ public abstract class TargetAssignmentBuilder<T extends
ModernGroupMember, U ext
}
// Bump the target assignment epoch.
- records.add(newTargetAssignmentEpochRecord(groupId, groupEpoch));
+ records.add(newTargetAssignmentMetadataRecord(groupId, groupEpoch,
time.milliseconds()));
return new TargetAssignmentResult(records,
newGroupAssignment.members());
}
@@ -504,9 +531,10 @@ public abstract class TargetAssignmentBuilder<T extends
ModernGroupMember, U ext
Map<Uuid, Set<Integer>> partitions
);
- protected abstract CoordinatorRecord newTargetAssignmentEpochRecord(
+ protected abstract CoordinatorRecord newTargetAssignmentMetadataRecord(
String groupId,
- int assignmentEpoch
+ int assignmentEpoch,
+ long timestampMs
);
protected abstract MemberSubscriptionAndAssignmentImpl
newMemberSubscriptionAndAssignment(
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index 8aa858ced02..9e6de2ea24d 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -896,11 +896,11 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
ConsumerGroupState newState = STABLE;
if (members.isEmpty()) {
newState = EMPTY;
- } else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
+ } else if (groupEpoch.get() > assignmentEpoch()) {
newState = ASSIGNING;
} else {
for (ModernGroupMember member : members.values()) {
- if (!member.isReconciledTo(targetAssignmentEpoch.get())) {
+ if (!member.isReconciledTo(assignmentEpoch())) {
newState = RECONCILING;
break;
}
@@ -1121,7 +1121,7 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
.setAssignorName(preferredServerAssignor(committedOffset).orElse(defaultAssignor))
.setGroupEpoch(groupEpoch.get(committedOffset))
.setGroupState(state.get(committedOffset).toString())
- .setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset));
+
.setAssignmentEpoch(targetAssignmentMetadata.get(committedOffset).assignmentEpoch());
members.entrySet(committedOffset).forEach(
entry -> describedGroup.members().add(
entry.getValue().asConsumerGroupDescribeMember(
@@ -1156,7 +1156,7 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
String groupId = classicGroup.groupId();
ConsumerGroup consumerGroup = new ConsumerGroup(logContext,
snapshotRegistry, groupId);
consumerGroup.setGroupEpoch(classicGroup.generationId());
- consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+ consumerGroup.setTargetAssignmentMetadata(classicGroup.generationId(),
0L);
classicGroup.allMembers().forEach(classicGroupMember -> {
// The assigned partition can be empty if the member just joined
and has never synced.
@@ -1238,7 +1238,7 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
))
);
-
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId(),
groupEpoch()));
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId(),
assignmentEpoch(), assignmentTimestamp()));
members().forEach((__, consumerGroupMember) ->
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId(),
consumerGroupMember))
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
index 57c9cf5cb65..4d8be291743 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
@@ -313,7 +313,7 @@ public class ShareGroup extends
ModernGroup<ShareGroupMember> {
.setAssignorName(defaultAssignor)
.setGroupEpoch(groupEpoch.get(committedOffset))
.setGroupState(state.get(committedOffset).toString())
- .setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset));
+
.setAssignmentEpoch(targetAssignmentMetadata.get(committedOffset).assignmentEpoch());
members.entrySet(committedOffset).forEach(
entry -> describedGroup.members().add(
entry.getValue().asShareGroupDescribeMember(
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
index 516dc44b0d2..1cac2849d8e 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
@@ -215,10 +215,18 @@ public class StreamsCoordinatorRecordHelpers {
);
}
-
- public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord(
+ /**
+ * Creates a StreamsGroupTargetAssignmentMetadata record.
+ *
+ * @param groupId The streams group id.
+ * @param assignmentEpoch The assignment epoch.
+ * @param assignmentTimestamp The time at which the target assignment
calculation finished.
+ * @return The record.
+ */
+ public static CoordinatorRecord
newStreamsGroupTargetAssignmentMetadataRecord(
String groupId,
- int assignmentEpoch
+ int assignmentEpoch,
+ long assignmentTimestamp
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
@@ -227,7 +235,8 @@ public class StreamsCoordinatorRecordHelpers {
.setGroupId(groupId),
new ApiMessageAndVersion(
new StreamsGroupTargetAssignmentMetadataValue()
- .setAssignmentEpoch(assignmentEpoch),
+ .setAssignmentEpoch(assignmentEpoch)
+ .setAssignmentTimestamp(assignmentTimestamp),
(short) 0
)
);
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index b59081f7ea3..2e62f306749 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -33,6 +33,7 @@ import
org.apache.kafka.coordinator.group.CommitPartitionValidator;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.TargetAssignmentMetadata;
import org.apache.kafka.coordinator.group.Utils;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
@@ -161,10 +162,9 @@ public class StreamsGroup implements Group {
protected final TimelineLong metadataHash;
/**
- * The target assignment epoch. An assignment epoch smaller than the group
epoch means that a new assignment is required. The assignment
- * epoch is updated when a new assignment is installed.
+ * The target assignment metadata.
*/
- private final TimelineInteger targetAssignmentEpoch;
+ private final TimelineObject<TargetAssignmentMetadata>
targetAssignmentMetadata;
/**
* The target assignment per member ID.
@@ -241,7 +241,7 @@ public class StreamsGroup implements Group {
this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
this.validatedTopologyEpoch = new TimelineInteger(snapshotRegistry);
this.metadataHash = new TimelineLong(snapshotRegistry);
- this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
+ this.targetAssignmentMetadata = new TimelineObject<>(snapshotRegistry,
TargetAssignmentMetadata.ZERO);
this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
this.currentActiveTaskToProcessId = new
TimelineHashMap<>(snapshotRegistry, 0);
this.currentStandbyTaskToProcessIds = new
TimelineHashMap<>(snapshotRegistry, 0);
@@ -340,16 +340,24 @@ public class StreamsGroup implements Group {
* @return The target assignment epoch.
*/
public int assignmentEpoch() {
- return targetAssignmentEpoch.get();
+ return targetAssignmentMetadata.get().assignmentEpoch();
}
/**
- * Sets the assignment epoch.
+ * @return The time at which the target assignment calculation finished.
+ */
+ public long assignmentTimestamp() {
+ return targetAssignmentMetadata.get().assignmentTimestamp();
+ }
+
+ /**
+ * Sets the assignment metadata.
*
* @param targetAssignmentEpoch The new assignment epoch.
+ * @param targetAssignmentTimestamp The time at which the assignment
calculation finished.
*/
- public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
- this.targetAssignmentEpoch.set(targetAssignmentEpoch);
+ public void setTargetAssignmentMetadata(int targetAssignmentEpoch, long
targetAssignmentTimestamp) {
+ this.targetAssignmentMetadata.set(new
TargetAssignmentMetadata(targetAssignmentEpoch, targetAssignmentTimestamp));
maybeUpdateGroupState();
}
@@ -896,11 +904,11 @@ public class StreamsGroup implements Group {
clearShutdownRequestMemberId();
} else if (topology().filter(t -> t.topologyEpoch() ==
validatedTopologyEpoch.get()).isEmpty()) {
newState = NOT_READY;
- } else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
+ } else if (groupEpoch.get() > assignmentEpoch()) {
newState = ASSIGNING;
} else {
for (StreamsGroupMember member : members.values()) {
- if (!member.isReconciledTo(targetAssignmentEpoch.get())) {
+ if (!member.isReconciledTo(assignmentEpoch())) {
newState = RECONCILING;
break;
}
@@ -1094,7 +1102,7 @@ public class StreamsGroup implements Group {
.setGroupId(groupId)
.setGroupEpoch(groupEpoch.get(committedOffset))
.setGroupState(state.get(committedOffset).toString())
- .setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset))
+
.setAssignmentEpoch(targetAssignmentMetadata.get(committedOffset).assignmentEpoch())
.setTopology(
configuredTopology.get(committedOffset)
.filter(ConfiguredTopology::isReady)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
index 809489907ba..f00f629dd08 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group.streams;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import
org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec;
@@ -46,6 +47,11 @@ import java.util.stream.Collectors;
*/
public class TargetAssignmentBuilder {
+ /**
+ * The time.
+ */
+ private Time time;
+
/**
* The group ID.
*/
@@ -131,6 +137,17 @@ public class TargetAssignmentBuilder {
);
}
+ /**
+ * Sets the time.
+ *
+ * @param time The time.
+ * @return This object.
+ */
+ public TargetAssignmentBuilder withTime(Time time) {
+ this.time = time;
+ return this;
+ }
+
/**
* Adds all the existing members.
*
@@ -196,7 +213,6 @@ public class TargetAssignmentBuilder {
return this;
}
-
/**
* Adds or updates a member. This is useful when the updated member is not
yet materialized in memory.
*
@@ -313,7 +329,11 @@ public class TargetAssignmentBuilder {
});
// Bump the target assignment epoch.
-
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
groupEpoch));
+
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(
+ groupId,
+ groupEpoch,
+ time.milliseconds()
+ ));
return new TargetAssignmentResult(records, newTargetAssignment);
}
diff --git
a/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMetadataValue.json
b/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMetadataValue.json
index 939794e1e5c..d59d809112b 100644
---
a/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMetadataValue.json
+++
b/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMetadataValue.json
@@ -21,6 +21,9 @@
"flexibleVersions": "0+",
"fields": [
{ "name": "AssignmentEpoch", "versions": "0+", "type": "int32",
- "about": "The assignment epoch." }
+ "about": "The assignment epoch." },
+ // Added in 4.3 (KIP-1263).
+ { "name": "AssignmentTimestamp", "versions": "0+", "taggedVersions": "0+",
"tag": 0, "type": "int64", "default": 0, "ignorable": true,
+ "about": "The time at which the assignment calculation finished." }
]
}
diff --git
a/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMetadataValue.json
b/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMetadataValue.json
index 284c28faf5c..112df37fa75 100644
---
a/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMetadataValue.json
+++
b/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMetadataValue.json
@@ -21,6 +21,9 @@
"flexibleVersions": "0+",
"fields": [
{ "name": "AssignmentEpoch", "versions": "0+", "type": "int32",
- "about": "The assignment epoch." }
+ "about": "The assignment epoch." },
+ // Added in 4.3 (KIP-1263).
+ { "name": "AssignmentTimestamp", "versions": "0+", "taggedVersions": "0+",
"tag": 0, "type": "int64", "default": 0, "ignorable": true,
+ "about": "The time at which the assignment calculation finished." }
]
}
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json
index 86143bd5a83..491b70b3007 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json
@@ -21,6 +21,9 @@
"flexibleVersions": "0+",
"fields": [
{ "name": "AssignmentEpoch", "versions": "0+", "type": "int32",
- "about": "The assignment epoch." }
+ "about": "The assignment epoch." },
+ // Added in 4.3 (KIP-1263).
+ { "name": "AssignmentTimestamp", "versions": "0+", "taggedVersions": "0+",
"tag": 0, "type": "int64", "default": 0, "ignorable": true,
+ "about": "The time at which the assignment calculation finished." }
]
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
index 38ee8501ee2..2f52046a7a5 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
@@ -79,8 +79,8 @@ import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochRecord;
@@ -312,14 +312,16 @@ public class GroupCoordinatorRecordHelpersTest {
.setGroupId("group-id"),
new ApiMessageAndVersion(
new ConsumerGroupTargetAssignmentMetadataValue()
- .setAssignmentEpoch(10),
+ .setAssignmentEpoch(10)
+ .setAssignmentTimestamp(12345L),
(short) 0
)
);
- assertEquals(expectedRecord,
newConsumerGroupTargetAssignmentEpochRecord(
+ assertEquals(expectedRecord,
newConsumerGroupTargetAssignmentMetadataRecord(
"group-id",
- 10
+ 10,
+ 12345L
));
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 91147aed051..a4e7b168c9a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -429,7 +429,7 @@ public class GroupMetadataManagerTest {
mkTopicAssignment(fooTopicId, 1, 2, 3)
)));
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
100, 12345L));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
@@ -519,7 +519,7 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)
)));
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
100, 12345L));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
assertEquals(MemberState.STABLE,
context.consumerGroupMemberState(groupId, memberId));
@@ -593,7 +593,7 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1)
)));
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
101));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
101, 12345L));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
assertEquals(MemberState.UNREVOKED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
@@ -663,7 +663,7 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)
)));
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
100, 12345L));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
assertEquals(MemberState.UNRELEASED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
@@ -734,7 +734,7 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)
)));
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
100, 12345L));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
assertEquals(MemberState.STABLE,
context.consumerGroupMemberState(groupId, memberId));
@@ -821,7 +821,7 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1)
)));
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
101));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
101, 12345L));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
assertEquals(MemberState.UNREVOKED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
@@ -926,7 +926,7 @@ public class GroupMetadataManagerTest {
mkTopicAssignment(fooTopicId, 0, 1, 2)
)));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment()));
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
100, 12345L));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member1));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member2));
@@ -1016,7 +1016,7 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1)
)));
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
101));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
101, 12345L));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
assertEquals(MemberState.UNREVOKED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
@@ -1101,7 +1101,7 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)
)));
-
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
100, 12345L));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord(groupId,
member));
// Member rejoins with epoch=0 - should succeed.
@@ -1170,7 +1170,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)
)));
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
100, 12345L));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
member));
// Member rejoins with epoch=0 - should succeed per KIP-848.
@@ -1286,7 +1286,7 @@ public class GroupMetadataManagerTest {
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
1, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -1365,7 +1365,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
1, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -1492,7 +1492,7 @@ public class GroupMetadataManagerTest {
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -1584,7 +1584,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, newMetadataImage)
))),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -1674,7 +1674,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, Map.of()),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -1764,7 +1764,7 @@ public class GroupMetadataManagerTest {
fooTopicName, computeTopicHash(fooTopicName, new
KRaftCoordinatorMetadataImage(metadataImage))
))),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -1899,7 +1899,7 @@ public class GroupMetadataManagerTest {
mkTopicAssignment(barTopicId, 2)
))
),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds())),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember3))
),
result.records()
@@ -2139,7 +2139,7 @@ public class GroupMetadataManagerTest {
mkTopicAssignment(barTopicId, 2)
))
),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds())),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember3))
),
result.records()
@@ -2504,7 +2504,7 @@ public class GroupMetadataManagerTest {
mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedRejoinedMember)
);
@@ -2909,7 +2909,7 @@ public class GroupMetadataManagerTest {
mkTopicAssignment(fooTopicId, 1, 2, 3)
)));
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
100, 12345L));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
@@ -3726,7 +3726,7 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3))));
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, 12345L));
assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING,
context.consumerGroupState(groupId));
@@ -3885,7 +3885,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -4013,7 +4013,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -11422,7 +11422,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(classicGroupId,
expectedMember),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(classicGroupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(classicGroupId,
memberId, Map.of()),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(classicGroupId,
1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(classicGroupId,
1, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(classicGroupId,
expectedMember)
),
result.records()
@@ -11589,7 +11589,7 @@ public class GroupMetadataManagerTest {
barTopicName, computeTopicHash(barTopicName, metadataImage)
))),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, expectedMember1.assignedPartitions()),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
0),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
0, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1),
// Member 2 joins the new consumer group.
@@ -11602,7 +11602,7 @@ public class GroupMetadataManagerTest {
))),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, assignor.targetPartitions(memberId2)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, assignor.targetPartitions(memberId1)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
1, context.time.milliseconds()),
// Member 2 has no pending revoking partition. Bump its member
epoch and transition to UNRELEASED_PARTITIONS.
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember2)
@@ -11806,7 +11806,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, expectedMember2.assignedPartitions()),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
0),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
0, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember2),
@@ -11821,7 +11821,7 @@ public class GroupMetadataManagerTest {
))),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, assignor.targetPartitions(memberId1)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId3, assignor.targetPartitions(memberId3)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
1, context.time.milliseconds()),
// Member 3 has no pending revoking partition. Bump its member
epoch and transition to UNRELEASED_PARTITIONS.
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember3)
@@ -12079,7 +12079,7 @@ public class GroupMetadataManagerTest {
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
))),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, expectedClassicMember.assignedPartitions()),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
group.generationId()),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
group.generationId(), 0),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedClassicMember),
// Remove the static member because the rejoining member replaces
it.
@@ -12205,7 +12205,7 @@ public class GroupMetadataManagerTest {
barTopicName, computeTopicHash(barTopicName, metadataImage)
))),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, expectedMember1.assignedPartitions()),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1),
// Member 2 joins the new consumer group.
@@ -12217,7 +12217,7 @@ public class GroupMetadataManagerTest {
barTopicName, computeTopicHash(barTopicName, metadataImage)
))),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, Map.of()),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
2, context.time.milliseconds()),
// Member 2 has no pending revoking partition or pending release
partition.
// Bump its member epoch and transition to STABLE.
@@ -12591,7 +12591,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, expectedMember2.assignedPartitions()),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember2),
@@ -12606,7 +12606,7 @@ public class GroupMetadataManagerTest {
))),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, assignor.targetPartitions(memberId1)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId3, assignor.targetPartitions(memberId3)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
2, context.time.milliseconds()),
// Member 3 has no pending revoking partition. Bump its member
epoch and transition to UNRELEASED_PARTITIONS.
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember3)
@@ -13723,7 +13723,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, assignor.targetPartitions(memberId)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId, assignor.targetPartitions(newMemberId))
),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds())),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember))
),
@@ -13883,7 +13883,7 @@ public class GroupMetadataManagerTest {
))),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId, Map.of()),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -14150,7 +14150,7 @@ public class GroupMetadataManagerTest {
mkTopicAssignment(barTopicId, 0),
mkTopicAssignment(fooTopicId, 1)))
),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds())),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember))
),
joinResult.records
@@ -14385,7 +14385,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment(
mkTopicAssignment(barTopicId, 0)))
),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds())),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1))
),
@@ -14624,7 +14624,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment(
mkTopicAssignment(barTopicId, 0)))
),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds())),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1))
),
@@ -16467,7 +16467,7 @@ public class GroupMetadataManagerTest {
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
)),
-
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(groupId,
1),
+
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
1, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord(groupId,
expectedMember),
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(groupId,
mkShareGroupStateMap(List.of(
mkShareGroupStateMetadataEntry(fooTopicId, fooTopicName,
List.of(0, 1, 2, 3, 4, 5)),
@@ -16704,7 +16704,7 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3))));
-
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(groupId,
11));
+
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
11, 12345L));
assertEquals(ShareGroup.ShareGroupState.STABLE,
context.shareGroupState(groupId));
@@ -16826,7 +16826,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasks(subtopology1, 1, 2, 3)
)));
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
100, 12345L));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
member));
@@ -17074,7 +17074,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
)),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
2),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
2, context.time.milliseconds()),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -17247,7 +17247,7 @@ public class GroupMetadataManagerTest {
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
)), -1, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
2),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
2, context.time.milliseconds()),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -17333,7 +17333,7 @@ public class GroupMetadataManagerTest {
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
)), -1, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
2),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
2, context.time.milliseconds()),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -17417,7 +17417,7 @@ public class GroupMetadataManagerTest {
barTopicName, computeTopicHash(barTopicName, metadataImage)
)), -1, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
2),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
2, context.time.milliseconds()),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -17514,7 +17514,7 @@ public class GroupMetadataManagerTest {
barTopicName, computeTopicHash(barTopicName, metadataImage)
)), 1, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
11),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -17817,7 +17817,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
)),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
11),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -17940,7 +17940,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
)),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
11),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -18913,7 +18913,7 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId1,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 1, 2, 3))));
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
11));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
11, 12345L));
assertEquals(StreamsGroup.StreamsGroupState.RECONCILING,
context.streamsGroupState(groupId));
@@ -19083,7 +19083,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)
)),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
11),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -19205,7 +19205,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)
)),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
11),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -20127,7 +20127,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(classicGroupId,
topology),
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId,
2, 0, -1, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(classicGroupId,
memberId, TasksTuple.EMPTY),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId,
2),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(classicGroupId,
2, context.time.milliseconds()),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(classicGroupId,
expectedMember)
),
result.records()
@@ -20631,8 +20631,9 @@ public class GroupMetadataManagerTest {
.build();
// The group is created if it does not exist.
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord("foo",
10));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord("foo",
10, 12345L));
assertEquals(10,
context.groupMetadataManager.consumerGroup("foo").assignmentEpoch());
+ assertEquals(12345L,
context.groupMetadataManager.consumerGroup("foo").assignmentTimestamp());
}
@Test
@@ -20646,6 +20647,22 @@ public class GroupMetadataManagerTest {
assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.consumerGroup("foo"));
}
+ @Test
+ public void
testReplayConsumerGroupTargetAssignmentMetadataTombstoneExisting() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // Create the group by replaying a value record.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord("foo",
10, 12345L));
+ assertEquals(10,
context.groupMetadataManager.consumerGroup("foo").assignmentEpoch());
+ assertEquals(12345L,
context.groupMetadataManager.consumerGroup("foo").assignmentTimestamp());
+
+ // Replay the tombstone. It should reset both the epoch and the
timestamp.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("foo"));
+ assertEquals(-1,
context.groupMetadataManager.consumerGroup("foo").assignmentEpoch());
+ assertEquals(0L,
context.groupMetadataManager.consumerGroup("foo").assignmentTimestamp());
+ }
+
@Test
public void testReplayConsumerGroupCurrentMemberAssignment() {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
@@ -20681,6 +20698,44 @@ public class GroupMetadataManagerTest {
assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.consumerGroup("bar"));
}
+ @Test
+ public void testReplayShareGroupTargetAssignmentMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group is created if it does not exist.
+
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord("foo",
10, 12345L));
+ assertEquals(10,
context.groupMetadataManager.shareGroup("foo").assignmentEpoch());
+ assertEquals(12345L,
context.groupMetadataManager.shareGroup("foo").assignmentTimestamp());
+ }
+
+ @Test
+ public void testReplayShareGroupTargetAssignmentMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the
ShareGroupTargetAssignmentMetadata tombstone
+ // should be a no-op.
+
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochTombstoneRecord("foo"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.shareGroup("foo"));
+ }
+
+ @Test
+ public void
testReplayShareGroupTargetAssignmentMetadataTombstoneExisting() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // Create the group by replaying a value record.
+
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord("foo",
10, 12345L));
+ assertEquals(10,
context.groupMetadataManager.shareGroup("foo").assignmentEpoch());
+ assertEquals(12345L,
context.groupMetadataManager.shareGroup("foo").assignmentTimestamp());
+
+ // Replay the tombstone. It should reset both the epoch and the
timestamp.
+
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochTombstoneRecord("foo"));
+ assertEquals(-1,
context.groupMetadataManager.shareGroup("foo").assignmentEpoch());
+ assertEquals(0L,
context.groupMetadataManager.shareGroup("foo").assignmentTimestamp());
+ }
+
@Test
public void testReplayStreamsGroupMemberMetadata() {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
@@ -20907,8 +20962,9 @@ public class GroupMetadataManagerTest {
.build();
// The group is created if it does not exist.
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord("foo",
10));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord("foo",
10, 12345L));
assertEquals(10,
context.groupMetadataManager.streamsGroup("foo").assignmentEpoch());
+ assertEquals(12345L,
context.groupMetadataManager.streamsGroup("foo").assignmentTimestamp());
}
@Test
@@ -20938,6 +20994,7 @@ public class GroupMetadataManagerTest {
.withStreamsGroup(
new StreamsGroupBuilder("foo", 10)
.withTargetAssignmentEpoch(10)
+ .withTargetAssignmentTimestamp(12345L)
.withTargetAssignment("m1", tasks)
)
.build();
@@ -20954,6 +21011,7 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord("foo"));
assertEquals(-1,
context.groupMetadataManager.streamsGroup("foo").assignmentEpoch());
+ assertEquals(0L,
context.groupMetadataManager.streamsGroup("foo").assignmentTimestamp());
}
@Test
@@ -22083,7 +22141,7 @@ public class GroupMetadataManagerTest {
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember2)
);
@@ -22151,7 +22209,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
1, 0),
// The target assignment is created.
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, Map.of()),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
1, context.time.milliseconds()),
// The member current state is created.
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1)
);
@@ -22473,7 +22531,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*"),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, Map.of()),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1)
);
@@ -22622,7 +22680,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
expectedMember1.memberId(), Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, Map.of())
),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds())),
// The member assignment is updated.
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1))
);
@@ -22766,7 +22824,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
expectedMember1.memberId(), Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, Map.of())
),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds())),
// The member assignment is updated.
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1))
);
@@ -22971,7 +23029,7 @@ public class GroupMetadataManagerTest {
List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember2),
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*|bar*"),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember2)
),
result2.records()
@@ -23196,7 +23254,7 @@ public class GroupMetadataManagerTest {
assertRecordsEquals(
List.of(
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember2)
),
result2.records()
@@ -23430,7 +23488,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember2)
);
@@ -24895,7 +24953,7 @@ public class GroupMetadataManagerTest {
new InitMapValue(
topicName,
Set.of(0, 1),
- 0
+ context.time.milliseconds()
)
);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index cbaa5770541..3edeb452d05 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -457,7 +457,7 @@ public class GroupMetadataManagerTestContext {
}
public static class Builder {
- private MockTime time = new MockTime(0, 0, 0);
+ private MockTime time = new MockTime(0, 1000, 1000);
private final MockCoordinatorTimer<CoordinatorRecord> timer = new
MockCoordinatorTimer<>(time);
private final MockCoordinatorExecutor<CoordinatorRecord> executor =
new MockCoordinatorExecutor<>();
private final LogContext logContext = new LogContext();
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
index 445ba969648..3b1ae79bed6 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
@@ -1385,7 +1385,7 @@ public class ClassicGroupTest {
groupId
);
consumerGroup.setGroupEpoch(10);
- consumerGroup.setTargetAssignmentEpoch(10);
+ consumerGroup.setTargetAssignmentMetadata(10, time.milliseconds());
consumerGroup.updateTargetAssignment(memberId1, new
Assignment(mkAssignment(
mkTopicAssignment(fooTopicId, 0)
@@ -1538,7 +1538,7 @@ public class ClassicGroupTest {
groupId
);
consumerGroup.setGroupEpoch(10);
- consumerGroup.setTargetAssignmentEpoch(10);
+ consumerGroup.setTargetAssignmentMetadata(10, time.milliseconds());
consumerGroup.updateTargetAssignment(memberId1, new
Assignment(mkAssignment(
mkTopicAssignment(fooTopicId, 0)
)));
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
index 7c9de06c014..902864652dc 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.modern;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
@@ -41,7 +42,7 @@ import static
org.apache.kafka.coordinator.group.Assertions.assertRecordsEquals;
import static
org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord;
import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -56,6 +57,7 @@ public class TargetAssignmentBuilderTest {
public static class TargetAssignmentBuilderTestContext {
private final String groupId;
private final int groupEpoch;
+ private final long assignmentTimestamp;
private final PartitionAssignor assignor =
mock(PartitionAssignor.class);
private final Map<String, ConsumerGroupMember> members = new
HashMap<>();
private final Map<String, ConsumerGroupMember> updatedMembers = new
HashMap<>();
@@ -67,10 +69,12 @@ public class TargetAssignmentBuilderTest {
public TargetAssignmentBuilderTestContext(
String groupId,
- int groupEpoch
+ int groupEpoch,
+ long assignmentTimestamp
) {
this.groupId = groupId;
this.groupEpoch = groupEpoch;
+ this.assignmentTimestamp = assignmentTimestamp;
}
public void addGroupMember(
@@ -274,6 +278,7 @@ public class TargetAssignmentBuilderTest {
// Create and populate the assignment builder.
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder builder =
new
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(groupId, groupEpoch,
assignor)
+ .withTime(new MockTime(0, assignmentTimestamp,
assignmentTimestamp))
.withMembers(members)
.withStaticMembers(staticMembers)
.withSubscriptionType(subscriptionType)
@@ -307,13 +312,15 @@ public class TargetAssignmentBuilderTest {
public void testEmpty() {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
"my-group",
- 20
+ 20,
+ 12345L
);
TargetAssignmentBuilder.TargetAssignmentResult result =
context.build();
- assertEquals(List.of(newConsumerGroupTargetAssignmentEpochRecord(
+ assertEquals(List.of(newConsumerGroupTargetAssignmentMetadataRecord(
"my-group",
- 20
+ 20,
+ 12345L
)), result.records());
assertEquals(Map.of(), result.targetAssignment());
}
@@ -322,7 +329,8 @@ public class TargetAssignmentBuilderTest {
public void testAssignmentHasNotChanged() {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
"my-group",
- 20
+ 20,
+ 12345L
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
@@ -350,9 +358,10 @@ public class TargetAssignmentBuilderTest {
TargetAssignmentBuilder.TargetAssignmentResult result =
context.build();
- assertEquals(List.of(newConsumerGroupTargetAssignmentEpochRecord(
+ assertEquals(List.of(newConsumerGroupTargetAssignmentMetadataRecord(
"my-group",
- 20
+ 20,
+ 12345L
)), result.records());
Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
@@ -372,7 +381,8 @@ public class TargetAssignmentBuilderTest {
public void testAssignmentSwapped() {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
"my-group",
- 20
+ 20,
+ 12345L
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
@@ -413,9 +423,10 @@ public class TargetAssignmentBuilderTest {
))
),
List.of(
- newConsumerGroupTargetAssignmentEpochRecord(
+ newConsumerGroupTargetAssignmentMetadataRecord(
"my-group",
- 20
+ 20,
+ 12345L
)
)
),
@@ -439,7 +450,8 @@ public class TargetAssignmentBuilderTest {
public void testNewMember() {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
"my-group",
- 20
+ 20,
+ 12345L
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
@@ -491,9 +503,10 @@ public class TargetAssignmentBuilderTest {
))
),
List.of(
- newConsumerGroupTargetAssignmentEpochRecord(
+ newConsumerGroupTargetAssignmentMetadataRecord(
"my-group",
- 20
+ 20,
+ 12345L
)
)
),
@@ -521,7 +534,8 @@ public class TargetAssignmentBuilderTest {
public void testUpdateMember() {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
"my-group",
- 20
+ 20,
+ 12345L
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
@@ -582,9 +596,10 @@ public class TargetAssignmentBuilderTest {
))
),
List.of(
- newConsumerGroupTargetAssignmentEpochRecord(
+ newConsumerGroupTargetAssignmentMetadataRecord(
"my-group",
- 20
+ 20,
+ 12345L
)
)
),
@@ -612,7 +627,8 @@ public class TargetAssignmentBuilderTest {
public void testPartialAssignmentUpdate() {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
"my-group",
- 20
+ 20,
+ 12345L
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
@@ -664,9 +680,10 @@ public class TargetAssignmentBuilderTest {
))
),
List.of(
- newConsumerGroupTargetAssignmentEpochRecord(
+ newConsumerGroupTargetAssignmentMetadataRecord(
"my-group",
- 20
+ 20,
+ 12345L
)
)
),
@@ -694,7 +711,8 @@ public class TargetAssignmentBuilderTest {
public void testDeleteMember() {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
"my-group",
- 20
+ 20,
+ 12345L
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
@@ -742,9 +760,10 @@ public class TargetAssignmentBuilderTest {
))
),
List.of(
- newConsumerGroupTargetAssignmentEpochRecord(
+ newConsumerGroupTargetAssignmentMetadataRecord(
"my-group",
- 20
+ 20,
+ 12345L
)
)
),
@@ -768,7 +787,8 @@ public class TargetAssignmentBuilderTest {
public void testReplaceStaticMember() {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
"my-group",
- 20
+ 20,
+ 12345L
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
@@ -818,9 +838,10 @@ public class TargetAssignmentBuilderTest {
mkTopicAssignment(fooTopicId, 5, 6),
mkTopicAssignment(barTopicId, 5, 6)
)),
- newConsumerGroupTargetAssignmentEpochRecord(
+ newConsumerGroupTargetAssignmentMetadataRecord(
"my-group",
- 20
+ 20,
+ 12345L
)
),
result.records()
@@ -848,7 +869,8 @@ public class TargetAssignmentBuilderTest {
public void testRegularExpressions() {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
"my-group",
- 20
+ 20,
+ 12345L
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
@@ -898,9 +920,10 @@ public class TargetAssignmentBuilderTest {
))
),
List.of(
- newConsumerGroupTargetAssignmentEpochRecord(
+ newConsumerGroupTargetAssignmentMetadataRecord(
"my-group",
- 20
+ 20,
+ 12345L
)
)
),
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
index e086f283aac..40d3300b093 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
@@ -31,6 +31,7 @@ public class ConsumerGroupBuilder {
private final String groupId;
private final int groupEpoch;
private int assignmentEpoch;
+ private long assignmentTimestamp;
private final Map<String, ConsumerGroupMember> members = new HashMap<>();
private final Map<String, Assignment> assignments = new HashMap<>();
private long metadataHash = 0L;
@@ -40,6 +41,7 @@ public class ConsumerGroupBuilder {
this.groupId = groupId;
this.groupEpoch = groupEpoch;
this.assignmentEpoch = 0;
+ this.assignmentTimestamp = 0L;
}
public ConsumerGroupBuilder withMember(ConsumerGroupMember member) {
@@ -70,6 +72,11 @@ public class ConsumerGroupBuilder {
return this;
}
+ public ConsumerGroupBuilder withAssignmentTimestamp(long
assignmentTimestamp) {
+ this.assignmentTimestamp = assignmentTimestamp;
+ return this;
+ }
+
public List<CoordinatorRecord> build() {
List<CoordinatorRecord> records = new ArrayList<>();
@@ -92,7 +99,7 @@ public class ConsumerGroupBuilder {
);
// Add target assignment epoch.
-
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
assignmentEpoch));
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
assignmentEpoch, assignmentTimestamp));
// Add current assignment records for members.
members.forEach((memberId, member) ->
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index c8ada8fc5f4..a45eaa62991 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -475,7 +475,7 @@ public class ConsumerGroupTest {
assertEquals(MemberState.STABLE, member2.state());
assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING,
consumerGroup.state());
- consumerGroup.setTargetAssignmentEpoch(2);
+ consumerGroup.setTargetAssignmentMetadata(2, 12345L);
assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING,
consumerGroup.state());
@@ -1000,7 +1000,7 @@ public class ConsumerGroupTest {
assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING,
consumerGroup.state());
assertThrows(GroupNotEmptyException.class,
consumerGroup::validateDeleteGroup);
- consumerGroup.setTargetAssignmentEpoch(1);
+ consumerGroup.setTargetAssignmentMetadata(1, 12345L);
assertEquals(ConsumerGroup.ConsumerGroupState.STABLE,
consumerGroup.state());
assertThrows(GroupNotEmptyException.class,
consumerGroup::validateDeleteGroup);
@@ -1329,7 +1329,7 @@ public class ConsumerGroupTest {
groupId
);
expectedConsumerGroup.setGroupEpoch(10);
- expectedConsumerGroup.setTargetAssignmentEpoch(10);
+ expectedConsumerGroup.setTargetAssignmentMetadata(10, 0L);
expectedConsumerGroup.updateTargetAssignment(memberId, new
Assignment(mkAssignment(
mkTopicAssignment(fooTopicId, 0)
)));
@@ -1366,6 +1366,8 @@ public class ConsumerGroupTest {
assertEquals(expectedConsumerGroup.groupId(), consumerGroup.groupId());
assertEquals(expectedConsumerGroup.groupEpoch(),
consumerGroup.groupEpoch());
+ assertEquals(expectedConsumerGroup.assignmentEpoch(),
consumerGroup.assignmentEpoch());
+ assertEquals(expectedConsumerGroup.assignmentTimestamp(),
consumerGroup.assignmentTimestamp());
assertEquals(expectedConsumerGroup.state(), consumerGroup.state());
assertEquals(expectedConsumerGroup.preferredServerAssignor(),
consumerGroup.preferredServerAssignor());
assertEquals(expectedConsumerGroup.members(), consumerGroup.members());
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
index 67cfb756612..14d6442057c 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
@@ -31,6 +31,7 @@ public class ShareGroupBuilder {
private final String groupId;
private final int groupEpoch;
private int assignmentEpoch;
+ private long assignmentTimestamp;
private final Map<String, ShareGroupMember> members = new HashMap<>();
private final Map<String, Assignment> assignments = new HashMap<>();
private long metadataHash = 0L;
@@ -39,6 +40,7 @@ public class ShareGroupBuilder {
this.groupId = groupId;
this.groupEpoch = groupEpoch;
this.assignmentEpoch = 0;
+ this.assignmentTimestamp = 0L;
}
public ShareGroupBuilder withMember(ShareGroupMember member) {
@@ -61,6 +63,11 @@ public class ShareGroupBuilder {
return this;
}
+ public ShareGroupBuilder withAssignmentTimestamp(long assignmentTimestamp)
{
+ this.assignmentTimestamp = assignmentTimestamp;
+ return this;
+ }
+
public List<CoordinatorRecord> build() {
List<CoordinatorRecord> records = new ArrayList<>();
@@ -78,7 +85,7 @@ public class ShareGroupBuilder {
);
// Add target assignment epoch.
-
records.add(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(groupId,
assignmentEpoch));
+
records.add(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
assignmentEpoch, assignmentTimestamp));
// Add current assignment records for members.
members.forEach((memberId, member) ->
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
index 6a6f963ce70..60daaa5d1bc 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
@@ -406,7 +406,7 @@ public class ShareGroupTest {
assertEquals(ShareGroupState.STABLE, shareGroup.state());
assertThrows(GroupNotEmptyException.class,
shareGroup::validateDeleteGroup);
- shareGroup.setTargetAssignmentEpoch(1);
+ shareGroup.setTargetAssignmentMetadata(1, 12345L);
assertEquals(ShareGroupState.STABLE, shareGroup.state());
assertThrows(GroupNotEmptyException.class,
shareGroup::validateDeleteGroup);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
index 4a5bd69c25b..bf5b1403e87 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
@@ -381,12 +381,13 @@ class StreamsCoordinatorRecordHelpersTest {
.setGroupId(GROUP_ID),
new ApiMessageAndVersion(
new StreamsGroupTargetAssignmentMetadataValue()
- .setAssignmentEpoch(42),
+ .setAssignmentEpoch(42)
+ .setAssignmentTimestamp(12345L),
(short) 0
)
);
- assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(GROUP_ID,
42));
+ assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(GROUP_ID,
42, 12345L));
}
@Test
@@ -737,7 +738,7 @@ class StreamsCoordinatorRecordHelpersTest {
@Test
public void testNewStreamsGroupTargetAssignmentEpochRecordNullGroupId() {
NullPointerException exception =
assertThrows(NullPointerException.class, () ->
-
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(null,
1));
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(null,
1, 12345L));
assertEquals("groupId should not be null here",
exception.getMessage());
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
index aa2f75ec846..5d3688693be 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
@@ -31,6 +31,7 @@ public class StreamsGroupBuilder {
private final String groupId;
private final int groupEpoch;
private int targetAssignmentEpoch;
+ private long targetAssignmentTimestamp;
private StreamsTopology topology;
private final Map<String, StreamsGroupMember> members = new HashMap<>();
private final Map<String, TasksTuple> targetAssignments = new HashMap<>();
@@ -42,6 +43,7 @@ public class StreamsGroupBuilder {
this.groupId = groupId;
this.groupEpoch = groupEpoch;
this.targetAssignmentEpoch = 0;
+ this.targetAssignmentTimestamp = 0L;
this.topology = null;
}
@@ -75,6 +77,11 @@ public class StreamsGroupBuilder {
return this;
}
+ public StreamsGroupBuilder withTargetAssignmentTimestamp(long
targetAssignmentTimestamp) {
+ this.targetAssignmentTimestamp = targetAssignmentTimestamp;
+ return this;
+ }
+
public StreamsGroupBuilder withLastAssignmentConfigs(Map<String, String>
lastAssignmentConfigs) {
this.lastAssignmentConfigs.putAll(lastAssignmentConfigs);
return this;
@@ -110,8 +117,8 @@ public class StreamsGroupBuilder {
}
// Add target assignment epoch.
-
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
- targetAssignmentEpoch));
+
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
+ targetAssignmentEpoch, targetAssignmentTimestamp));
// Add current assignment records for members.
members.forEach((memberId, member) ->
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index 3a144d00dda..d52c5e631f1 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -518,7 +518,7 @@ public class StreamsGroupTest {
assertEquals(MemberState.STABLE, member2.state());
assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING,
streamsGroup.state());
- streamsGroup.setTargetAssignmentEpoch(2);
+ streamsGroup.setTargetAssignmentMetadata(2, 12345L);
assertEquals(StreamsGroup.StreamsGroupState.RECONCILING,
streamsGroup.state());
@@ -802,7 +802,7 @@ public class StreamsGroupTest {
group.setTopology(new StreamsTopology(1, Map.of()));
group.setValidatedTopologyEpoch(1);
group.setConfiguredTopology(new ConfiguredTopology(1, 0,
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
- group.setTargetAssignmentEpoch(1);
+ group.setTargetAssignmentMetadata(1, 12345L);
group.updateMember(new StreamsGroupMember.Builder("member1")
.setMemberEpoch(1)
.build());
@@ -877,7 +877,7 @@ public class StreamsGroupTest {
assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING,
streamsGroup.state());
assertThrows(GroupNotEmptyException.class,
streamsGroup::validateDeleteGroup);
- streamsGroup.setTargetAssignmentEpoch(1);
+ streamsGroup.setTargetAssignmentMetadata(1, 12345L);
assertEquals(StreamsGroup.StreamsGroupState.STABLE,
streamsGroup.state());
assertThrows(GroupNotEmptyException.class,
streamsGroup::validateDeleteGroup);
@@ -914,7 +914,7 @@ public class StreamsGroupTest {
group.setTopology(new StreamsTopology(1, Map.of()));
group.setConfiguredTopology(new ConfiguredTopology(1, 0,
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
group.setValidatedTopologyEpoch(1);
- group.setTargetAssignmentEpoch(1);
+ group.setTargetAssignmentMetadata(1, 12345L);
group.updateMember(new StreamsGroupMember.Builder("member1")
.setMemberEpoch(1)
.setPreviousMemberEpoch(0)
@@ -1139,7 +1139,7 @@ public class StreamsGroupTest {
group.setGroupEpoch(2);
group.setTopology(new StreamsTopology(2, subtopologies));
- group.setTargetAssignmentEpoch(2);
+ group.setTargetAssignmentMetadata(2, 12345L);
group.updateMember(new StreamsGroupMember.Builder("member1")
.setMemberEpoch(2)
.setPreviousMemberEpoch(1)
@@ -1198,7 +1198,7 @@ public class StreamsGroupTest {
group.setGroupEpoch(3);
group.setTopology(new StreamsTopology(2, subtopologies));
group.setConfiguredTopology(new ConfiguredTopology(3, 0,
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
- group.setTargetAssignmentEpoch(3);
+ group.setTargetAssignmentMetadata(3, 12345L);
snapshotRegistry.idempotentCreateSnapshot(1);
StreamsGroupDescribeResponseData.DescribedGroup describedGroup =
group.asDescribedGroup(1);
@@ -1225,7 +1225,7 @@ public class StreamsGroupTest {
group.setGroupEpoch(4);
group.setTopology(new StreamsTopology(4, subtopologies));
// No ConfiguredTopology set, so should fallback to StreamsTopology
- group.setTargetAssignmentEpoch(4);
+ group.setTargetAssignmentMetadata(4, 12345L);
snapshotRegistry.idempotentCreateSnapshot(1);
StreamsGroupDescribeResponseData.DescribedGroup describedGroup =
group.asDescribedGroup(1);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
index 7d8ba55e24e..dcce6b02461 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
@@ -46,7 +47,7 @@ import java.util.TreeMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static
org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
-import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord;
+import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord;
import static
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.createAssignmentMemberSpec;
import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
@@ -71,12 +72,13 @@ public class TargetAssignmentBuilderTest {
when(topology.isReady()).thenReturn(false);
TargetAssignmentBuilder builder = new TargetAssignmentBuilder(groupId,
groupEpoch, assignor, assignmentConfigs)
+ .withTime(new MockTime(0, 12345L, 12345L))
.withTopology(topology);
TargetAssignmentBuilder.TargetAssignmentResult result =
builder.build();
List<CoordinatorRecord> expectedRecords = List.of(
-
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
groupEpoch)
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
groupEpoch, 12345L)
);
assertEquals(expectedRecords, result.records());
@@ -124,13 +126,15 @@ public class TargetAssignmentBuilderTest {
public void testEmpty() {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
"my-group",
- 20
+ 20,
+ 12345L
);
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
result = context.build();
- assertEquals(List.of(newStreamsGroupTargetAssignmentEpochRecord(
+ assertEquals(List.of(newStreamsGroupTargetAssignmentMetadataRecord(
"my-group",
- 20
+ 20,
+ 12345L
)), result.records());
assertEquals(Map.of(), result.targetAssignment());
}
@@ -141,7 +145,8 @@ public class TargetAssignmentBuilderTest {
public void testAssignmentHasNotChanged(TaskRole taskRole) {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
"my-group",
- 20
+ 20,
+ 12345L
);
String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6);
@@ -169,9 +174,10 @@ public class TargetAssignmentBuilderTest {
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
result = context.build();
- assertEquals(List.of(newStreamsGroupTargetAssignmentEpochRecord(
+ assertEquals(List.of(newStreamsGroupTargetAssignmentMetadataRecord(
"my-group",
- 20
+ 20,
+ 12345L
)), result.records());
Map<String, TasksTuple> expectedAssignment = new HashMap<>();
@@ -193,7 +199,8 @@ public class TargetAssignmentBuilderTest {
public void testAssignmentSwapped(TaskRole taskRole) {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
"my-group",
- 20
+ 20,
+ 12345L
);
String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6);
@@ -234,9 +241,10 @@ public class TargetAssignmentBuilderTest {
))
)), result.records().subList(0, 2));
- assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+ assertEquals(newStreamsGroupTargetAssignmentMetadataRecord(
"my-group",
- 20
+ 20,
+ 12345L
), result.records().get(2));
Map<String, TasksTuple> expectedAssignment = new HashMap<>();
@@ -258,7 +266,8 @@ public class TargetAssignmentBuilderTest {
public void testNewMember(TaskRole taskRole) {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
"my-group",
- 20
+ 20,
+ 12345L
);
String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6);
@@ -310,9 +319,10 @@ public class TargetAssignmentBuilderTest {
))
)), result.records().subList(0, 3));
- assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+ assertEquals(newStreamsGroupTargetAssignmentMetadataRecord(
"my-group",
- 20
+ 20,
+ 12345L
), result.records().get(3));
Map<String, TasksTuple> expectedAssignment = new HashMap<>();
@@ -338,7 +348,8 @@ public class TargetAssignmentBuilderTest {
public void testUpdateMember(TaskRole taskRole) {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
"my-group",
- 20
+ 20,
+ 12345L
);
String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6);
@@ -398,9 +409,10 @@ public class TargetAssignmentBuilderTest {
))
)), result.records().subList(0, 3));
- assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+ assertEquals(newStreamsGroupTargetAssignmentMetadataRecord(
"my-group",
- 20
+ 20,
+ 12345L
), result.records().get(3));
Map<String, TasksTuple> expectedAssignment = new HashMap<>();
@@ -426,7 +438,8 @@ public class TargetAssignmentBuilderTest {
public void testPartialAssignmentUpdate(TaskRole taskRole) {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
"my-group",
- 20
+ 20,
+ 12345L
);
String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6);
@@ -478,9 +491,10 @@ public class TargetAssignmentBuilderTest {
))
)), result.records().subList(0, 2));
- assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+ assertEquals(newStreamsGroupTargetAssignmentMetadataRecord(
"my-group",
- 20
+ 20,
+ 12345L
), result.records().get(2));
Map<String, TasksTuple> expectedAssignment = new HashMap<>();
@@ -506,7 +520,8 @@ public class TargetAssignmentBuilderTest {
public void testDeleteMember(TaskRole taskRole) {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
"my-group",
- 20
+ 20,
+ 12345L
);
String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6);
@@ -554,9 +569,10 @@ public class TargetAssignmentBuilderTest {
))
)), result.records().subList(0, 2));
- assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+ assertEquals(newStreamsGroupTargetAssignmentMetadataRecord(
"my-group",
- 20
+ 20,
+ 12345L
), result.records().get(2));
Map<String, TasksTuple> expectedAssignment = new HashMap<>();
@@ -578,7 +594,8 @@ public class TargetAssignmentBuilderTest {
public void testReplaceStaticMember(TaskRole taskRole) {
TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
"my-group",
- 20
+ 20,
+ 12345L
);
String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6);
@@ -632,9 +649,10 @@ public class TargetAssignmentBuilderTest {
))
)), result.records().subList(0, 1));
- assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+ assertEquals(newStreamsGroupTargetAssignmentMetadataRecord(
"my-group",
- 20
+ 20,
+ 12345L
), result.records().get(1));
Map<String, TasksTuple> expectedAssignment = new HashMap<>();
@@ -659,6 +677,7 @@ public class TargetAssignmentBuilderTest {
private final String groupId;
private final int groupEpoch;
+ private final long assignmentTimestamp;
private final TaskAssignor assignor = mock(TaskAssignor.class);
private final SortedMap<String, ConfiguredSubtopology> subtopologies =
new TreeMap<>();
private final ConfiguredTopology topology = new ConfiguredTopology(0,
0, Optional.of(subtopologies), new HashMap<>(),
@@ -673,10 +692,12 @@ public class TargetAssignmentBuilderTest {
public TargetAssignmentBuilderTestContext(
String groupId,
- int groupEpoch
+ int groupEpoch,
+ long assignmentTimestamp
) {
this.groupId = groupId;
this.groupEpoch = groupEpoch;
+ this.assignmentTimestamp = assignmentTimestamp;
}
public void addGroupMember(
@@ -834,7 +855,9 @@ public class TargetAssignmentBuilderTest {
});
// Execute the builder.
-
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
result = builder.build();
+
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
result = builder
+ .withTime(new MockTime(0, assignmentTimestamp,
assignmentTimestamp))
+ .build();
// Verify that the assignor was called once with the expected
// assignment spec.