Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-11 Thread via GitHub


dajac merged PR #14467:
URL: https://github.com/apache/kafka/pull/14467


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-11 Thread via GitHub


jeffkbkim commented on PR #14467:
URL: https://github.com/apache/kafka/pull/14467#issuecomment-1758248999

   created https://issues.apache.org/jira/browse/KAFKA-15587





Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-11 Thread via GitHub


dajac commented on PR #14467:
URL: https://github.com/apache/kafka/pull/14467#issuecomment-1758200615

   @jeffkbkim Could you please file a JIRA for the max record size case? We 
will need to find a solution for this at some point.





Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-11 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1355212713


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false 
otherwise.
+ */
+public boolean cleanupExpiredOffsets(String groupId, List records) 
{
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return true;
+}
+
+// We expect the group to exist.
+Group group = groupMetadataManager.group(groupId);
+Set expiredPartitions = new HashSet<>();
+long currentTimestampMs = time.milliseconds();
+Optional offsetExpirationCondition = 
group.offsetExpirationCondition();
+
+if (!offsetExpirationCondition.isPresent()) {
+return false;
+}
+
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+OffsetExpirationCondition condition = offsetExpirationCondition.get();
+
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!group.isSubscribedToTopic(topic)) {
+partitions.forEach((partition, offsetAndMetadata) -> {
+if (condition.isOffsetExpired(offsetAndMetadata, 
currentTimestampMs, config.offsetsRetentionMs)) {
+
expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, 
records).toString());
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+
+if (!expiredPartitions.isEmpty()) {
+log.info("[GroupId {}] Expiring offsets of partitions 
(hasAllOffsetsExpired={}): {}",

Review Comment:
   1. Yes, we don't delete the group until all offsets are gone. If this 
iteration was successful we'll have removed the offsets that we generated 
tombstone records for.
   2. I think the current structure is fine
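
To illustrate point 1, here is a minimal sketch of two consecutive expiration passes. It is not the coordinator's code: `ExpirationPassSketch`, the plain `HashMap` state, and the string tombstones are hypothetical stand-ins, and `replay` only models the removal that happens once the tombstone records have been applied. The `>=` comparison is an assumption as well.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExpirationPassSketch {

    /**
     * Simplified model of one cleanup pass (assumption: one commit timestamp per partition).
     * Adds the name of every expired partition to tombstones and returns true only
     * if no unexpired offset is left.
     */
    public static boolean cleanupExpiredOffsets(
        Map<String, Long> commitTimestampsMs,
        long currentTimestampMs,
        long offsetsRetentionMs,
        List<String> tombstones
    ) {
        boolean allExpired = true;
        for (Map.Entry<String, Long> entry : commitTimestampsMs.entrySet()) {
            if (currentTimestampMs - entry.getValue() >= offsetsRetentionMs) {
                tombstones.add(entry.getKey());
            } else {
                allExpired = false;
            }
        }
        return allExpired;
    }

    /** Models applying the tombstones: the expired offsets leave the in-memory state. */
    public static void replay(Map<String, Long> commitTimestampsMs, List<String> tombstones) {
        for (String partition : tombstones) {
            commitTimestampsMs.remove(partition);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("foo-0", 0L);
        offsets.put("foo-1", 800L);

        // First pass at t=1000 with a 500 ms retention: only foo-0 has expired.
        List<String> firstPass = new ArrayList<>();
        boolean deletable = cleanupExpiredOffsets(offsets, 1000L, 500L, firstPass);
        replay(offsets, firstPass);
        System.out.println(firstPass + " deletable=" + deletable);

        // Second pass at t=1300: foo-0 is no longer visited and foo-1 has expired too.
        List<String> secondPass = new ArrayList<>();
        deletable = cleanupExpiredOffsets(offsets, 1300L, 500L, secondPass);
        System.out.println(secondPass + " deletable=" + deletable);
    }
}
```

In this model the group only becomes eligible for deletion on the second pass, once the last remaining offset has expired.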






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353833261


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +900,47 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEmpty() {
+return isInState(EMPTY);
+}
+
+/**
+ * Return the offset expiration condition to be used for this group. This 
is based on several factors
+ * such as the group state, the protocol type, and the GroupMetadata 
record version.
+ *
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty of no 
such condition exists.
+ */
+@Override
+public Optional offsetExpirationCondition() {
+if (protocolType.isPresent()) {
+if (isInState(EMPTY)) {
+// No consumer exists in the group =>
+// - If current state timestamp exists and retention period 
has passed since group became Empty,
+//   expire all offsets with no pending offset commit;
+// - If there is no current state timestamp (old group 
metadata schema) and retention period has passed
+//   since the last commit timestamp, expire the offset
+return Optional.of(new OffsetExpirationConditionImpl(
+offsetAndMetadata -> 
currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs))
+);
+} else if (usesConsumerGroupProtocol() && 
subscribedTopics.isPresent() && isInState(STABLE)) {
+// Consumers exist in the group and group is Stable =>
+// - If the group is aware of the subscribed topics and 
retention period had passed since the
+//   last commit timestamp, expire the offset. offset with 
pending offset commit are not
+//   expired
+return Optional.of(new 
OffsetExpirationConditionImpl(offsetAndMetadata -> 
offsetAndMetadata.commitTimestampMs));
+}
+} else {
+// protocolType is None => standalone (simple) consumer, that uses 
Kafka for offset storage only
+// expire offsets where retention period has passed since their 
last commit

Review Comment:
   nit: periods and capitalization
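
For readers following the quoted branches, the choice of base timestamp for an Empty group can be sketched as below. `BaseTimestampSketch`, its method name, and the use of `OptionalLong` are hypothetical; only the rule itself (use the current state timestamp when it exists, otherwise fall back to the commit timestamp) comes from the quoted diff.

```java
import java.util.OptionalLong;

public class BaseTimestampSketch {

    /**
     * Base timestamp for an Empty group, per the quoted comment: the time the group
     * became Empty if it was recorded, otherwise the offset's commit timestamp
     * (old group metadata schema).
     */
    public static long baseTimestampForEmptyGroup(OptionalLong currentStateTimestampMs, long commitTimestampMs) {
        return currentStateTimestampMs.orElse(commitTimestampMs);
    }

    public static void main(String[] args) {
        // State timestamp recorded: retention is counted from when the group became Empty.
        System.out.println(baseTimestampForEmptyGroup(OptionalLong.of(5_000L), 1_000L));
        // No state timestamp: retention is counted from the last commit.
        System.out.println(baseTimestampForEmptyGroup(OptionalLong.empty(), 1_000L));
    }
}
```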






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353832984


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +900,47 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEmpty() {
+return isInState(EMPTY);
+}
+
+/**
+ * Return the offset expiration condition to be used for this group. This 
is based on several factors
+ * such as the group state, the protocol type, and the GroupMetadata 
record version.
+ *
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty of no 
such condition exists.
+ */
+@Override
+public Optional offsetExpirationCondition() {
+if (protocolType.isPresent()) {
+if (isInState(EMPTY)) {
+// No consumer exists in the group =>
+// - If current state timestamp exists and retention period 
has passed since group became Empty,
+//   expire all offsets with no pending offset commit;
+// - If there is no current state timestamp (old group 
metadata schema) and retention period has passed
+//   since the last commit timestamp, expire the offset
+return Optional.of(new OffsetExpirationConditionImpl(
+offsetAndMetadata -> 
currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs))
+);
+} else if (usesConsumerGroupProtocol() && 
subscribedTopics.isPresent() && isInState(STABLE)) {
+// Consumers exist in the group and group is Stable =>
+// - If the group is aware of the subscribed topics and 
retention period had passed since the
+//   last commit timestamp, expire the offset. offset with 
pending offset commit are not

Review Comment:
   nit: Offsets* with pending o...are not expired.* 






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353832660


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +900,47 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEmpty() {
+return isInState(EMPTY);
+}
+
+/**
+ * Return the offset expiration condition to be used for this group. This 
is based on several factors
+ * such as the group state, the protocol type, and the GroupMetadata 
record version.
+ *
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty of no 
such condition exists.
+ */
+@Override
+public Optional offsetExpirationCondition() {
+if (protocolType.isPresent()) {
+if (isInState(EMPTY)) {
+// No consumer exists in the group =>
+// - If current state timestamp exists and retention period 
has passed since group became Empty,
+//   expire all offsets with no pending offset commit;
+// - If there is no current state timestamp (old group 
metadata schema) and retention period has passed
+//   since the last commit timestamp, expire the offset
+return Optional.of(new OffsetExpirationConditionImpl(
+offsetAndMetadata -> 
currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs))
+);
+} else if (usesConsumerGroupProtocol() && 
subscribedTopics.isPresent() && isInState(STABLE)) {
+// Consumers exist in the group and group is Stable =>
+// - If the group is aware of the subscribed topics and 
retention period had passed since the

Review Comment:
   nit: has*






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353832416


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +900,47 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEmpty() {
+return isInState(EMPTY);
+}
+
+/**
+ * Return the offset expiration condition to be used for this group. This 
is based on several factors
+ * such as the group state, the protocol type, and the GroupMetadata 
record version.
+ *
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty of no 
such condition exists.
+ */
+@Override
+public Optional offsetExpirationCondition() {
+if (protocolType.isPresent()) {
+if (isInState(EMPTY)) {
+// No consumer exists in the group =>

Review Comment:
   nit: "No consumers exist in the group". Also, should we say "members" here
to be consistent with the terminology used in the rest of the code?






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353831509


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +900,47 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEmpty() {
+return isInState(EMPTY);
+}
+
+/**
+ * Return the offset expiration condition to be used for this group. This 
is based on several factors
+ * such as the group state, the protocol type, and the GroupMetadata 
record version.
+ *
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty of no 
such condition exists.

Review Comment:
   nit: if*






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353831027


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false 
otherwise.
+ */
+public boolean cleanupExpiredOffsets(String groupId, List records) 
{
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return true;
+}
+
+// We expect the group to exist.
+Group group = groupMetadataManager.group(groupId);
+Set expiredPartitions = new HashSet<>();
+long currentTimestampMs = time.milliseconds();
+Optional offsetExpirationCondition = 
group.offsetExpirationCondition();
+
+if (!offsetExpirationCondition.isPresent()) {
+return false;
+}
+
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+OffsetExpirationCondition condition = offsetExpirationCondition.get();
+
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!group.isSubscribedToTopic(topic)) {
+partitions.forEach((partition, offsetAndMetadata) -> {
+if (condition.isOffsetExpired(offsetAndMetadata, 
currentTimestampMs, config.offsetsRetentionMs)) {
+
expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, 
records).toString());
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+
+if (!expiredPartitions.isEmpty()) {
+log.info("[GroupId {}] Expiring offsets of partitions 
(hasAllOffsetsExpired={}): {}",

Review Comment:
   The placement of hasAllOffsetsExpired seems a bit off in the log message.
It would look like this, right? `[GroupId 12345] Expiring offsets of partitions 
(hasAllOffsetsExpired=false): partition1, partition2, partition3`. Can we move 
it to the end of the list or to the beginning of the line, if you think that 
makes sense?
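
A small sketch of the two placements, to make the comparison concrete. `LogPlacementSketch` and both method names are hypothetical, and `String.format` stands in for the logger call in the quoted diff; the second format is only one possible alternative.

```java
public class LogPlacementSketch {

    /** Placement in the quoted diff: the flag sits between the label and the partition list. */
    public static String flagInMiddle(String groupId, boolean hasAllOffsetsExpired, String partitions) {
        return String.format("[GroupId %s] Expiring offsets of partitions (hasAllOffsetsExpired=%b): %s",
            groupId, hasAllOffsetsExpired, partitions);
    }

    /** One possible alternative: the flag follows the partition list. */
    public static String flagAtEnd(String groupId, boolean hasAllOffsetsExpired, String partitions) {
        return String.format("[GroupId %s] Expiring offsets of partitions: %s (hasAllOffsetsExpired=%b)",
            groupId, partitions, hasAllOffsetsExpired);
    }

    public static void main(String[] args) {
        String partitions = "partition1, partition2, partition3";
        System.out.println(flagInMiddle("12345", false, partitions));
        System.out.println(flagAtEnd("12345", false, partitions));
    }
}
```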






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353831281


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -635,6 +639,21 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupEpochTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEmpty() {
+return state() == ConsumerGroupState.EMPTY;
+}
+
+/**
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty of no 
such condition exists.

Review Comment:
   nit: if*
   






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353830271


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false 
otherwise.
+ */
+public boolean cleanupExpiredOffsets(String groupId, List records) 
{
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return true;
+}
+
+// We expect the group to exist.
+Group group = groupMetadataManager.group(groupId);
+Set expiredPartitions = new HashSet<>();
+long currentTimestampMs = time.milliseconds();
+Optional offsetExpirationCondition = 
group.offsetExpirationCondition();
+
+if (!offsetExpirationCondition.isPresent()) {
+return false;
+}
+
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+OffsetExpirationCondition condition = offsetExpirationCondition.get();
+
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!group.isSubscribedToTopic(topic)) {
+partitions.forEach((partition, offsetAndMetadata) -> {
+if (condition.isOffsetExpired(offsetAndMetadata, 
currentTimestampMs, config.offsetsRetentionMs)) {
+
expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, 
records).toString());
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+
+if (!expiredPartitions.isEmpty()) {
+log.info("[GroupId {}] Expiring offsets of partitions 
(hasAllOffsetsExpired={}): {}",

Review Comment:
   Just for my understanding, and correct me if I'm wrong: unless all the 
offsets have expired, we don't delete the group, right? When 
hasAllOffsetsExpired is false and expiredPartitions is non-empty, the group 
won't be deleted but the tombstone records will still be appended? And the next 
time we iterate through the partitions, the ones with a tombstone record aren't 
included, right?






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353823509


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false 
otherwise.
+ */
+public boolean cleanupExpiredOffsets(String groupId, List records) 
{
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return true;
+}
+
+// We expect the group to exist.
+Group group = groupMetadataManager.group(groupId);
+Set expiredPartitions = new HashSet<>();
+long currentTimestampMs = time.milliseconds();
+Optional offsetExpirationCondition = 
group.offsetExpirationCondition();
+
+if (!offsetExpirationCondition.isPresent()) {
+return false;
+}
+
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);

Review Comment:
   nit: haveAllOffsetsExpired






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353679164


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false 
otherwise.
+ */
+public boolean cleanupExpiredOffsets(String groupId, List records) 
{
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return true;
+}
+
+// We expect the group to exist.

Review Comment:
   yes
   
   ```
   groupMetadataManager.groupIds().forEach(groupId -> {
       if (offsetMetadataManager.cleanupExpiredOffsets(groupId, records)) {
           groupMetadataManager.maybeDeleteGroup(groupId, records);
       }
   });
   ```






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353056783


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false 
otherwise.
+ */
+public boolean cleanupExpiredOffsets(String groupId, List records) 
{
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return true;
+}
+
+// We expect the group to exist.

Review Comment:
   Is this method only called after the groupId is verified to exist?






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353055183


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.

Review Comment:
   nit: the group






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353053689


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImpl.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import java.util.function.Function;
+
+public class OffsetExpirationConditionImpl implements 
OffsetExpirationCondition {
+
+/**
+ * Given an offset and metadata, obtain the base timestamp that should be 
used
+ * as the start of the offsets retention period.
+ */
+private final Function baseTimestamp;
+
+public OffsetExpirationConditionImpl(Function 
baseTimestamp) {
+this.baseTimestamp = baseTimestamp;
+}
+
+/**
+ * Determine whether an offset is expired. Older versions have an expire 
timestamp per partition. If this
+ * exists, compare against the current timestamp. Otherwise, use the base 
timestamp (either commit timestamp
+ * or current state timestamp if group is empty for generic groups) and 
check whether the offset has
+ * exceeded the offset retention.
+ *
+ * @param offset  The offset and metadata.
+ * @param currentTimestampMs  The current timestamp.
+ * @param offsetsRetentionMs  The offsets retention in milliseconds.
+ *
+ * @return Whether the given offset is expired or not.
+ */
+@Override
+public boolean isOffsetExpired(OffsetAndMetadata offset, long 
currentTimestampMs, long offsetsRetentionMs) {
+if (offset.expireTimestampMs.isPresent()) {
+// Older versions with explicit expire_timestamp field => old 
expiration semantics is used
+return currentTimestampMs >= offset.expireTimestampMs.getAsLong();
+} else {
+// Current version with no per partition retention

Review Comment:
   nit: missing period
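
The rule described in the quoted javadoc can be sketched in a self-contained form. This is not the class under review: `OffsetExpirationSketch` and its nested `Offset` type are hypothetical, trimmed-down stand-ins, and because the quoted hunk ends before the `else` body, the retention comparison (including the choice of `>=`) is an assumption based on the javadoc text.

```java
import java.util.OptionalLong;
import java.util.function.ToLongFunction;

public class OffsetExpirationSketch {

    /** Hypothetical, trimmed-down stand-in for the offset and metadata in the quoted diff. */
    public static final class Offset {
        public final long commitTimestampMs;
        public final OptionalLong expireTimestampMs;

        public Offset(long commitTimestampMs, OptionalLong expireTimestampMs) {
            this.commitTimestampMs = commitTimestampMs;
            this.expireTimestampMs = expireTimestampMs;
        }
    }

    /**
     * An explicit expire timestamp wins when present (older record versions).
     * Otherwise the retention period is counted from the base timestamp
     * (assumed comparison, see the note above).
     */
    public static boolean isOffsetExpired(
        Offset offset,
        ToLongFunction<Offset> baseTimestamp,
        long currentTimestampMs,
        long offsetsRetentionMs
    ) {
        if (offset.expireTimestampMs.isPresent()) {
            return currentTimestampMs >= offset.expireTimestampMs.getAsLong();
        }
        return currentTimestampMs - baseTimestamp.applyAsLong(offset) >= offsetsRetentionMs;
    }

    public static void main(String[] args) {
        ToLongFunction<Offset> commitTime = o -> o.commitTimestampMs;

        // Old-style record: expires at t=100 regardless of the retention setting.
        Offset legacy = new Offset(0L, OptionalLong.of(100L));
        System.out.println(isOffsetExpired(legacy, commitTime, 100L, 1_000_000L));

        // Current-style record: committed at t=0, retention of 500 ms, checked at t=499.
        Offset current = new Offset(0L, OptionalLong.empty());
        System.out.println(isOffsetExpired(current, commitTime, 499L, 500L));
    }
}
```

Passing the base timestamp as a function keeps the expiry check independent of how each group type picks the start of the retention period.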






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353048258


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -417,6 +441,39 @@ public CoordinatorResult 
deleteOffsets(
 return offsetMetadataManager.deleteOffsets(request);
 }
 
+/**
+ * For each group, remove all expired offsets. If all offsets for the 
group is removed and the group is eligible

Review Comment:
   nit: offsets for the group are* removed






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353045452


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -91,6 +91,27 @@ public class GroupCoordinatorConfig {
  */
 public final int genericGroupMaxSessionTimeoutMs;
 
+/**
+ * Frequency at which to check for expired offsets.
+ */
+public final long offsetsRetentionCheckIntervalMs;
+
+/**
+ * For subscribed consumers, committed offset of a specific partition will 
be expired and discarded when
+ * 1) this retention period has elapsed after the consumer group loses 
all its consumers (i.e. becomes empty);
+ * 2) this retention period has elapsed since the last time an offset 
is committed for the partition AND
+ *the group is no longer subscribed to the corresponding topic.
+ *
+ * For standalone consumers (using manual assignment), offsets will be 
expired after this retention period has
+ * elapsed since the time of last commit.
+ *
+ * Note that when a group is deleted via the DeleteGroups request, its 
committed offsets will also be deleted immediately;
+ *
+ * Also, when a topic is deleted via the delete-topic request, upon 
propagated metadata update any group's
+ * committed offsets for that topic will also be deleted without extra 
retention period

Review Comment:
   nit: missing period



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -91,6 +91,27 @@ public class GroupCoordinatorConfig {
  */
 public final int genericGroupMaxSessionTimeoutMs;
 
+/**
+ * Frequency at which to check for expired offsets.
+ */
+public final long offsetsRetentionCheckIntervalMs;
+
+/**
+ * For subscribed consumers, committed offset of a specific partition will 
be expired and discarded when
+ * 1) this retention period has elapsed after the consumer group loses 
all its consumers (i.e. becomes empty);
+ * 2) this retention period has elapsed since the last time an offset 
is committed for the partition AND
+ *the group is no longer subscribed to the corresponding topic.
+ *
+ * For standalone consumers (using manual assignment), offsets will be 
expired after this retention period has
+ * elapsed since the time of last commit.
+ *
+ * Note that when a group is deleted via the DeleteGroups request, its 
committed offsets will also be deleted immediately;
+ *
+ * Also, when a topic is deleted via the delete-topic request, upon 
propagated metadata update any group's

Review Comment:
   nit: missing period



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353045293


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -117,4 +119,16 @@ void validateOffsetFetch(
  * @param records The list of records.
  */
 void createGroupTombstoneRecords(List records);
+
+/**
+ * @return Whether the group can be deleted or not.
+ */
+boolean isEmpty();

Review Comment:
   We should fix the javadoc. Good catch!



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353045205


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -91,6 +91,27 @@ public class GroupCoordinatorConfig {
  */
 public final int genericGroupMaxSessionTimeoutMs;
 
+/**
+ * Frequency at which to check for expired offsets.
+ */
+public final long offsetsRetentionCheckIntervalMs;
+
+/**
+ * For subscribed consumers, committed offset of a specific partition will 
be expired and discarded when
+ * 1) this retention period has elapsed after the consumer group loses 
all its consumers (i.e. becomes empty);
+ * 2) this retention period has elapsed since the last time an offset 
is committed for the partition AND
+ *the group is no longer subscribed to the corresponding topic.
+ *
+ * For standalone consumers (using manual assignment), offsets will be 
expired after this retention period has
+ * elapsed since the time of last commit.
+ *
+ * Note that when a group is deleted via the DeleteGroups request, its 
committed offsets will also be deleted immediately;
+ *
+ * Also, when a topic is deleted via the delete-topic request, upon 
propagated metadata update any group's

Review Comment:
   nit: missing period



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353044716


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -91,6 +91,27 @@ public class GroupCoordinatorConfig {
  */
 public final int genericGroupMaxSessionTimeoutMs;
 
+/**
+ * Frequency at which to check for expired offsets.
+ */
+public final long offsetsRetentionCheckIntervalMs;
+
+/**
+ * For subscribed consumers, committed offset of a specific partition will 
be expired and discarded when

Review Comment:
   nit: can we put a colon after "when" and capitalize the "t" in "this" for the 
bullet points?



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


rreddy-22 commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1353043290


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -117,4 +119,16 @@ void validateOffsetFetch(
  * @param records The list of records.
  */
 void createGroupTombstoneRecords(List records);
+
+/**
+ * @return Whether the group can be deleted or not.
+ */
+boolean isEmpty();

Review Comment:
   The return value seems more like a use case, right? Should we update the name 
of the method or the return statement, and add the "whether the group can be 
deleted or not" part as a use case in the javadoc?
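
One possible shape for the suggestion above, as a minimal sketch only. The interface name `GroupSketch` and the exact wording are assumptions for illustration, not the javadoc that was eventually merged: the `@return` tag describes what the method reports, and the deletion use case moves into the description.

```java
// Hypothetical stand-in for the Group interface; only isEmpty() is modeled.
interface GroupSketch {
    /**
     * Checks whether the group is empty. Callers can use this to decide
     * whether the group can be deleted.
     *
     * @return True if the group is empty, false otherwise.
     */
    boolean isEmpty();
}
```

This keeps the method name unchanged and only separates "what is returned" from "why a caller would ask".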



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1352815096


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +900,46 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEmpty() {
+return isInState(EMPTY);
+}
+
+/**
+ * Return the offset expiration condition to be used for this group. This 
is based on several factors
+ * such as the group state, the protocol type, and the GroupMetadata 
record version.
+ *
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty if no 
such condition exists.
+ */
+@Override
+public Optional<OffsetExpirationCondition> offsetExpirationCondition() {
+if (protocolType.isPresent()) {
+if (isInState(EMPTY)) {
+// No consumer exists in the group =>
+// - If current state timestamp exists and retention period 
has passed since group became Empty,
+//   expire all offsets with no pending offset commit;
+// - If there is no current state timestamp (old group 
metadata schema) and retention period has passed
+//   since the last commit timestamp, expire the offset
+return Optional.of(new OffsetExpirationConditionImpl(
+offsetAndMetadata -> 
currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs)));
+} else if (usesConsumerGroupProtocol() && 
subscribedTopics.isPresent() && isInState(STABLE)) {
+// Consumers exist in the group and group is Stable =>
+// - If the group is aware of the subscribed topics and 
retention period has passed since the
+//   last commit timestamp, expire the offset. Offsets with a 
pending offset commit are not
+//   expired.
+return Optional.of(new 
OffsetExpirationConditionImpl(offsetAndMetadata -> 
offsetAndMetadata.commitTimestampMs));

Review Comment:
   No, `commitTimestampMs` is a field, not a method.
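
To illustrate the point: a Java method reference can only name a method, so a public field has to be read through a lambda. The sketch below uses a hypothetical stand-in class, `OffsetAndMetadataStub`, instead of the real `OffsetAndMetadata`, and models only the one field discussed here.

```java
import java.util.function.Function;

// Hypothetical stand-in for OffsetAndMetadata; only the commit timestamp is modeled.
final class OffsetAndMetadataStub {
    public final long commitTimestampMs;

    OffsetAndMetadataStub(long commitTimestampMs) {
        this.commitTimestampMs = commitTimestampMs;
    }
}

final class FieldAccessSketch {
    // The field is read through a lambda. A method reference such as
    // OffsetAndMetadataStub::commitTimestampMs would not compile, because
    // the class has no method with that name.
    static final Function<OffsetAndMetadataStub, Long> BASE_TIMESTAMP =
        offsetAndMetadata -> offsetAndMetadata.commitTimestampMs;

    public static void main(String[] args) {
        System.out.println(BASE_TIMESTAMP.apply(new OffsetAndMetadataStub(1_000L)));
    }
}
```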



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-10 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1351957160


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -106,7 +107,8 @@ void validateOffsetFetch(
 /**
  * Returns true if the group is actively subscribed to the topic.
  *
- * @param topic The topic name.
+ * @param topicThe topic name.

Review Comment:
   nit: We can remove a few tabs here.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -349,9 +351,11 @@ public Set subscribedTopicNames() {
 /**
  * Returns true if the consumer group is actively subscribed to the topic.
  *
- * @param topic The topic name.
- * @return whether the group is subscribed to the topic.
+ * @param topicThe topic name.

Review Comment:
   nit: Alignment.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImpl.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import java.util.function.Function;
+
+public class OffsetExpirationConditionImpl implements 
OffsetExpirationCondition {
+
+/**
+ * Given an offset and metadata, obtain the base timestamp that should be 
used
+ * as the start of the offsets retention period.
+ */
+private final Function<OffsetAndMetadata, Long> baseTimestamp;
+
+public OffsetExpirationConditionImpl(Function<OffsetAndMetadata, Long> 
baseTimestamp) {
+this.baseTimestamp = baseTimestamp;
+}
+
+/**
+ * Determine whether an offset is expired. Older versions have an expire 
timestamp per partition. If this
+ * exists, compare against the current timestamp. Otherwise, use the base 
timestamp (either commit timestamp
+ * or current state timestamp if group is empty for generic groups) and 
check whether the offset has
+ * exceeded the offset retention.
+ *
+ * @param offset  The offset and metadata.
+ * @param currentTimestampMsThe current timestamp.

Review Comment:
   nit: Alignment.
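
The javadoc in the hunk above describes the expiration rule in prose. A minimal sketch of that rule follows, under stated assumptions: `OffsetStub` and `ExpirationRuleSketch` are hypothetical names, the per-partition expire timestamp is modeled as an `OptionalLong`, and "exceeded the offset retention" is taken to mean that at least `offsetsRetentionMs` has elapsed since the base timestamp. The method body in the PR may differ.

```java
import java.util.OptionalLong;
import java.util.function.Function;

// Hypothetical stand-in for OffsetAndMetadata with the two timestamps the javadoc mentions.
final class OffsetStub {
    final long commitTimestampMs;
    final OptionalLong expireTimestampMs;

    OffsetStub(long commitTimestampMs, OptionalLong expireTimestampMs) {
        this.commitTimestampMs = commitTimestampMs;
        this.expireTimestampMs = expireTimestampMs;
    }
}

final class ExpirationRuleSketch {
    // Maps an offset to the timestamp at which its retention period starts.
    private final Function<OffsetStub, Long> baseTimestamp;

    ExpirationRuleSketch(Function<OffsetStub, Long> baseTimestamp) {
        this.baseTimestamp = baseTimestamp;
    }

    boolean isOffsetExpired(OffsetStub offset, long currentTimestampMs, long offsetsRetentionMs) {
        if (offset.expireTimestampMs.isPresent()) {
            // Older records carry an explicit expire timestamp; it takes precedence.
            return currentTimestampMs >= offset.expireTimestampMs.getAsLong();
        }
        // Otherwise the offset expires once the retention period has elapsed
        // since the base timestamp.
        return currentTimestampMs - baseTimestamp.apply(offset) >= offsetsRetentionMs;
    }
}
```

Passing the base timestamp as a function lets each group type choose its own starting point (commit timestamp or current state timestamp) without changing the check itself.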



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -1063,16 +1105,17 @@ public Optional> subscribedTopics() {
 }
 
 /**
- * Returns true if the consumer group is actively subscribed to the topic. 
When the consumer
+ * Returns true if the consumer group is actively subscribed to the topic. 
When the generic
  * group does not know, because the information is not available yet or 
because it has
- * failed to parse the Consumer Protocol, it returns true to be safe.
+ * failed to parse the Consumer Protocol, it returns whether the group is 
using the consumer protocol.
+ *
+ * @param topicThe topic name.

Review Comment:
   nit: Alignment.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -1063,16 +1105,17 @@ public Optional> subscribedTopics() {
 }
 
 /**
- * Returns true if the consumer group is actively subscribed to the topic. 
When the consumer
+ * Returns true if the consumer group is actively subscribed to the topic. 
When the generic
  * group does not know, because the information is not available yet or 
because it has
- * failed to parse the Consumer Protocol, it returns true to be safe.
+ * failed to parse the Consumer Protocol, it returns whether the group is 
using the consumer protocol.

Review Comment:
   nit: `it returns whether the group is using the consumer protocol.` It would 
be great to expand a little more on this. We basically want to be conservative 
for consumer groups but not for other types, etc.
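
A sketch of the fallback being discussed, assuming the subscribed topics are held as an `Optional<Set<String>>` that is empty when they are unknown, and that a group counts as using the consumer protocol when its protocol type equals `"consumer"`. The class and parameter names are hypothetical.

```java
import java.util.Optional;
import java.util.Set;

final class ConservativeSubscriptionSketch {
    /**
     * When the subscriptions are known, answer from them. When they are unknown,
     * be conservative only for groups using the consumer protocol: report them as
     * subscribed so their offsets are not expired by mistake. Other group types
     * are reported as not subscribed.
     */
    static boolean isSubscribedToTopic(
        Optional<Set<String>> subscribedTopics,
        Optional<String> protocolType,
        String topic
    ) {
        boolean usesConsumerProtocol = protocolType.map("consumer"::equals).orElse(false);
        return subscribedTopics
            .map(topics -> topics.contains(topic))
            .orElse(usesConsumerProtocol);
    }
}
```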



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +900,46 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEmpty() {
+ 

Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-09 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1350863763


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ * @param offsetsRetentionMs The offset retention in milliseconds.
+ *
+ * @return The group id if the group no longer has any offsets remaining, 
empty otherwise.
+ */
+public Optional cleanupExpiredOffsets(String groupId, List 
records, long offsetsRetentionMs) {
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return Optional.of(groupId);
+}
+try {
+Group group = groupMetadataManager.group(groupId);
+ExpirationCondition expirationCondition = 
group.expirationCondition();
+Set expiredPartitions = new HashSet<>();
+long currentTimestamp = time.milliseconds();
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!expirationCondition.subscribedTopics.contains(topic)) {

Review Comment:
   > In this case, isn't subscribedTopics going to be set to an optional 
containing an empty list?
   
   Ah, I misread and thought `subscribedTopics` becomes an empty optional, not 
an optional containing an empty set. So we would return `false`, which is the 
correct behavior.
   
   > Did you do it because it is correct or by mistake?
   
   Which change, adding another argument? I added that so we can rely on 
`isSubscribedToTopic` instead of passing in the subscribed topics set.
   
   I will update with your suggestion; it makes sense to me.
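
The distinction that caused the confusion can be shown in a few lines. This is a sketch with a hypothetical helper, not the coordinator code: an `Optional` containing an empty set means "known to have no subscriptions", while an empty `Optional` means "unknown", and only the latter reaches the fallback.

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Set;

final class EmptyVersusUnknownSketch {
    // Hypothetical helper: known subscriptions decide the answer,
    // unknown subscriptions fall back to the supplied default.
    static boolean isSubscribed(Optional<Set<String>> subscribedTopics, String topic, boolean defaultIfUnknown) {
        return subscribedTopics.map(topics -> topics.contains(topic)).orElse(defaultIfUnknown);
    }

    public static void main(String[] args) {
        // An empty group: subscriptions are known and there are none.
        Optional<Set<String>> emptyGroup = Optional.of(Collections.<String>emptySet());
        // Subscriptions could not be determined.
        Optional<Set<String>> unknown = Optional.empty();

        System.out.println(isSubscribed(emptyGroup, "foo", true));
        System.out.println(isSubscribed(unknown, "foo", true));
    }
}
```

With the empty set the lookup answers `false` regardless of the default, which matches the behavior described in the reply above.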



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-09 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1350811243


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+/**
+ * An offset is considered expired based on different factors, such as the 
state of the group
+ * and/or the GroupMetadata record version (for generic groups). This class is 
used to check
+ * how offsets for the group should be expired.
+ */
+public interface OffsetExpirationCondition {
+
+/**
+ * Given an offset metadata and offsets retention, return whether the 
offset is expired or not.
+ *
+ * @param offset   The offset metadata.
+ * @param currentTimestamp The current timestamp.
+ * @param offsetsRetentionMs   The offset retention.
+ *
+ * @return Whether the offset is considered expired or not.
+ */
+boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestamp, 
long offsetsRetentionMs);
+}

Review Comment:
   misunderstood. will fix



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-09 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1350279208


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+/**
+ * An offset is considered expired based on different factors, such as the 
state of the group
+ * and/or the GroupMetadata record version (for generic groups). This class is 
used to check
+ * how offsets for the group should be expired.
+ */
+public interface OffsetExpirationCondition {
+
+/**
+ * Given an offset metadata and offsets retention, return whether the 
offset is expired or not.
+ *
+ * @param offset   The offset metadata.
+ * @param currentTimestamp The current timestamp.
+ * @param offsetsRetentionMs   The offset retention.
+ *
+ * @return Whether the offset is considered expired or not.
+ */
+boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestamp, 
long offsetsRetentionMs);
+}

Review Comment:
   This was closed but not addressed.



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-09 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1350261943


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ * @param offsetsRetentionMs The offset retention in milliseconds.
+ *
+ * @return The group id if the group no longer has any offsets remaining, 
empty otherwise.
+ */
+public Optional cleanupExpiredOffsets(String groupId, List 
records, long offsetsRetentionMs) {
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return Optional.of(groupId);
+}
+try {
+Group group = groupMetadataManager.group(groupId);
+ExpirationCondition expirationCondition = 
group.expirationCondition();
+Set expiredPartitions = new HashSet<>();
+long currentTimestamp = time.milliseconds();
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!expirationCondition.subscribedTopics.contains(topic)) {

Review Comment:
   I am also confused by the fact that you did the change. Did you do it 
because it is correct or by mistake?



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-09 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1350261109


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ * @param offsetsRetentionMs The offset retention in milliseconds.
+ *
+ * @return The group id if the group no longer has any offsets remaining, 
empty otherwise.
+ */
+public Optional cleanupExpiredOffsets(String groupId, List 
records, long offsetsRetentionMs) {
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return Optional.of(groupId);
+}
+try {
+Group group = groupMetadataManager.group(groupId);
+ExpirationCondition expirationCondition = 
group.expirationCondition();
+Set expiredPartitions = new HashSet<>();
+long currentTimestamp = time.milliseconds();
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!expirationCondition.subscribedTopics.contains(topic)) {

Review Comment:
   Hmm, I am not sure I follow. In this case, isn't `subscribedTopics` going 
to be set to an optional containing an empty list?



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-08 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349811549


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -106,15 +107,29 @@ void validateOffsetFetch(
 /**
  * Returns true if the group is actively subscribed to the topic.
  *
- * @param topic The topic name.
+ * @param topicThe topic name.
+ * @param isSubscribedIfEmptySubscriptions Whether to consider an empty 
topic subscriptions subscribed or not.
+ *
  * @return Whether the group is subscribed to the topic.
  */
-boolean isSubscribedToTopic(String topic);
+boolean isSubscribedToTopic(String topic, boolean 
isSubscribedIfEmptySubscriptions);

Review Comment:
   responded in the thread



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-08 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349811501


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ * @param offsetsRetentionMs The offset retention in milliseconds.
+ *
+ * @return The group id if the group no longer has any offsets remaining, 
empty otherwise.
+ */
+public Optional cleanupExpiredOffsets(String groupId, List 
records, long offsetsRetentionMs) {
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return Optional.of(groupId);
+}
+try {
+Group group = groupMetadataManager.group(groupId);
+ExpirationCondition expirationCondition = 
group.expirationCondition();
+Set expiredPartitions = new HashSet<>();
+long currentTimestamp = time.milliseconds();
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!expirationCondition.subscribedTopics.contains(topic)) {

Review Comment:
   I think the main issue is that the existing behavior in 
`GroupMetadata#removeExpiredOffsets` only considers which topics a group is 
subscribed to if the group is using the consumer group protocol AND is Stable. 
If a group is in any other state, it acts as if the group is not subscribed to 
any topic when expiring offsets.
   
   Here's my concern with the above suggestion:
   
   Let's say we have an empty group that uses the consumer group protocol. 
`subscribedTopics` will be empty as there are no members (set in 
`computeSubscribedTopics`). This will return `true` from `isSubscribedToTopic`. 
This is not aligned with the existing behavior, which says that if a group is 
empty and has a protocol type, we return an empty collection so that the group 
is considered not subscribed to any topics during offset expiration.
   ```
   case Some(_) if is(Empty) =>
     // no consumer exists in the group =>
     ...
     getExpiredOffsets(
       commitRecordMetadataAndOffset => currentStateTimestamp
         .getOrElse(commitRecordMetadataAndOffset.offsetAndMetadata.commitTimestamp),
     )
   ```
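
For readers more at home in Java than Scala, the base timestamp selection in the snippet above can be sketched as follows. The class and method names are hypothetical, and the current state timestamp is modeled as an `OptionalLong`: when the group records when it became Empty, that timestamp starts the retention period; otherwise (old group metadata schema) the offset's own commit timestamp is used.

```java
import java.util.OptionalLong;

final class EmptyGroupBaseTimestampSketch {
    // Picks the start of the retention period for an offset in an Empty group.
    static long baseTimestampMs(OptionalLong currentStateTimestampMs, long commitTimestampMs) {
        return currentStateTimestampMs.orElse(commitTimestampMs);
    }

    public static void main(String[] args) {
        // The group became Empty at t=500; the offset was committed at t=100.
        System.out.println(baseTimestampMs(OptionalLong.of(500L), 100L));
        // No current state timestamp available: fall back to the commit timestamp.
        System.out.println(baseTimestampMs(OptionalLong.empty(), 100L));
    }
}
```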



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-08 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349654092


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -106,15 +107,29 @@ void validateOffsetFetch(
 /**
  * Returns true if the group is actively subscribed to the topic.
  *
- * @param topic The topic name.
+ * @param topicThe topic name.
+ * @param isSubscribedIfEmptySubscriptions Whether to consider an empty 
topic subscriptions subscribed or not.
+ *
  * @return Whether the group is subscribed to the topic.
  */
-boolean isSubscribedToTopic(String topic);
+boolean isSubscribedToTopic(String topic, boolean 
isSubscribedIfEmptySubscriptions);

Review Comment:
   I made a comment about this boolean 
[here](https://github.com/apache/kafka/pull/14467#discussion_r1348539504). 
Could you take a look?



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-08 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349653948


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +579,100 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false 
otherwise.
+ */
+public boolean cleanupExpiredOffsets(String groupId, List records) 
{
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return true;
+}
+
+// We expect the group to exist.
+Group group = groupMetadataManager.group(groupId);
+Set expiredPartitions = new HashSet<>();
+long currentTimestamp = time.milliseconds();
+Optional offsetExpirationCondition = 
group.offsetExpirationCondition();
+
+if (!offsetExpirationCondition.isPresent()) {
+return false;
+}
+
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+OffsetExpirationCondition condition = offsetExpirationCondition.get();
+
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!group.isSubscribedToTopic(topic, false)) {
+partitions.forEach((partition, offsetAndMetadata) -> {
+if (condition.isOffsetExpired(offsetAndMetadata, 
currentTimestamp, config.offsetsRetentionMs)) {
+
expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, 
records));
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+
+log.debug("[GroupId {}] Expiring offsets: {}", groupId, 
expiredPartitions);

Review Comment:
   I meant that we need to format the list as a comma-separated string. Could 
we also log the boolean indicating whether all the offsets were expired?
   
   Should we also say `Expiring offsets of partitions: `?
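
A minimal sketch of the formatting requested here, assuming the expired partitions are held as plain strings such as `foo-1`. The class name `ExpiredOffsetsLogFormat` and the method `format` are hypothetical and not part of the PR; the sketch only shows joining the set into a comma-separated string and including the all-expired flag in the message.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not taken from the PR.
final class ExpiredOffsetsLogFormat {
    // Builds the log message. Partitions are sorted so the output is stable.
    static String format(String groupId, Set<String> expiredPartitions, boolean allOffsetsExpired) {
        String joined = String.join(", ", new TreeSet<>(expiredPartitions));
        return "[GroupId " + groupId + "] Expiring offsets of partitions (allOffsetsExpired="
            + allOffsetsExpired + "): " + joined;
    }
}
```

Sorting is a choice made for this sketch so that repeated log lines are easy to compare; joining the set in iteration order would work as well.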
   
   






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-08 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349653647


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +550,76 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false 
otherwise.
+ */
+public boolean cleanupExpiredOffsets(String groupId, List records) 
{
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return true;
+}
+
+// We expect the group to exist.
+Group group = groupMetadataManager.group(groupId);
+Set expiredPartitions = new HashSet<>();
+long currentTimestamp = time.milliseconds();

Review Comment:
   nit: currentTimestampMs?






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-08 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349653472


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+/**
+ * An offset is considered expired based on different factors, such as the 
state of the group
+ * and/or the GroupMetadata record version (for generic groups). This class is 
used to check
+ * how offsets for the group should be expired.
+ */
+public interface OffsetExpirationCondition {
+
+/**
+ * Given an offset metadata and offsets retention, return whether the 
offset is expired or not.
+ *
+ * @param offset   The offset metadata.
+ * @param currentTimestamp The current timestamp.
+ * @param offsetsRetentionMs   The offset retention.
+ *
+ * @return Whether the offset is considered expired or not.
+ */
+boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestamp, 
long offsetsRetentionMs);
+}

Review Comment:
   nit: Let's add a new line.






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-08 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349653385


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1763,6 +1803,120 @@ public void testDeleteGroupAllOffsets(Group.GroupType 
groupType) {
 assertEquals(3, numDeleteOffsets);
 }
 
+@Test
+public void testIsExpiredOffset() {
+long currentTimestamp = 1000L;
+long baseTimestamp = 500L;
+OptionalLong expireTimestampMs = OptionalLong.of(1500);
+long offsetsRetentionMs = 500L;
+
+// Current timestamp >= expire timestamp => should expire
+assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, 
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+// Current timestamp < expire timestamp => should not expire
+currentTimestamp = 499;
+assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, 
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+// Expire timestamp does not exist (current version with no per 
partition retention)
+// Current timestamp - base timestamp >= offsets retention => should 
expire
+expireTimestampMs = OptionalLong.empty();
+currentTimestamp = 1000L;
+assertTrue(OffsetMetadataManager.isExpiredOffset(currentTimestamp, 
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+// Current timestamp - base timestamp < offsets retention => should 
not expire
+currentTimestamp = 999L;
+assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, 
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+}
+
+@Test
+public void testCleanupExpiredOffsetsGroupDoesNotExist() {
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder()
+.build();
+
+List records = new ArrayList<>();
+assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records));

Review Comment:
   I agree in this case. However, if the group does not exist, 
https://github.com/apache/kafka/pull/14467/files#diff-a4ad0e0a77c78dde2a841d055dd64e15f7da00b8a2a9e3279d74b718d5c612bbR568
 should throw an exception. Should we add a test for this case and rename 
`testCleanupExpiredOffsetsGroupDoesNotExist` to something like 
`testCleanupExpiredOffsetsGroupHasNoOffsets`?






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-06 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349386383


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +900,52 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isGroupEmpty() {
+return isInState(EMPTY);
+}
+
+/**
+ * Return the offset expiration condition to be used for this group. This 
is based on several factors
+ * such as the group state, the protocol type, and the GroupMetadata 
record version.
+ *
+ * See {@link 
org.apache.kafka.coordinator.group.OffsetMetadataManager.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty of no 
such condition exists.
+ */
+@Override
+public Optional 
offsetExpirationCondition() {
+if (protocolType.isPresent()) {
+if (isInState(EMPTY)) {
+// No consumer exists in the group =>
+// - If current state timestamp exists and retention period 
has passed since group became Empty,
+//   expire all offsets with no pending offset commit;
+// - If there is no current state timestamp (old group 
metadata schema) and retention period has passed
+//   since the last commit timestamp, expire the offset
+return Optional.of(
+(offsetAndMetadata, currentTimestamp, offsetsRetentionMs) 
-> OffsetMetadataManager.isExpiredOffset(
+currentTimestamp,
+
currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs),
+offsetAndMetadata.expireTimestampMs,
+offsetsRetentionMs
+)

Review Comment:
   Thanks for the suggestion! I moved OffsetExpirationCondition to a separate 
file along with a new OffsetExpirationConditionImpl class. Let me know how this 
is.
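
For readers following the thread, the check that the lambda above delegates to can be sketched as follows. This is an illustration based on the semantics described in the `testIsExpiredOffset` comments quoted elsewhere in this thread, not the code from the PR; the class name `OffsetExpirySketch` is hypothetical.

```java
import java.util.OptionalLong;

// Hypothetical stand-in for the static helper discussed in the thread.
final class OffsetExpirySketch {
    static boolean isExpiredOffset(
        long currentTimestampMs,
        long baseTimestampMs,
        OptionalLong expireTimestampMs,
        long offsetsRetentionMs
    ) {
        if (expireTimestampMs.isPresent()) {
            // Older commits carry an explicit expire timestamp, which takes precedence.
            return currentTimestampMs >= expireTimestampMs.getAsLong();
        }
        // Otherwise the offset expires once the retention period has elapsed
        // since the base timestamp.
        return currentTimestampMs - baseTimestampMs >= offsetsRetentionMs;
    }
}
```

With the values used in the test (base 500, retention 500), an offset without an explicit expire timestamp is expired at 1000 but not at 999.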






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-06 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349382254


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +579,100 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false 
otherwise.
+ */
+public boolean cleanupExpiredOffsets(String groupId, List records) 
{
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return true;
+}
+
+// We expect the group to exist.
+Group group = groupMetadataManager.group(groupId);
+Set expiredPartitions = new HashSet<>();
+long currentTimestamp = time.milliseconds();
+Optional offsetExpirationCondition = 
group.offsetExpirationCondition();
+
+if (!offsetExpirationCondition.isPresent()) {
+return false;
+}
+
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+OffsetExpirationCondition condition = offsetExpirationCondition.get();
+
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!group.isSubscribedToTopic(topic, false)) {
+partitions.forEach((partition, offsetAndMetadata) -> {
+if (condition.isOffsetExpired(offsetAndMetadata, 
currentTimestamp, config.offsetsRetentionMs)) {
+
expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, 
records));
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+
+log.debug("[GroupId {}] Expiring offsets: {}", groupId, 
expiredPartitions);

Review Comment:
   Changed to info level, and it now logs only when there are expired 
partitions.
   
   What do you mean by format? As in, just log the size?






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-06 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349341227


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1763,6 +1803,120 @@ public void testDeleteGroupAllOffsets(Group.GroupType 
groupType) {
 assertEquals(3, numDeleteOffsets);
 }
 
+@Test
+public void testIsExpiredOffset() {
+long currentTimestamp = 1000L;
+long baseTimestamp = 500L;
+OptionalLong expireTimestampMs = OptionalLong.of(1500);
+long offsetsRetentionMs = 500L;
+
+// Current timestamp >= expire timestamp => should expire
+assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, 
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+// Current timestamp < expire timestamp => should not expire
+currentTimestamp = 499;
+assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, 
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+// Expire timestamp does not exist (current version with no per 
partition retention)
+// Current timestamp - base timestamp >= offsets retention => should 
expire
+expireTimestampMs = OptionalLong.empty();
+currentTimestamp = 1000L;
+assertTrue(OffsetMetadataManager.isExpiredOffset(currentTimestamp, 
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+// Current timestamp - base timestamp < offsets retention => should 
not expire
+currentTimestamp = 999L;
+assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, 
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+}
+
+@Test
+public void testCleanupExpiredOffsetsGroupDoesNotExist() {
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder()
+.build();
+
+List records = new ArrayList<>();
+assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records));
+assertEquals(Collections.emptyList(), records);
+}
+
+@Test
+public void testCleanupExpiredOffsetsGroupEmptyOffsetExpirationCondition() 
{
+GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+Group group = mock(Group.class);
+
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder()
+.withGroupMetadataManager(groupMetadataManager)
+.build();
+
+context.commitOffset("group-id", "topic", 0, 100L, 0);
+
+when(groupMetadataManager.group("group-id")).thenReturn(group);
+when(group.offsetExpirationCondition()).thenReturn(Optional.empty());
+
+List records = new ArrayList<>();
+assertFalse(context.cleanupExpiredOffsets("group-id", records));
+assertEquals(Collections.emptyList(), records);
+}
+
+@Test
+public void testCleanupExpiredOffsets() {

Review Comment:
   That's correct: one topic partition that the group is still subscribed to, 
one that has expired, and one that has not yet expired.






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-06 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349340087


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1763,6 +1803,120 @@ public void testDeleteGroupAllOffsets(Group.GroupType 
groupType) {
 assertEquals(3, numDeleteOffsets);
 }
 
+@Test
+public void testIsExpiredOffset() {
+long currentTimestamp = 1000L;
+long baseTimestamp = 500L;
+OptionalLong expireTimestampMs = OptionalLong.of(1500);
+long offsetsRetentionMs = 500L;
+
+// Current timestamp >= expire timestamp => should expire
+assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, 
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+// Current timestamp < expire timestamp => should not expire
+currentTimestamp = 499;
+assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, 
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+// Expire timestamp does not exist (current version with no per 
partition retention)
+// Current timestamp - base timestamp >= offsets retention => should 
expire
+expireTimestampMs = OptionalLong.empty();
+currentTimestamp = 1000L;
+assertTrue(OffsetMetadataManager.isExpiredOffset(currentTimestamp, 
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+// Current timestamp - base timestamp < offsets retention => should 
not expire
+currentTimestamp = 999L;
+assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, 
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+}
+
+@Test
+public void testCleanupExpiredOffsetsGroupDoesNotExist() {
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder()
+.build();
+
+List records = new ArrayList<>();
+assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records));

Review Comment:
   I think returning `true` makes more sense here because the return value 
indicates whether the group has no offsets remaining. If the group does not 
exist in the offsets map, we know the group can be deleted.
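
A rough sketch of how a caller might use that return value, assuming two predicates stand in for the real managers: `allOffsetsGone` plays the role of `cleanupExpiredOffsets` returning `true`, and `eligibleForDeletion` plays the role of the group's own deletion check. The class and method names are hypothetical, and the sketch returns group ids where the coordinator would append tombstone records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration of the cleanup loop's decision, not the PR's code.
final class CleanupSketch {
    // Returns the ids of groups for which a group tombstone would be appended.
    static List<String> groupsToDelete(
        List<String> groupIds,
        Predicate<String> allOffsetsGone,
        Predicate<String> eligibleForDeletion
    ) {
        List<String> tombstones = new ArrayList<>();
        for (String groupId : groupIds) {
            // A group is deleted only if it has no offsets left AND is eligible.
            if (allOffsetsGone.test(groupId) && eligibleForDeletion.test(groupId)) {
                tombstones.add(groupId);
            }
        }
        return tombstones;
    }
}
```

Under this reading, returning `true` for a group that is absent from the offsets map simply means offsets do not block its deletion; the eligibility check still has the final say.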






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-06 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1348814529


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +904,53 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEligibleForDeletion() {
+return isInState(EMPTY) && generationId > 0;

Review Comment:
   ah got it. makes sense






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-06 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1348548710


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##
@@ -623,4 +674,33 @@ public void testReplayGroupMetadataWithNullValue() {
 
 verify(groupMetadataManager, times(1)).replay(key, null);
 }
+
+@Test
+public void testScheduleCleanupGroupMetadata() {

Review Comment:
   Do we also need a test which verifies that the implementation does the right 
steps?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +579,100 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false 
otherwise.
+ */
+public boolean cleanupExpiredOffsets(String groupId, List records) 
{
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return true;
+}
+
+// We expect the group to exist.
+Group group = groupMetadataManager.group(groupId);
+Set expiredPartitions = new HashSet<>();
+long currentTimestamp = time.milliseconds();
+Optional offsetExpirationCondition = 
group.offsetExpirationCondition();
+
+if (!offsetExpirationCondition.isPresent()) {
+return false;
+}
+
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+OffsetExpirationCondition condition = offsetExpirationCondition.get();
+
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!group.isSubscribedToTopic(topic, false)) {
+partitions.forEach((partition, offsetAndMetadata) -> {
+if (condition.isOffsetExpired(offsetAndMetadata, 
currentTimestamp, config.offsetsRetentionMs)) {
+
expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, 
records));
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+
+log.debug("[GroupId {}] Expiring offsets: {}", groupId, 
expiredPartitions);

Review Comment:
   I wonder if we should log this with info level. What do you think? We should 
also format `expiredPartitions` correctly here.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -417,6 +441,40 @@ public CoordinatorResult 
deleteOffsets(
 return offsetMetadataManager.deleteOffsets(request);
 }
 
+/**
+ * For each group, remove all expired offsets. If all offsets for the 
group is removed and the group is eligible
+ * for deletion, delete the group.
+ *
+ * @return The list of tombstones (offset commit and group metadata) to 
append.
+ */
+public CoordinatorResult cleanupGroupMetadata() {
+List records = new ArrayList<>();
+groupMetadataManager.groupIds()
+.forEach(groupId -> {

Review Comment:
   nit: Let's bring `forEach` on the previous line.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +900,52 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isGroupEmpty() {

Review Comment:
   nit: `isEmpty`?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -282,6 +311,17 @@ public void commitOffset(
 int partition,
 long offset,
 int leaderEpoch
+) {
+commitOffset(groupId, topic, partition, offset, leaderEpoch, 
time.milliseconds());
+
+}
+public void commitOffset(

Review Comment:
   nit: Let's add an empty line before the method declaration.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java:
##
@@ -1085,6 +1093,101 @@ public void testValidateDeleteGroup() {
 assertThrows(GroupIdNotFoundException.class, 
group::validateDeleteGroup);
 }
 
+@Test
+public void testOffsetExpirationCondition() {
+MockedStatic offsetMetadataManager = 
mockStatic(OffsetMetadataManager.class);
+long currentTimestamp = 3L;
+long commitTimestamp = 2L;
+long offsetsRetentionMs = 1L;
+OptionalLong expireTimestamp = Opt

Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-06 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1348542303


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +904,53 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEligibleForDeletion() {
+return isInState(EMPTY) && generationId > 0;

Review Comment:
   Probably not, because we don't create a record for those groups either. In 
our case, the issue is that the group won't be removed from the cache if we 
don't have a tombstone for it, because we update the cache when we replay 
records.
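
To illustrate the point about replay, here is a minimal sketch of a cache that changes only when records are replayed. It assumes a `null` value represents a tombstone; the class `ReplayCacheSketch` and its string-valued map are hypothetical simplifications, not the coordinator's actual data structures.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical replay-driven cache, not the PR's code.
final class ReplayCacheSketch {
    private final Map<String, String> groups = new HashMap<>();

    // Applies one record to the cache. A null value stands in for a tombstone.
    void replay(String groupId, String value) {
        if (value == null) {
            groups.remove(groupId);
        } else {
            groups.put(groupId, value);
        }
    }

    boolean contains(String groupId) {
        return groups.containsKey(groupId);
    }
}
```

In a design like this there is no other removal path, so a group that never receives a tombstone stays in the cache.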






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-06 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1348539504


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ * @param offsetsRetentionMs The offset retention in milliseconds.
+ *
+ * @return The group id if the group no longer has any offsets remaining, 
empty otherwise.
+ */
+public Optional cleanupExpiredOffsets(String groupId, List 
records, long offsetsRetentionMs) {
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return Optional.of(groupId);
+}
+try {
+Group group = groupMetadataManager.group(groupId);
+ExpirationCondition expirationCondition = 
group.expirationCondition();
+Set expiredPartitions = new HashSet<>();
+long currentTimestamp = time.milliseconds();
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!expirationCondition.subscribedTopics.contains(topic)) {

Review Comment:
   Hum... Please help me better understand this one. My understanding is 
that defaulting to `true` when the subscribed topics are not defined causes an 
issue for generic groups that do not use the consumer protocol type. Did I get 
this right?
   
   If my understanding is correct, I wonder if we could actually change the 
implementation of `GenericGroup#isSubscribedToTopic` to something like this:
   
   ```
   public boolean isSubscribedToTopic(String topic) {
       return subscribedTopics.map(topics -> topics.contains(topic))
           .orElse(usesConsumerGroupProtocol());
   }
   ```
   
   The idea is to only be conservative if the consumer protocol type is used. 
Would something like this work?
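
A self-contained sketch of the suggestion above, for anyone who wants to try the behavior in isolation. The class `SubscriptionSketch` and its constructor are hypothetical; in particular, the boolean field stands in for `usesConsumerGroupProtocol()`, whose real implementation is not shown in this thread.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for the relevant part of a generic group.
final class SubscriptionSketch {
    private final Optional<Set<String>> subscribedTopics;
    private final boolean usesConsumerGroupProtocol;

    SubscriptionSketch(Optional<Set<String>> subscribedTopics, boolean usesConsumerGroupProtocol) {
        this.subscribedTopics = subscribedTopics;
        this.usesConsumerGroupProtocol = usesConsumerGroupProtocol;
    }

    // If the subscription is known, answer from it. Otherwise be conservative
    // (treat the topic as subscribed) only when the consumer protocol is used.
    boolean isSubscribedToTopic(String topic) {
        return subscribedTopics
            .map(topics -> topics.contains(topic))
            .orElse(usesConsumerGroupProtocol);
    }
}
```

With an unknown subscription, a group on the consumer protocol reports every topic as subscribed, so none of its offsets would be expired on that basis, while any other group reports none.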






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-05 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1348205433


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ * @param offsetsRetentionMs The offset retention in milliseconds.
+ *
+ * @return The group id if the group no longer has any offsets remaining, 
empty otherwise.
+ */
+public Optional cleanupExpiredOffsets(String groupId, List 
records, long offsetsRetentionMs) {
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return Optional.of(groupId);
+}
+try {
+Group group = groupMetadataManager.group(groupId);
+ExpirationCondition expirationCondition = 
group.expirationCondition();
+Set expiredPartitions = new HashSet<>();
+long currentTimestamp = time.milliseconds();
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!expirationCondition.subscribedTopics.contains(topic)) {

Review Comment:
   i updated the code so that we pass in a boolean to `isSubscribedToTopic` to 
specify whether we want an empty subscribedTopics to be considered subscribed 
or not.






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-05 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1348205086


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ * @param offsetsRetentionMs The offset retention in milliseconds.
+ *
+ * @return The group id if the group no longer has any offsets remaining, 
empty otherwise.
+ */
+public Optional cleanupExpiredOffsets(String groupId, List 
records, long offsetsRetentionMs) {
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return Optional.of(groupId);
+}
+try {
+Group group = groupMetadataManager.group(groupId);
+ExpirationCondition expirationCondition = 
group.expirationCondition();
+Set expiredPartitions = new HashSet<>();
+long currentTimestamp = time.milliseconds();
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!expirationCondition.subscribedTopics.contains(topic)) {
+partitions.forEach((partition, offsetAndMetadata) -> {
+OptionalLong expireTimestampMs = 
offsetAndMetadata.expireTimestampMs;
+if (expireTimestampMs.isPresent()) {
+// Older versions with explicit expire_timestamp 
field => old expiration semantics is used
+if (currentTimestamp >= 
expireTimestampMs.getAsLong()) {
+
expiredPartitions.add(addOffsetCommitTombstone(groupId, topic, partition, 
records));
+}

Review Comment:
   thanks for the suggestion. looks much simpler



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-05 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1348199189


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -898,6 +904,53 @@ public void createGroupTombstoneRecords(List 
records) {
 records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
 }
 
+@Override
+public boolean isEligibleForDeletion() {
+return isInState(EMPTY) && generationId > 0;

Review Comment:
   That's what I'm confused about: in the existing coordinator, we remove the 
group from the cache but do not append tombstones. Doesn't this mean the empty 
group will be repopulated in the cache when the coordinator restarts?






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-05 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1348064529


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
             .setTopics(topicResponses);
     }
 
+    /**
+     * Remove expired offsets for group.
+     *
+     * @param groupId The group id.
+     * @param records The list of records to populate with offset commit tombstone records.
+     * @param offsetsRetentionMs The offset retention in milliseconds.
+     *
+     * @return The group id if the group no longer has any offsets remaining, empty otherwise.
+     */
+    public Optional<String> cleanupExpiredOffsets(String groupId, List<Record> records, long offsetsRetentionMs) {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId);
+        if (offsetsByTopic == null) {
+            return Optional.of(groupId);
+        }
+        try {
+            Group group = groupMetadataManager.group(groupId);
+            ExpirationCondition expirationCondition = group.expirationCondition();
+            Set<TopicPartition> expiredPartitions = new HashSet<>();
+            long currentTimestamp = time.milliseconds();
+            AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+            offsetsByTopic.forEach((topic, partitions) -> {
+                if (!expirationCondition.subscribedTopics.contains(topic)) {
+                    partitions.forEach((partition, offsetAndMetadata) -> {
+                        OptionalLong expireTimestampMs = offsetAndMetadata.expireTimestampMs;
+                        if (expireTimestampMs.isPresent()) {
+                            // Older versions with explicit expire_timestamp field => old expiration semantics is used
+                            if (currentTimestamp >= expireTimestampMs.getAsLong()) {
+                                expiredPartitions.add(addOffsetCommitTombstone(groupId, topic, partition, records));
+                            }
+                        // Current version with no per partition retention
+                        } else if (currentTimestamp - expirationCondition.baseTimestamp.apply(offsetAndMetadata) >= offsetsRetentionMs) {
+                            expiredPartitions.add(addOffsetCommitTombstone(groupId, topic, partition, records));
+                        } else {
+                            hasAllOffsetsExpired.set(false);
+                        }
+                    });
+                }
+            });
+            log.debug("[GroupId {}] Expiring offsets: {}", groupId, expiredPartitions);
+
+            if (hasAllOffsetsExpired.get()) {
+                // All offsets were expired for this group. Remove the group.
+                return Optional.of(groupId);
+            }
+
+        } catch (GroupIdNotFoundException e) {
+            // groups in offsets should exist.
+            log.warn("GroupId {} should exist.", groupId);
+        }
+
+        return Optional.empty();
+    }
+
+    /**
+     * Add an offset commit tombstone record for the group.
+     *
+     * @param groupId   The group id.
+     * @param topic     The topic name.
+     * @param partition The partition.
+     * @param records   The list of records to append the tombstone.
+     *
+     * @return The topic partition of the corresponding tombstone.
+     */
+    private TopicPartition addOffsetCommitTombstone(
+        String groupId,
+        String topic,
+        int partition,
+        List<Record> records
+    ) {
+        records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
+        TopicPartition tp = new TopicPartition(topic, partition);
+        log.trace("[GroupId {}] Removing expired offset and metadata for {}", groupId, tp);

Review Comment:
   The TopicPartition was created and returned in this method so that we can 
log all of the expired partitions at the end






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-05 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1348055071


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
             .setTopics(topicResponses);
     }
 
+    /**
+     * Remove expired offsets for group.
+     *
+     * @param groupId The group id.
+     * @param records The list of records to populate with offset commit tombstone records.
+     * @param offsetsRetentionMs The offset retention in milliseconds.
+     *
+     * @return The group id if the group no longer has any offsets remaining, empty otherwise.
+     */
+    public Optional<String> cleanupExpiredOffsets(String groupId, List<Record> records, long offsetsRetentionMs) {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId);
+        if (offsetsByTopic == null) {
+            return Optional.of(groupId);
+        }
+        try {
+            Group group = groupMetadataManager.group(groupId);
+            ExpirationCondition expirationCondition = group.expirationCondition();
+            Set<TopicPartition> expiredPartitions = new HashSet<>();
+            long currentTimestamp = time.milliseconds();
+            AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+            offsetsByTopic.forEach((topic, partitions) -> {
+                if (!expirationCondition.subscribedTopics.contains(topic)) {
+                    partitions.forEach((partition, offsetAndMetadata) -> {
+                        OptionalLong expireTimestampMs = offsetAndMetadata.expireTimestampMs;
+                        if (expireTimestampMs.isPresent()) {
+                            // Older versions with explicit expire_timestamp field => old expiration semantics is used
+                            if (currentTimestamp >= expireTimestampMs.getAsLong()) {
+                                expiredPartitions.add(addOffsetCommitTombstone(groupId, topic, partition, records));
+                            }

Review Comment:
   Thanks for the catch and the suggestion



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
             .setTopics(topicResponses);
     }
 
+    /**
+     * Remove expired offsets for group.
+     *
+     * @param groupId The group id.
+     * @param records The list of records to populate with offset commit tombstone records.
+     * @param offsetsRetentionMs The offset retention in milliseconds.
+     *
+     * @return The group id if the group no longer has any offsets remaining, empty otherwise.
+     */
+    public Optional<String> cleanupExpiredOffsets(String groupId, List<Record> records, long offsetsRetentionMs) {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId);
+        if (offsetsByTopic == null) {
+            return Optional.of(groupId);
+        }
+        try {
+            Group group = groupMetadataManager.group(groupId);
+            ExpirationCondition expirationCondition = group.expirationCondition();
+            Set<TopicPartition> expiredPartitions = new HashSet<>();
+            long currentTimestamp = time.milliseconds();
+            AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+            offsetsByTopic.forEach((topic, partitions) -> {
+                if (!expirationCondition.subscribedTopics.contains(topic)) {
+                    partitions.forEach((partition, offsetAndMetadata) -> {
+                        OptionalLong expireTimestampMs = offsetAndMetadata.expireTimestampMs;
+                        if (expireTimestampMs.isPresent()) {
+                            // Older versions with explicit expire_timestamp field => old expiration semantics is used
+                            if (currentTimestamp >= expireTimestampMs.getAsLong()) {
+                                expiredPartitions.add(addOffsetCommitTombstone(groupId, topic, partition, records));
+                            }

Review Comment:
   Makes sense, thanks for the catch and the suggestion






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-05 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1347671837


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
             .setTopics(topicResponses);
     }
 
+    /**
+     * Remove expired offsets for group.
+     *
+     * @param groupId The group id.
+     * @param records The list of records to populate with offset commit tombstone records.
+     * @param offsetsRetentionMs The offset retention in milliseconds.
+     *
+     * @return The group id if the group no longer has any offsets remaining, empty otherwise.
+     */
+    public Optional<String> cleanupExpiredOffsets(String groupId, List<Record> records, long offsetsRetentionMs) {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId);
+        if (offsetsByTopic == null) {
+            return Optional.of(groupId);
+        }
+        try {
+            Group group = groupMetadataManager.group(groupId);
+            ExpirationCondition expirationCondition = group.expirationCondition();
+            Set<TopicPartition> expiredPartitions = new HashSet<>();
+            long currentTimestamp = time.milliseconds();
+            AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+            offsetsByTopic.forEach((topic, partitions) -> {
+                if (!expirationCondition.subscribedTopics.contains(topic)) {

Review Comment:
   I also thought of using this, but the semantics are a bit different. 
From `GenericGroup#isSubscribedToTopic`:
   ```
   /**
    * Returns true if the consumer group is actively subscribed to the topic. When the generic
    * group does not know, because the information is not available yet or because it has
    * failed to parse the Consumer Protocol, it returns true to be safe.
    *
    * @param topic The topic name.
    * @return whether the group is subscribed to the topic.
    */
   public boolean isSubscribedToTopic(String topic) {
       return subscribedTopics.map(topics -> topics.contains(topic))
           .orElse(true);
   }
   ```
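
To make the difference concrete, here is a minimal standalone sketch, assuming the subscription is modelled as an `Optional<Set<String>>` as in the quoted method. `SubscriptionCheckSketch` and `strictContains` are hypothetical names used only for this comparison.

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Set;

class SubscriptionCheckSketch {
    // Conservative check, mirroring the quoted method: an unknown
    // subscription is treated as "subscribed".
    static boolean isSubscribedToTopic(Optional<Set<String>> subscribedTopics, String topic) {
        return subscribedTopics.map(topics -> topics.contains(topic)).orElse(true);
    }

    // Hypothetical strict check for comparison: an unknown subscription
    // is treated as "not subscribed".
    static boolean strictContains(Optional<Set<String>> subscribedTopics, String topic) {
        return subscribedTopics.map(topics -> topics.contains(topic)).orElse(false);
    }

    public static void main(String[] args) {
        Optional<Set<String>> known = Optional.of(Collections.singleton("orders"));
        Optional<Set<String>> unknown = Optional.empty();

        System.out.println(isSubscribedToTopic(known, "payments"));   // false
        System.out.println(isSubscribedToTopic(unknown, "payments")); // true
        System.out.println(strictContains(unknown, "payments"));      // false
    }
}
```

With the conservative variant, offsets of a group whose subscription is unknown would never pass a "not subscribed" guard, so they would not be considered for expiration.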






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-05 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1347292051


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -72,6 +72,7 @@
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.record.BrokerCompressionType;
 import org.apache.kafka.server.util.FutureUtils;
+import org.apache.kafka.server.util.KafkaScheduler;

Review Comment:
   nit: This could be removed.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java:
##
@@ -330,5 +330,5 @@ void onNewMetadataImage(
 /**
  * Shutdown the group coordinator.
  */
-void shutdown();
+void shutdown() throws InterruptedException;

Review Comment:
   nit: Do we still need this?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -417,6 +442,40 @@ public CoordinatorResult deleteOffsets(
         return offsetMetadataManager.deleteOffsets(request);
     }
 
+    /**
+     * For each group, remove all expired offsets. If all offsets for the group is removed and the group is eligible
+     * for deletion, delete the group.
+     *
+     * @return The list of tombstones (offset commit and group metadata) to append.
+     */
+    public CoordinatorResult cleanupGroupMetadata() {
+        List<Record> records = new ArrayList<>();
+        Set<String> groupsWithEmptyOffsets = new HashSet<>();
+        groupMetadataManager.groupIds()
+            .forEach(groupId -> offsetMetadataManager.cleanupExpiredOffsets(groupId, records, config.offsetsRetentionMs)
+                .ifPresent(groupsWithEmptyOffsets::add));
+
+        groupsWithEmptyOffsets.forEach(groupId -> groupMetadataManager.maybeDeleteGroup(groupId, records));

Review Comment:
   Is there a reason why we need to add the group ids to 
`groupsWithEmptyOffsets`? I wonder if we could directly delete the group after 
deleting its offsets. Is it possible? The advantage is that it groups all the 
tombstones of a group together.
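
A rough sketch of the suggested ordering, using plain strings in place of real records. `GroupedCleanupSketch`, `cleanup`, and the record labels are hypothetical and only illustrate how handling each group in a single pass keeps its tombstones adjacent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class GroupedCleanupSketch {
    // For each group: emit its offset tombstones, then immediately emit the
    // group tombstone if no live offsets remain. No intermediate set is needed.
    static List<String> cleanup(Map<String, List<String>> expiredOffsetsByGroup,
                                Set<String> groupsWithLiveOffsets) {
        List<String> records = new ArrayList<>();
        expiredOffsetsByGroup.forEach((groupId, expiredPartitions) -> {
            for (String tp : expiredPartitions) {
                records.add("offset-tombstone:" + groupId + ":" + tp);
            }
            if (!groupsWithLiveOffsets.contains(groupId)) {
                records.add("group-tombstone:" + groupId);
            }
        });
        return records;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the iteration order deterministic for the demo.
        Map<String, List<String>> expired = new LinkedHashMap<>();
        expired.put("g1", Arrays.asList("t-0", "t-1"));
        expired.put("g2", Arrays.asList("t-0"));

        // g2 still has at least one live offset, so it keeps its group record.
        System.out.println(cleanup(expired, Collections.singleton("g2")));
        // [offset-tombstone:g1:t-0, offset-tombstone:g1:t-1, group-tombstone:g1, offset-tombstone:g2:t-0]
    }
}
```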



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
             .setTopics(topicResponses);
     }
 
+    /**
+     * Remove expired offsets for group.
+     *
+     * @param groupId The group id.
+     * @param records The list of records to populate with offset commit tombstone records.
+     * @param offsetsRetentionMs The offset retention in milliseconds.
+     *
+     * @return The group id if the group no longer has any offsets remaining, empty otherwise.
+     */
+    public Optional<String> cleanupExpiredOffsets(String groupId, List<Record> records, long offsetsRetentionMs) {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId);
+        if (offsetsByTopic == null) {
+            return Optional.of(groupId);
+        }
+        try {
+            Group group = groupMetadataManager.group(groupId);
+            ExpirationCondition expirationCondition = group.expirationCondition();
+            Set<TopicPartition> expiredPartitions = new HashSet<>();
+            long currentTimestamp = time.milliseconds();
+            AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+            offsetsByTopic.forEach((topic, partitions) -> {
+                if (!expirationCondition.subscribedTopics.contains(topic)) {
+                    partitions.forEach((partition, offsetAndMetadata) -> {
+                        OptionalLong expireTimestampMs = offsetAndMetadata.expireTimestampMs;
+                        if (expireTimestampMs.isPresent()) {
+                            // Older versions with explicit expire_timestamp field => old expiration semantics is used
+                            if (currentTimestamp >= expireTimestampMs.getAsLong()) {
+                                expiredPartitions.add(addOffsetCommitTombstone(groupId, topic, partition, records));
+                            }
+                        // Current version with no per partition retention
+                        } else if (currentTimestamp - expirationCondition.baseTimestamp.apply(offsetAndMetadata) >= offsetsRetentionMs) {
+                            expiredPartitions.add(addOffsetCommitTombstone(groupId, topic, partition, records));
+                        } else {
+                            hasAllOffsetsExpired.set(false);
+                        }
+                    });
+                }
+            });
+            log.debug("[GroupId {}] Expiring offsets: {}", groupId, expiredPartitions);
+
+            if (hasAllOffsetsExpired.get()) {
+                // All offsets were expired for this group. Remove the group.
+                return Optional.of(groupId);
+            }
+
+        } catch (GroupIdNotFoundException e) {
+            // groups in offsets should exis

Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-04 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1345541333


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -539,26 +564,59 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
     }
 
     public void cleanupOffsetMetadata(List<Record> records, long offsetsRetentionMs) {
-        // TODO: get only committed offset groups?
         offsetsByGroup.forEach((groupId, offsetsByTopic) -> {

Review Comment:
   This approach would not catch empty groups without offsets, would it? I 
think that we should take an approach similar to what we did for the delete 
group API. We should iterate over all the groups from the group metadata 
manager in the shard. For each group, we could clean the offsets and then 
remove the group if it is empty.
   
   It is also a bit weird to have the offset metadata manager delete groups.
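
A small sketch of why the iteration source matters, assuming offsets are kept in a map keyed by group id and that group ids are tracked separately. `GroupIterationSketch` and both method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class GroupIterationSketch {
    // Iterating the offsets map only ever visits groups that have offsets.
    static Set<String> groupsSeenViaOffsets(Map<String, Set<String>> offsetsByGroup) {
        return new TreeSet<>(offsetsByGroup.keySet());
    }

    // Iterating all known group ids also reaches groups with no offsets,
    // which are exactly the ones the offsets-only loop misses.
    static Set<String> deletionCandidates(Set<String> groupIds,
                                          Map<String, Set<String>> offsetsByGroup) {
        Set<String> candidates = new TreeSet<>();
        for (String groupId : groupIds) {
            Set<String> offsets = offsetsByGroup.get(groupId);
            if (offsets == null || offsets.isEmpty()) {
                candidates.add(groupId);
            }
        }
        return candidates;
    }

    public static void main(String[] args) {
        Set<String> groupIds = new HashSet<>(Arrays.asList("with-offsets", "no-offsets"));
        Map<String, Set<String>> offsetsByGroup = new HashMap<>();
        offsetsByGroup.put("with-offsets", Collections.singleton("t-0"));

        System.out.println(groupsSeenViaOffsets(offsetsByGroup));         // [with-offsets]
        System.out.println(deletionCandidates(groupIds, offsetsByGroup)); // [no-offsets]
    }
}
```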






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-04 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1345534442


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -123,50 +124,79 @@ class CoordinatorPartitionWriter[T](
 val maxBatchSize = logConfig.maxMessageSize
 val currentTimeMs = time.milliseconds()
 
-val recordsBuilder = MemoryRecords.builder(
-  ByteBuffer.allocate(math.min(16384, maxBatchSize)),
-  magic,
-  compressionType,
-  TimestampType.CREATE_TIME,
-  0L,
-  maxBatchSize
-)
+var recordsBuilder: MemoryRecordsBuilder = null

Review Comment:
   @jeffkbkim This change cannot be applied to all cases. For all the records 
written by the new protocol, atomicity matters. I think that we should 
revert this for now and discuss how to address this within the framework.
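
The quoted hunk is cut off, so the following is only a sketch of the general trade-off, assuming the concern is that records produced together must land in one batch. `AtomicAppendSketch`, `appendAtomically`, and `split` are hypothetical names, and record size is approximated by string length.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AtomicAppendSketch {
    // All-or-nothing: append every record as one batch, or reject the write
    // and leave the log untouched.
    static boolean appendAtomically(List<List<String>> log, List<String> records, int maxBatchSize) {
        int size = 0;
        for (String r : records) {
            size += r.length();
        }
        if (size > maxBatchSize) {
            return false;
        }
        log.add(new ArrayList<>(records));
        return true;
    }

    // Splitting always fits the size limit, but yields several batches, so a
    // failure between two batches could leave only part of the write in the log.
    static List<List<String>> split(List<String> records, int maxBatchSize) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int size = 0;
        for (String r : records) {
            if (!current.isEmpty() && size + r.length() > maxBatchSize) {
                batches.add(current);
                current = new ArrayList<>();
                size = 0;
            }
            current.add(r);
            size += r.length();
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("aaaa", "bbbb", "cccc");
        List<List<String>> log = new ArrayList<>();

        System.out.println(appendAtomically(log, records, 8)); // false
        System.out.println(log.size());                        // 0
        System.out.println(split(records, 8).size());          // 2
    }
}
```

Splitting may be acceptable for independent tombstones, but it does not preserve the single-batch guarantee for records that must be applied together.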


