This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6fe1598e6b5 KAFKA-18170: Add scheduled job to snapshot cold share 
partitions. (#19443)
6fe1598e6b5 is described below

commit 6fe1598e6b583938defbdc35ee3f63d108b5febc
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed Apr 23 16:22:28 2025 +0530

    KAFKA-18170: Add scheduled job to snapshot cold share partitions. (#19443)
    
    * There could be scenarios where share partition records in the
    `__share_group_state` internal topic are not updated for a while,
    implying these partitions are basically cold.
    * In this situation, the presence of these stale records holds back the
    pruner from keeping the topic clean and of manageable size.
    * To remedy the situation, we have added a periodic job,
    `setupSnapshotColdPartitions`, in `ShareCoordinatorService` which does a
    writeAll operation on the associated shards in the coordinator and
    forces snapshot creation for any cold partitions. In this way, the pruner
    can continue.
    This job has been added as a timer task.
    * A new internal config
    `share.coordinator.cold.partition.snapshot.interval.ms` has been
    introduced to set the period of the job.
    * Any failures are logged and ignored.
    * New tests have been added to verify the feature.
    
    Reviewers: PoAn Yang <[email protected]>, Andrew Schofield 
<[email protected]>
---
 .../coordinator/share/ShareCoordinatorConfig.java  |  14 +-
 .../coordinator/share/ShareCoordinatorService.java |  29 +-
 .../coordinator/share/ShareCoordinatorShard.java   |  52 +++-
 .../kafka/coordinator/share/ShareGroupOffset.java  |  22 +-
 .../share/ShareCoordinatorServiceTest.java         | 111 ++++++-
 .../share/ShareCoordinatorShardTest.java           | 346 +++++++++++++++++++++
 .../share/ShareCoordinatorTestConfig.java          |   1 +
 7 files changed, 556 insertions(+), 19 deletions(-)

diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
index f4a11abea8e..b92947af9cd 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
@@ -76,6 +76,10 @@ public class ShareCoordinatorConfig {
     public static final int STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT = 5 * 60 * 
1000; // 5 minutes
     public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_DOC = "The 
duration in milliseconds that the share coordinator will wait between pruning 
eligible records in share-group state topic.";
 
+    public static final String COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG = 
"share.coordinator.cold.partition.snapshot.interval.ms";
+    public static final int COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DEFAULT = 5 * 
60 * 1000; // 5 minutes
+    public static final String COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DOC = "The 
duration in milliseconds that the share coordinator will wait between force 
snapshotting share partitions which are not being updated.";
+
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(STATE_TOPIC_NUM_PARTITIONS_CONFIG, INT, 
STATE_TOPIC_NUM_PARTITIONS_DEFAULT, atLeast(1), HIGH, 
STATE_TOPIC_NUM_PARTITIONS_DOC)
         .define(STATE_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, 
STATE_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, 
STATE_TOPIC_REPLICATION_FACTOR_DOC)
@@ -87,7 +91,8 @@ public class ShareCoordinatorConfig {
         .define(STATE_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) 
STATE_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, 
STATE_TOPIC_COMPRESSION_CODEC_DOC)
         .define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, 
atLeast(0), MEDIUM, APPEND_LINGER_MS_DOC)
         .define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, 
atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC)
-        .defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT, 
STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW, 
STATE_TOPIC_PRUNE_INTERVAL_MS_DOC);
+        .defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT, 
STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW, 
STATE_TOPIC_PRUNE_INTERVAL_MS_DOC)
+        .defineInternal(COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG, INT, 
COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DEFAULT, atLeast(1), LOW, 
COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DOC);
 
     private final int stateTopicNumPartitions;
     private final short stateTopicReplicationFactor;
@@ -100,7 +105,7 @@ public class ShareCoordinatorConfig {
     private final CompressionType compressionType;
     private final int appendLingerMs;
     private final int pruneIntervalMs;
-
+    private final int coldPartitionSnapshotIntervalMs;
 
     public ShareCoordinatorConfig(AbstractConfig config) {
         stateTopicNumPartitions = 
config.getInt(STATE_TOPIC_NUM_PARTITIONS_CONFIG);
@@ -116,6 +121,7 @@ public class ShareCoordinatorConfig {
             .orElse(null);
         appendLingerMs = config.getInt(APPEND_LINGER_MS_CONFIG);
         pruneIntervalMs = config.getInt(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG);
+        coldPartitionSnapshotIntervalMs = 
config.getInt(COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG);
         validate();
     }
 
@@ -163,6 +169,10 @@ public class ShareCoordinatorConfig {
         return pruneIntervalMs;
     }
 
+    public int shareCoordinatorColdPartitionSnapshotIntervalMs() {
+        return coldPartitionSnapshotIntervalMs;
+    }
+
     private void validate() {
         Utils.require(snapshotUpdateRecordsPerSnapshot >= 0 && 
snapshotUpdateRecordsPerSnapshot <= 500,
             String.format("%s must be between [0, 500]", 
SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG));
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index 8b87aed65ef..321a1e92ac0 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -261,10 +261,15 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
 
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
-        setupRecordPruning();
+        setupPeriodicJobs();
         log.info("Startup complete.");
     }
 
+    private void setupPeriodicJobs() {
+        setupRecordPruning();
+        setupSnapshotColdPartitions();
+    }
+
     private void setupRecordPruning() {
         log.info("Scheduling share-group state topic prune job.");
         timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) 
{
@@ -341,6 +346,28 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
         return fut;
     }
 
+    private void setupSnapshotColdPartitions() {
+        log.info("Scheduling cold share-partition snapshotting.");
+        timer.add(new 
TimerTask(config.shareCoordinatorColdPartitionSnapshotIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = 
runtime.scheduleWriteAllOperation(
+                    "snapshot-cold-partitions",
+                    Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+                    ShareCoordinatorShard::snapshotColdPartitions
+                );
+
+                CompletableFuture.allOf(futures.toArray(new 
CompletableFuture<?>[]{}))
+                    .whenComplete((__, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error while snapshotting cold 
partitions.", exp);
+                        }
+                        setupSnapshotColdPartitions();
+                    });
+            }
+        });
+    }
+
     @Override
     public void shutdown() {
         if (!isActive.compareAndSet(true, false)) {
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index 35891bddf8d..d38564fd6f8 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -66,6 +66,7 @@ import org.apache.kafka.timeline.TimelineHashMap;
 
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
@@ -574,6 +575,46 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         return new CoordinatorResult<>(List.of(record), responseData);
     }
 
+    /**
+     * Iterates over the soft state to determine the share partitions whose 
last snapshot is
+     * older than the allowed time interval. The candidate share partitions 
are force snapshotted.
+     *
+     * @return A result containing snapshot records, if any, and a void 
response.
+     */
+    public CoordinatorResult<Void, CoordinatorRecord> snapshotColdPartitions() 
{
+        long coldSnapshottedPartitionsCount = shareStateMap.values().stream()
+            .filter(shareGroupOffset -> shareGroupOffset.createTimestamp() - 
shareGroupOffset.writeTimestamp() != 0)
+            .count();
+
+        // If all share partitions are snapshotted, it means that
+        // system is quiet and cold snapshotting will not help much.
+        if (coldSnapshottedPartitionsCount == shareStateMap.size()) {
+            log.debug("All share snapshot records already cold snapshotted, 
skipping.");
+            return new CoordinatorResult<>(List.of(), null);
+        }
+
+        // Some active partitions are there.
+        List<CoordinatorRecord> records = new ArrayList<>();
+
+        shareStateMap.forEach((sharePartitionKey, shareGroupOffset) -> {
+            long timeSinceLastSnapshot = time.milliseconds() - 
shareGroupOffset.writeTimestamp();
+            if (timeSinceLastSnapshot >= 
config.shareCoordinatorColdPartitionSnapshotIntervalMs()) {
+                // We need to force create a snapshot here
+                log.info("Last snapshot for {} is older than allowed 
interval.", sharePartitionKey);
+                
records.add(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+                    sharePartitionKey.groupId(),
+                    sharePartitionKey.topicId(),
+                    sharePartitionKey.partition(),
+                    shareGroupOffset.builderSupplier()
+                        .setSnapshotEpoch(shareGroupOffset.snapshotEpoch() + 
1) // We need to increment by one as this is a new snapshot.
+                        .setWriteTimestamp(time.milliseconds())
+                        .build()
+                ));
+            }
+        });
+        return new CoordinatorResult<>(records, null);
+    }
+
     /**
      * Util method to generate a ShareSnapshot or ShareUpdate type record for 
a key, based on various conditions.
      * <p>
@@ -589,6 +630,7 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         WriteShareGroupStateRequestData.PartitionData partitionData,
         SharePartitionKey key
     ) {
+        long timestamp = time.milliseconds();
         if (!shareStateMap.containsKey(key)) {
             // Since this is the first time we are getting a write request for 
key, we should be creating a share snapshot record.
             // The incoming partition data could have overlapping state 
batches, we must merge them
@@ -600,8 +642,8 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                     .setLeaderEpoch(partitionData.leaderEpoch())
                     .setStateEpoch(partitionData.stateEpoch())
                     .setStateBatches(mergeBatches(List.of(), partitionData))
-                    .setCreateTimestamp(time.milliseconds())
-                    .setWriteTimestamp(time.milliseconds())
+                    .setCreateTimestamp(timestamp)
+                    .setWriteTimestamp(timestamp)
                     .build());
         } else if (snapshotUpdateCount.getOrDefault(key, 0) >= 
config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
             ShareGroupOffset currentState = shareStateMap.get(key); // 
shareStateMap will have the entry as containsKey is true
@@ -620,8 +662,8 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                     .setLeaderEpoch(newLeaderEpoch)
                     .setStateEpoch(newStateEpoch)
                     .setStateBatches(mergeBatches(currentState.stateBatches(), 
partitionData, newStartOffset))
-                    .setCreateTimestamp(time.milliseconds())
-                    .setWriteTimestamp(time.milliseconds())
+                    .setCreateTimestamp(timestamp)
+                    .setWriteTimestamp(timestamp)
                     .build());
         } else {
             ShareGroupOffset currentState = shareStateMap.get(key); // 
shareStateMap will have the entry as containsKey is true.
@@ -636,8 +678,6 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                     .setStartOffset(partitionData.startOffset())
                     .setLeaderEpoch(partitionData.leaderEpoch())
                     .setStateBatches(mergeBatches(List.of(), partitionData))
-                    .setCreateTimestamp(currentState.createTimestamp())
-                    .setWriteTimestamp(currentState.writeTimestamp())
                     .build());
         }
     }
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
index d2a91a5bc7a..c0397a273f4 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
@@ -24,7 +24,6 @@ import 
org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
 import org.apache.kafka.server.share.persister.PersisterStateBatch;
 
 import java.util.Collections;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Objects;
 
@@ -33,8 +32,8 @@ import java.util.Objects;
  * This class is immutable (state batches is not modified out of context).
  */
 public class ShareGroupOffset {
-    public static final int NO_TIMESTAMP = -1;
-    public static final int UNINITIALIZED_EPOCH = -1;
+    public static final int NO_TIMESTAMP = 0;
+    public static final int UNINITIALIZED_EPOCH = 0;
     public static final int DEFAULT_EPOCH = 0;
 
     private final int snapshotEpoch;
@@ -161,10 +160,6 @@ public class ShareGroupOffset {
         );
     }
 
-    public LinkedHashSet<PersisterStateBatch> stateBatchAsSet() {
-        return new LinkedHashSet<>(stateBatches);
-    }
-
     public static class Builder {
         private int snapshotEpoch;
         private int stateEpoch;
@@ -195,7 +190,7 @@ public class ShareGroupOffset {
         }
 
         public Builder setStateBatches(List<PersisterStateBatch> stateBatches) 
{
-            this.stateBatches = stateBatches;
+            this.stateBatches = stateBatches == null ? Collections.emptyList() 
: stateBatches.stream().toList();
             return this;
         }
 
@@ -245,4 +240,15 @@ public class ShareGroupOffset {
             ", stateBatches=" + stateBatches +
             '}';
     }
+
+    public Builder builderSupplier() {
+        return new Builder()
+            .setSnapshotEpoch(snapshotEpoch())
+            .setStateEpoch(stateEpoch())
+            .setLeaderEpoch(leaderEpoch())
+            .setStartOffset(startOffset())
+            .setStateBatches(stateBatches())
+            .setCreateTimestamp(createTimestamp())
+            .setWriteTimestamp(writeTimestamp());
+    }
 }
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 399643e32a9..a4b316d9443 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -1458,7 +1458,7 @@ class ShareCoordinatorServiceTest {
                 any(),
                 any());
 
-        timer.advanceClock(30005L); // prune should be called
+        timer.advanceClock(30005L); // Prune should be called.
         verify(runtime, times(1))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1466,7 +1466,7 @@ class ShareCoordinatorServiceTest {
                 any(),
                 any());
 
-        timer.advanceClock(30005L); // prune should be called
+        timer.advanceClock(30005L); // Prune should be called.
         verify(runtime, times(2))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1871,6 +1871,113 @@ class ShareCoordinatorServiceTest {
         service.shutdown();
     }
 
+    @Test
+    public void testColdPartitionSnapshotTaskPeriodicityWithAllSuccess() 
throws Exception {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        MockTime time = new MockTime();
+        MockTimer timer = new MockTimer(time);
+        PartitionWriter writer = mock(PartitionWriter.class);
+
+        Metrics metrics = new Metrics();
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.testConfig(),
+            runtime,
+            new ShareCoordinatorMetrics(metrics),
+            time,
+            timer,
+            writer
+        ));
+
+        when(runtime.scheduleWriteAllOperation(
+            eq("snapshot-cold-partitions"),
+            any(),
+            any()
+        )).thenReturn(List.of(CompletableFuture.completedFuture(null)));
+
+        service.startup(() -> 1);
+        verify(runtime, times(0))
+            .scheduleWriteOperation(
+                eq("snapshot-cold-partitions"),
+                any(),
+                any(),
+                any());
+
+        timer.advanceClock(10005L); // Snapshotting should be called.
+        verify(runtime, times(1))
+            .scheduleWriteAllOperation(
+                eq("snapshot-cold-partitions"),
+                any(),
+                any());
+
+        timer.advanceClock(10005L); // Snapshotting should be called.
+        verify(runtime, times(2))
+            .scheduleWriteAllOperation(
+                eq("snapshot-cold-partitions"),
+                any(),
+                any());
+
+        checkMetrics(metrics);
+
+        service.shutdown();
+    }
+
+    @Test
+    public void testColdPartitionSnapshotTaskPeriodicityWithSomeFailures() 
throws Exception {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        MockTime time = new MockTime();
+        MockTimer timer = new MockTimer(time);
+        PartitionWriter writer = mock(PartitionWriter.class);
+
+        when(runtime.scheduleWriteAllOperation(
+            eq("snapshot-cold-partitions"),
+            any(),
+            any()
+        )).thenReturn(
+            List.of(CompletableFuture.completedFuture(null), 
CompletableFuture.failedFuture(new Exception("bad stuff")))
+        ).thenReturn(
+            List.of(CompletableFuture.completedFuture(null), 
CompletableFuture.completedFuture(null))
+        );
+
+        Metrics metrics = new Metrics();
+
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.testConfig(),
+            runtime,
+            new ShareCoordinatorMetrics(metrics),
+            time,
+            timer,
+            writer
+        ));
+
+        service.startup(() -> 2);
+        verify(runtime, times(0))
+            .scheduleWriteAllOperation(
+                eq("snapshot-cold-partitions"),
+                any(),
+                any());
+
+        timer.advanceClock(10005L); // Snapshotting should be called.
+        verify(runtime, times(1))   // For 2 topic partitions.
+            .scheduleWriteAllOperation(
+                eq("snapshot-cold-partitions"),
+                any(),
+                any());
+
+        timer.advanceClock(10005L); // Snapshotting should be called (despite 
previous partial failure).
+        verify(runtime, times(2))   // For 2 topic partitions.
+            .scheduleWriteAllOperation(
+                eq("snapshot-cold-partitions"),
+                any(),
+                any());
+
+        checkMetrics(metrics);
+
+        service.shutdown();
+    }
+
     @Test
     public void testShareStateTopicConfigs() {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index 20be20832d8..e4b95d1e439 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -44,6 +44,7 @@ import 
org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
 import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
 import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
 import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
 import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
 import org.apache.kafka.image.MetadataImage;
@@ -1357,6 +1358,351 @@ class ShareCoordinatorShardTest {
         verify(topicsImage, times(1)).getPartition(eq(TOPIC_ID), eq(0));
     }
 
+    @Test
+    public void testSnapshotColdPartitionsNoEligiblePartitions() {
+        ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
+        MetadataImage image = mock(MetadataImage.class);
+        shard.onNewMetadataImage(image, null);
+        int offset = 0;
+        int producerId = 0;
+        short producerEpoch = 0;
+        int leaderEpoch = 0;
+
+        long timestamp = TIME.milliseconds();
+
+        CoordinatorRecord record1 = CoordinatorRecord.record(
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(0),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(0)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp)
+                    .setWriteTimestamp(timestamp)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        CoordinatorRecord record2 = CoordinatorRecord.record(
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(1),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(0)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp)
+                    .setWriteTimestamp(timestamp)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        shard.replay(offset, producerId, producerEpoch, record1);
+        shard.replay(offset + 1, producerId, producerEpoch, record2);
+
+        
assertNotNull(shard.getShareStateMapValue(SharePartitionKey.getInstance(GROUP_ID,
 TOPIC_ID, 0)));
+        
assertNotNull(shard.getShareStateMapValue(SharePartitionKey.getInstance(GROUP_ID,
 TOPIC_ID, 1)));
+
+        TIME.sleep(5000);   // Less than config.
+
+        assertEquals(0, shard.snapshotColdPartitions().records().size());
+    }
+
+    @Test
+    public void testSnapshotColdPartitionsSnapshotUpdateNotConsidered() {
+        ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
+        MetadataImage image = mock(MetadataImage.class);
+        shard.onNewMetadataImage(image, null);
+        int offset = 0;
+        int producerId = 0;
+        short producerEpoch = 0;
+        int leaderEpoch = 0;
+
+        long timestamp = TIME.milliseconds();
+
+        CoordinatorRecord record1 = CoordinatorRecord.record(
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(0),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(0)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp)
+                    .setWriteTimestamp(timestamp)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        SharePartitionKey key = SharePartitionKey.getInstance(GROUP_ID, 
TOPIC_ID, 0);
+
+        shard.replay(offset, producerId, producerEpoch, record1);
+        assertNotNull(shard.getShareStateMapValue(key));
+
+        long sleep = 12000;
+        TIME.sleep(sleep);
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            CoordinatorRecord.record(
+                new ShareSnapshotKey()
+                    .setGroupId(GROUP_ID)
+                    .setTopicId(TOPIC_ID)
+                    .setPartition(0),
+                new ApiMessageAndVersion(
+                    new ShareSnapshotValue()
+                        .setSnapshotEpoch(1)
+                        .setStateEpoch(0)
+                        .setLeaderEpoch(leaderEpoch)
+                        .setCreateTimestamp(timestamp)
+                        .setWriteTimestamp(timestamp + sleep)
+                        .setStateBatches(List.of(
+                            new ShareSnapshotValue.StateBatch()
+                                .setFirstOffset(0)
+                                .setLastOffset(10)
+                                .setDeliveryCount((short) 1)
+                                .setDeliveryState((byte) 0))),
+                    (short) 0
+                )
+            )
+        );
+
+        assertEquals(expectedRecords, 
shard.snapshotColdPartitions().records());
+
+        shard.replay(offset + 1, producerId, producerEpoch, 
expectedRecords.get(0));
+        assertNotNull(shard.getShareStateMapValue(key));
+
+        CoordinatorRecord record2 = CoordinatorRecord.record(
+            new ShareUpdateKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(0),
+            new ApiMessageAndVersion(
+                new ShareUpdateValue()
+                    .setSnapshotEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setStateBatches(List.of(
+                        new ShareUpdateValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        shard.replay(offset + 2, producerId, producerEpoch, record2);
+
+        TIME.sleep(sleep);
+
+        assertNotNull(shard.getShareStateMapValue(key));
+        assertEquals(timestamp + sleep, 
shard.getShareStateMapValue(key).writeTimestamp()); // No snapshot since update 
has no time info.
+    }
+
+    @Test
+    public void testSnapshotColdPartitionsDoesNotPerpetuallySnapshot() {
+        ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
+        MetadataImage image = mock(MetadataImage.class);
+        shard.onNewMetadataImage(image, null);
+        int offset = 0;
+        int producerId = 0;
+        short producerEpoch = 0;
+        int leaderEpoch = 0;
+
+        long timestamp = TIME.milliseconds();
+
+        CoordinatorRecord record1 = CoordinatorRecord.record(
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(0),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(0)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp)
+                    .setWriteTimestamp(timestamp)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        shard.replay(offset, producerId, producerEpoch, record1);
+        
assertNotNull(shard.getShareStateMapValue(SharePartitionKey.getInstance(GROUP_ID,
 TOPIC_ID, 0)));
+
+        long sleep = 12000;
+        TIME.sleep(sleep);
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            CoordinatorRecord.record(
+                new ShareSnapshotKey()
+                    .setGroupId(GROUP_ID)
+                    .setTopicId(TOPIC_ID)
+                    .setPartition(0),
+                new ApiMessageAndVersion(
+                    new ShareSnapshotValue()
+                        .setSnapshotEpoch(1)
+                        .setStateEpoch(0)
+                        .setLeaderEpoch(leaderEpoch)
+                        .setCreateTimestamp(timestamp)
+                        .setWriteTimestamp(timestamp + sleep)
+                        .setStateBatches(List.of(
+                            new ShareSnapshotValue.StateBatch()
+                                .setFirstOffset(0)
+                                .setLastOffset(10)
+                                .setDeliveryCount((short) 1)
+                                .setDeliveryState((byte) 0))),
+                    (short) 0
+                )
+            )
+        );
+
+        assertEquals(expectedRecords, 
shard.snapshotColdPartitions().records());
+
+        shard.replay(offset + 1, producerId, producerEpoch, 
expectedRecords.get(0));
+        
assertNotNull(shard.getShareStateMapValue(SharePartitionKey.getInstance(GROUP_ID,
 TOPIC_ID, 0)));
+
+        // Since all existing snapshots are already snapshotted, no new 
records will be created.
+        TIME.sleep(12000);
+
+        assertEquals(0, shard.snapshotColdPartitions().records().size());
+    }
+
+    @Test
+    public void testSnapshotColdPartitionsPartialEligiblePartitions() { // Two partitions with different write timestamps: only the older one should be re-snapshotted.
+        ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
+        MetadataImage image = mock(MetadataImage.class);
+        shard.onNewMetadataImage(image, null);
+        int offset = 0;
+        int producerId = 0;
+        short producerEpoch = 0;
+        int leaderEpoch = 0;
+        SharePartitionKey key0 = SharePartitionKey.getInstance(GROUP_ID, 
TOPIC_ID, 0);
+        SharePartitionKey key1 = SharePartitionKey.getInstance(GROUP_ID, 
TOPIC_ID, 1);
+
+        long timestamp = TIME.milliseconds(); // Base create/write time used for partition 0.
+        int record1SnapshotEpoch = 0;
+
+        CoordinatorRecord record1 = CoordinatorRecord.record( // Initial snapshot for partition 0, written at timestamp.
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(0),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(record1SnapshotEpoch)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp)
+                    .setWriteTimestamp(timestamp)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        long delta = 15000; // 15 seconds; partition 1's timestamps are placed this far after partition 0's.
+
+        CoordinatorRecord record2 = CoordinatorRecord.record( // Initial snapshot for partition 1, written at timestamp + delta.
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(1),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(0)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp + delta)
+                    .setWriteTimestamp(timestamp + delta)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        shard.replay(offset, producerId, producerEpoch, record1); // Load partition 0's snapshot into the shard.
+        shard.replay(offset + 1, producerId, producerEpoch, record2); // Load partition 1's snapshot into the shard.
+
+        assertNotNull(shard.getShareStateMapValue(key0));
+        assertNotNull(shard.getShareStateMapValue(key1));
+        assertEquals(timestamp, 
shard.getShareStateMapValue(key0).writeTimestamp());
+        assertEquals(timestamp + delta, 
shard.getShareStateMapValue(key1).writeTimestamp());
+
+        long sleep = 12000; // Deliberately less than delta (15000).
+        TIME.sleep(sleep);  // Record 1 is eligible now; record 2 is not. NOTE(review): presumably 12s exceeds the 10s cold-partition interval from ShareCoordinatorTestConfig - confirm the shard builder uses it.
+
+        List<CoordinatorRecord> expectedRecords = List.of( // Only partition 0 is expected to be re-snapshotted.
+            CoordinatorRecord.record(
+                new ShareSnapshotKey()
+                    .setGroupId(GROUP_ID)
+                    .setTopicId(TOPIC_ID)
+                    .setPartition(0),
+                new ApiMessageAndVersion(
+                    new ShareSnapshotValue()
+                        .setSnapshotEpoch(record1SnapshotEpoch + 1) // Snapshot epoch is expected to be bumped by one.
+                        .setStateEpoch(0)
+                        .setLeaderEpoch(leaderEpoch)
+                        .setCreateTimestamp(timestamp) // Create timestamp is expected to carry over unchanged.
+                        .setWriteTimestamp(timestamp + sleep) // Write timestamp is expected to match the advanced clock.
+                        .setStateBatches(List.of(
+                            new ShareSnapshotValue.StateBatch()
+                                .setFirstOffset(0)
+                                .setLastOffset(10)
+                                .setDeliveryCount((short) 1)
+                                .setDeliveryState((byte) 0))),
+                    (short) 0
+                )
+            )
+        );
+
+        List<CoordinatorRecord> records = 
shard.snapshotColdPartitions().records();
+        assertEquals(expectedRecords, records); // Exactly one record: partition 1 must not appear.
+
+        shard.replay(offset + 2, producerId, producerEpoch, records.get(0)); // Apply the generated snapshot back to the shard state.
+
+        assertEquals(timestamp + delta, 
shard.getShareStateMapValue(key1).writeTimestamp()); // Partition 1's state is untouched.
+        assertEquals(timestamp + sleep, 
shard.getShareStateMapValue(key0).writeTimestamp()); // Partition 0 now carries the new write timestamp.
+    }
+
     private static ShareGroupOffset groupOffset(ApiMessage record) {
         if (record instanceof ShareSnapshotValue) {
             return ShareGroupOffset.fromRecord((ShareSnapshotValue) record);
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
index eab6f2966ac..853bc119432 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
@@ -50,6 +50,7 @@ public class ShareCoordinatorTestConfig {
         configs.put(ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG, "10");
         
configs.put(ShareCoordinatorConfig.STATE_TOPIC_COMPRESSION_CODEC_CONFIG, 
String.valueOf(CompressionType.NONE.id));
         
configs.put(ShareCoordinatorConfig.STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, 
"30000");  // 30 seconds
+        
configs.put(ShareCoordinatorConfig.COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG, 
"10000");    // 10 seconds
         return configs;
     }
 


Reply via email to