This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6fe1598e6b5 KAFKA-18170: Add scheduled job to snapshot cold share partitions. (#19443)
6fe1598e6b5 is described below
commit 6fe1598e6b583938defbdc35ee3f63d108b5febc
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed Apr 23 16:22:28 2025 +0530
KAFKA-18170: Add scheduled job to snapshot cold share partitions. (#19443)
* There could be scenarios where the share partition records in the
`__share_group_state` internal topic are not updated for a while,
which means these partitions are effectively cold.
* In this situation, the presence of these cold partitions holds the
pruner back from keeping the topic clean and at a manageable size.
* To remedy the situation, we have added a periodic
`setupSnapshotColdPartitions` job in `ShareCoordinatorService` which
performs a writeAll operation on the associated shards in the
coordinator and forces snapshot creation for any cold partitions, so
that the pruner can continue. The job has been added as a timer task
(a rough sketch of the scheduling pattern follows this list).
* A new internal config
`share.coordinator.cold.partition.snapshot.interval.ms` has been
introduced to set the period of the job.
* Any failures are logged and ignored.
* New tests have been added to verify the feature.
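For illustration only, here is a hedged sketch of the self-rescheduling pattern described above. It is not the committed code: the class name ColdSnapshotJobSketch, the INTERVAL_MS constant and the snapshotAllShards() stand-in are assumptions, and plain java.util.concurrent types are used in place of the coordinator's internal TimerTask and runtime (the actual implementation appears in the diff below).

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class ColdSnapshotJobSketch {
        private static final long INTERVAL_MS = 5 * 60 * 1000L; // mirrors the 5-minute default of the new config
        private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

        public void start() {
            // Schedule the first run; each run re-arms itself once it finishes.
            timer.schedule(this::runOnce, INTERVAL_MS, TimeUnit.MILLISECONDS);
        }

        private void runOnce() {
            // Stand-in for the write-all operation, which returns one future per coordinator shard.
            List<CompletableFuture<Void>> futures = snapshotAllShards();

            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .whenComplete((ignored, error) -> {
                    if (error != null) {
                        // Failures are logged and ignored; the job still re-arms.
                        System.err.println("Cold share-partition snapshotting failed: " + error);
                    }
                    timer.schedule(this::runOnce, INTERVAL_MS, TimeUnit.MILLISECONDS);
                });
        }

        private List<CompletableFuture<Void>> snapshotAllShards() {
            // Placeholder work; the real job forces snapshots for cold share partitions.
            return List.of(CompletableFuture.completedFuture(null));
        }
    }

Re-arming from inside whenComplete, rather than using a fixed-rate schedule, mirrors the behaviour in the diff below: each run finishes, and any failure is merely logged, before the next run is scheduled.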
Reviewers: PoAn Yang <[email protected]>, Andrew Schofield <[email protected]>
---
.../coordinator/share/ShareCoordinatorConfig.java | 14 +-
.../coordinator/share/ShareCoordinatorService.java | 29 +-
.../coordinator/share/ShareCoordinatorShard.java | 52 +++-
.../kafka/coordinator/share/ShareGroupOffset.java | 22 +-
.../share/ShareCoordinatorServiceTest.java | 111 ++++++-
.../share/ShareCoordinatorShardTest.java | 346 +++++++++++++++++++++
.../share/ShareCoordinatorTestConfig.java | 1 +
7 files changed, 556 insertions(+), 19 deletions(-)
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
index f4a11abea8e..b92947af9cd 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
@@ -76,6 +76,10 @@ public class ShareCoordinatorConfig {
     public static final int STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
     public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_DOC = "The duration in milliseconds that the share coordinator will wait between pruning eligible records in share-group state topic.";
+    public static final String COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG = "share.coordinator.cold.partition.snapshot.interval.ms";
+    public static final int COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
+    public static final String COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DOC = "The duration in milliseconds that the share coordinator will wait between force snapshotting share partitions which are not being updated.";
+
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(STATE_TOPIC_NUM_PARTITIONS_CONFIG, INT, STATE_TOPIC_NUM_PARTITIONS_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_NUM_PARTITIONS_DOC)
         .define(STATE_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, STATE_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_REPLICATION_FACTOR_DOC)
@@ -87,7 +91,8 @@ public class ShareCoordinatorConfig {
         .define(STATE_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) STATE_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, STATE_TOPIC_COMPRESSION_CODEC_DOC)
         .define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, APPEND_LINGER_MS_DOC)
         .define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC)
-        .defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT, STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW, STATE_TOPIC_PRUNE_INTERVAL_MS_DOC);
+        .defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT, STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW, STATE_TOPIC_PRUNE_INTERVAL_MS_DOC)
+        .defineInternal(COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG, INT, COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DEFAULT, atLeast(1), LOW, COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DOC);
private final int stateTopicNumPartitions;
private final short stateTopicReplicationFactor;
@@ -100,7 +105,7 @@ public class ShareCoordinatorConfig {
private final CompressionType compressionType;
private final int appendLingerMs;
private final int pruneIntervalMs;
-
+ private final int coldPartitionSnapshotIntervalMs;
public ShareCoordinatorConfig(AbstractConfig config) {
         stateTopicNumPartitions = config.getInt(STATE_TOPIC_NUM_PARTITIONS_CONFIG);
@@ -116,6 +121,7 @@ public class ShareCoordinatorConfig {
.orElse(null);
appendLingerMs = config.getInt(APPEND_LINGER_MS_CONFIG);
pruneIntervalMs = config.getInt(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG);
+        coldPartitionSnapshotIntervalMs = config.getInt(COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG);
validate();
}
@@ -163,6 +169,10 @@ public class ShareCoordinatorConfig {
return pruneIntervalMs;
}
+ public int shareCoordinatorColdPartitionSnapshotIntervalMs() {
+ return coldPartitionSnapshotIntervalMs;
+ }
+
private void validate() {
         Utils.require(snapshotUpdateRecordsPerSnapshot >= 0 && snapshotUpdateRecordsPerSnapshot <= 500,
             String.format("%s must be between [0, 500]", SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG));
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index 8b87aed65ef..321a1e92ac0 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -261,10 +261,15 @@ public class ShareCoordinatorService implements ShareCoordinator {
log.info("Starting up.");
numPartitions = shareGroupTopicPartitionCount.getAsInt();
- setupRecordPruning();
+ setupPeriodicJobs();
log.info("Startup complete.");
}
+ private void setupPeriodicJobs() {
+ setupRecordPruning();
+ setupSnapshotColdPartitions();
+ }
+
private void setupRecordPruning() {
log.info("Scheduling share-group state topic prune job.");
         timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
@@ -341,6 +346,28 @@ public class ShareCoordinatorService implements ShareCoordinator {
return fut;
}
+    private void setupSnapshotColdPartitions() {
+        log.info("Scheduling cold share-partition snapshotting.");
+        timer.add(new TimerTask(config.shareCoordinatorColdPartitionSnapshotIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = runtime.scheduleWriteAllOperation(
+                    "snapshot-cold-partitions",
+                    Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+                    ShareCoordinatorShard::snapshotColdPartitions
+                );
+
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[]{}))
+                    .whenComplete((__, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error while snapshotting cold partitions.", exp);
+                        }
+                        setupSnapshotColdPartitions();
+                    });
+            }
+        });
+    }
+
@Override
public void shutdown() {
if (!isActive.compareAndSet(true, false)) {
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index 35891bddf8d..d38564fd6f8 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -66,6 +66,7 @@ import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -574,6 +575,46 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return new CoordinatorResult<>(List.of(record), responseData);
}
+    /**
+     * Iterates over the soft state to determine the share partitions whose last snapshot is
+     * older than the allowed time interval. The candidate share partitions are force snapshotted.
+     *
+     * @return A result containing snapshot records, if any, and a void response.
+     */
+    public CoordinatorResult<Void, CoordinatorRecord> snapshotColdPartitions() {
+        long coldSnapshottedPartitionsCount = shareStateMap.values().stream()
+            .filter(shareGroupOffset -> shareGroupOffset.createTimestamp() - shareGroupOffset.writeTimestamp() != 0)
+            .count();
+
+        // If all share partitions are snapshotted, it means that
+        // system is quiet and cold snapshotting will not help much.
+        if (coldSnapshottedPartitionsCount == shareStateMap.size()) {
+            log.debug("All share snapshot records already cold snapshotted, skipping.");
+            return new CoordinatorResult<>(List.of(), null);
+        }
+
+        // Some active partitions are there.
+        List<CoordinatorRecord> records = new ArrayList<>();
+
+        shareStateMap.forEach((sharePartitionKey, shareGroupOffset) -> {
+            long timeSinceLastSnapshot = time.milliseconds() - shareGroupOffset.writeTimestamp();
+            if (timeSinceLastSnapshot >= config.shareCoordinatorColdPartitionSnapshotIntervalMs()) {
+                // We need to force create a snapshot here
+                log.info("Last snapshot for {} is older than allowed interval.", sharePartitionKey);
+                records.add(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+                    sharePartitionKey.groupId(),
+                    sharePartitionKey.topicId(),
+                    sharePartitionKey.partition(),
+                    shareGroupOffset.builderSupplier()
+                        .setSnapshotEpoch(shareGroupOffset.snapshotEpoch() + 1) // We need to increment by one as this is a new snapshot.
+                        .setWriteTimestamp(time.milliseconds())
+                        .build()
+                ));
+            }
+        });
+        return new CoordinatorResult<>(records, null);
+    }
+
     /**
      * Util method to generate a ShareSnapshot or ShareUpdate type record for a key, based on various conditions.
      * <p>
@@ -589,6 +630,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
WriteShareGroupStateRequestData.PartitionData partitionData,
SharePartitionKey key
) {
+ long timestamp = time.milliseconds();
if (!shareStateMap.containsKey(key)) {
             // Since this is the first time we are getting a write request for key, we should be creating a share snapshot record.
             // The incoming partition data could have overlapping state batches, we must merge them
@@ -600,8 +642,8 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
.setLeaderEpoch(partitionData.leaderEpoch())
.setStateEpoch(partitionData.stateEpoch())
.setStateBatches(mergeBatches(List.of(), partitionData))
- .setCreateTimestamp(time.milliseconds())
- .setWriteTimestamp(time.milliseconds())
+ .setCreateTimestamp(timestamp)
+ .setWriteTimestamp(timestamp)
.build());
         } else if (snapshotUpdateCount.getOrDefault(key, 0) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
             ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true
@@ -620,8 +662,8 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
.setLeaderEpoch(newLeaderEpoch)
.setStateEpoch(newStateEpoch)
                 .setStateBatches(mergeBatches(currentState.stateBatches(), partitionData, newStartOffset))
- .setCreateTimestamp(time.milliseconds())
- .setWriteTimestamp(time.milliseconds())
+ .setCreateTimestamp(timestamp)
+ .setWriteTimestamp(timestamp)
.build());
} else {
             ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true.
@@ -636,8 +678,6 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
.setStartOffset(partitionData.startOffset())
.setLeaderEpoch(partitionData.leaderEpoch())
.setStateBatches(mergeBatches(List.of(), partitionData))
- .setCreateTimestamp(currentState.createTimestamp())
- .setWriteTimestamp(currentState.writeTimestamp())
.build());
}
}
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
index d2a91a5bc7a..c0397a273f4 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
@@ -24,7 +24,6 @@ import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
import org.apache.kafka.server.share.persister.PersisterStateBatch;
import java.util.Collections;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
@@ -33,8 +32,8 @@ import java.util.Objects;
* This class is immutable (state batches is not modified out of context).
*/
public class ShareGroupOffset {
- public static final int NO_TIMESTAMP = -1;
- public static final int UNINITIALIZED_EPOCH = -1;
+ public static final int NO_TIMESTAMP = 0;
+ public static final int UNINITIALIZED_EPOCH = 0;
public static final int DEFAULT_EPOCH = 0;
private final int snapshotEpoch;
@@ -161,10 +160,6 @@ public class ShareGroupOffset {
);
}
- public LinkedHashSet<PersisterStateBatch> stateBatchAsSet() {
- return new LinkedHashSet<>(stateBatches);
- }
-
public static class Builder {
private int snapshotEpoch;
private int stateEpoch;
@@ -195,7 +190,7 @@ public class ShareGroupOffset {
}
         public Builder setStateBatches(List<PersisterStateBatch> stateBatches) {
-            this.stateBatches = stateBatches;
+            this.stateBatches = stateBatches == null ? Collections.emptyList() : stateBatches.stream().toList();
return this;
}
@@ -245,4 +240,15 @@ public class ShareGroupOffset {
", stateBatches=" + stateBatches +
'}';
}
+
+ public Builder builderSupplier() {
+ return new Builder()
+ .setSnapshotEpoch(snapshotEpoch())
+ .setStateEpoch(stateEpoch())
+ .setLeaderEpoch(leaderEpoch())
+ .setStartOffset(startOffset())
+ .setStateBatches(stateBatches())
+ .setCreateTimestamp(createTimestamp())
+ .setWriteTimestamp(writeTimestamp());
+ }
}
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 399643e32a9..a4b316d9443 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -1458,7 +1458,7 @@ class ShareCoordinatorServiceTest {
any(),
any());
- timer.advanceClock(30005L); // prune should be called
+ timer.advanceClock(30005L); // Prune should be called.
verify(runtime, times(1))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1466,7 +1466,7 @@ class ShareCoordinatorServiceTest {
any(),
any());
- timer.advanceClock(30005L); // prune should be called
+ timer.advanceClock(30005L); // Prune should be called.
verify(runtime, times(2))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1871,6 +1871,113 @@ class ShareCoordinatorServiceTest {
service.shutdown();
}
+ @Test
+    public void testColdPartitionSnapshotTaskPeriodicityWithAllSuccess() throws Exception {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+ MockTime time = new MockTime();
+ MockTimer timer = new MockTimer(time);
+ PartitionWriter writer = mock(PartitionWriter.class);
+
+ Metrics metrics = new Metrics();
+
+ ShareCoordinatorService service = spy(new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.testConfig(),
+ runtime,
+ new ShareCoordinatorMetrics(metrics),
+ time,
+ timer,
+ writer
+ ));
+
+ when(runtime.scheduleWriteAllOperation(
+ eq("snapshot-cold-partitions"),
+ any(),
+ any()
+ )).thenReturn(List.of(CompletableFuture.completedFuture(null)));
+
+ service.startup(() -> 1);
+ verify(runtime, times(0))
+ .scheduleWriteOperation(
+ eq("snapshot-cold-partitions"),
+ any(),
+ any(),
+ any());
+
+ timer.advanceClock(10005L); // Snapshotting should be called.
+ verify(runtime, times(1))
+ .scheduleWriteAllOperation(
+ eq("snapshot-cold-partitions"),
+ any(),
+ any());
+
+ timer.advanceClock(10005L); // Snapshotting should be called.
+ verify(runtime, times(2))
+ .scheduleWriteAllOperation(
+ eq("snapshot-cold-partitions"),
+ any(),
+ any());
+
+ checkMetrics(metrics);
+
+ service.shutdown();
+ }
+
+ @Test
+    public void testColdPartitionSnapshotTaskPeriodicityWithSomeFailures() throws Exception {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+ MockTime time = new MockTime();
+ MockTimer timer = new MockTimer(time);
+ PartitionWriter writer = mock(PartitionWriter.class);
+
+ when(runtime.scheduleWriteAllOperation(
+ eq("snapshot-cold-partitions"),
+ any(),
+ any()
+ )).thenReturn(
+            List.of(CompletableFuture.completedFuture(null), CompletableFuture.failedFuture(new Exception("bad stuff")))
+        ).thenReturn(
+            List.of(CompletableFuture.completedFuture(null), CompletableFuture.completedFuture(null))
+ );
+
+ Metrics metrics = new Metrics();
+
+ ShareCoordinatorService service = spy(new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.testConfig(),
+ runtime,
+ new ShareCoordinatorMetrics(metrics),
+ time,
+ timer,
+ writer
+ ));
+
+ service.startup(() -> 2);
+ verify(runtime, times(0))
+ .scheduleWriteAllOperation(
+ eq("snapshot-cold-partitions"),
+ any(),
+ any());
+
+ timer.advanceClock(10005L); // Snapshotting should be called.
+ verify(runtime, times(1)) // For 2 topic partitions.
+ .scheduleWriteAllOperation(
+ eq("snapshot-cold-partitions"),
+ any(),
+ any());
+
+        timer.advanceClock(10005L); // Snapshotting should be called (despite previous partial failure).
+ verify(runtime, times(2)) // For 2 topic partitions.
+ .scheduleWriteAllOperation(
+ eq("snapshot-cold-partitions"),
+ any(),
+ any());
+
+ checkMetrics(metrics);
+
+ service.shutdown();
+ }
+
@Test
public void testShareStateTopicConfigs() {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index 20be20832d8..e4b95d1e439 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
import org.apache.kafka.image.MetadataImage;
@@ -1357,6 +1358,351 @@ class ShareCoordinatorShardTest {
verify(topicsImage, times(1)).getPartition(eq(TOPIC_ID), eq(0));
}
+ @Test
+ public void testSnapshotColdPartitionsNoEligiblePartitions() {
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+ MetadataImage image = mock(MetadataImage.class);
+ shard.onNewMetadataImage(image, null);
+ int offset = 0;
+ int producerId = 0;
+ short producerEpoch = 0;
+ int leaderEpoch = 0;
+
+ long timestamp = TIME.milliseconds();
+
+ CoordinatorRecord record1 = CoordinatorRecord.record(
+ new ShareSnapshotKey()
+ .setGroupId(GROUP_ID)
+ .setTopicId(TOPIC_ID)
+ .setPartition(0),
+ new ApiMessageAndVersion(
+ new ShareSnapshotValue()
+ .setSnapshotEpoch(0)
+ .setStateEpoch(0)
+ .setLeaderEpoch(leaderEpoch)
+ .setCreateTimestamp(timestamp)
+ .setWriteTimestamp(timestamp)
+ .setStateBatches(List.of(
+ new ShareSnapshotValue.StateBatch()
+ .setFirstOffset(0)
+ .setLastOffset(10)
+ .setDeliveryCount((short) 1)
+ .setDeliveryState((byte) 0))),
+ (short) 0
+ )
+ );
+
+ CoordinatorRecord record2 = CoordinatorRecord.record(
+ new ShareSnapshotKey()
+ .setGroupId(GROUP_ID)
+ .setTopicId(TOPIC_ID)
+ .setPartition(1),
+ new ApiMessageAndVersion(
+ new ShareSnapshotValue()
+ .setSnapshotEpoch(0)
+ .setStateEpoch(0)
+ .setLeaderEpoch(leaderEpoch)
+ .setCreateTimestamp(timestamp)
+ .setWriteTimestamp(timestamp)
+ .setStateBatches(List.of(
+ new ShareSnapshotValue.StateBatch()
+ .setFirstOffset(0)
+ .setLastOffset(10)
+ .setDeliveryCount((short) 1)
+ .setDeliveryState((byte) 0))),
+ (short) 0
+ )
+ );
+
+ shard.replay(offset, producerId, producerEpoch, record1);
+ shard.replay(offset + 1, producerId, producerEpoch, record2);
+
+        assertNotNull(shard.getShareStateMapValue(SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 0)));
+        assertNotNull(shard.getShareStateMapValue(SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 1)));
+
+ TIME.sleep(5000); // Less than config.
+
+ assertEquals(0, shard.snapshotColdPartitions().records().size());
+ }
+
+ @Test
+ public void testSnapshotColdPartitionsSnapshotUpdateNotConsidered() {
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+ MetadataImage image = mock(MetadataImage.class);
+ shard.onNewMetadataImage(image, null);
+ int offset = 0;
+ int producerId = 0;
+ short producerEpoch = 0;
+ int leaderEpoch = 0;
+
+ long timestamp = TIME.milliseconds();
+
+ CoordinatorRecord record1 = CoordinatorRecord.record(
+ new ShareSnapshotKey()
+ .setGroupId(GROUP_ID)
+ .setTopicId(TOPIC_ID)
+ .setPartition(0),
+ new ApiMessageAndVersion(
+ new ShareSnapshotValue()
+ .setSnapshotEpoch(0)
+ .setStateEpoch(0)
+ .setLeaderEpoch(leaderEpoch)
+ .setCreateTimestamp(timestamp)
+ .setWriteTimestamp(timestamp)
+ .setStateBatches(List.of(
+ new ShareSnapshotValue.StateBatch()
+ .setFirstOffset(0)
+ .setLastOffset(10)
+ .setDeliveryCount((short) 1)
+ .setDeliveryState((byte) 0))),
+ (short) 0
+ )
+ );
+
+        SharePartitionKey key = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 0);
+
+ shard.replay(offset, producerId, producerEpoch, record1);
+ assertNotNull(shard.getShareStateMapValue(key));
+
+ long sleep = 12000;
+ TIME.sleep(sleep);
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+ CoordinatorRecord.record(
+ new ShareSnapshotKey()
+ .setGroupId(GROUP_ID)
+ .setTopicId(TOPIC_ID)
+ .setPartition(0),
+ new ApiMessageAndVersion(
+ new ShareSnapshotValue()
+ .setSnapshotEpoch(1)
+ .setStateEpoch(0)
+ .setLeaderEpoch(leaderEpoch)
+ .setCreateTimestamp(timestamp)
+ .setWriteTimestamp(timestamp + sleep)
+ .setStateBatches(List.of(
+ new ShareSnapshotValue.StateBatch()
+ .setFirstOffset(0)
+ .setLastOffset(10)
+ .setDeliveryCount((short) 1)
+ .setDeliveryState((byte) 0))),
+ (short) 0
+ )
+ )
+ );
+
+        assertEquals(expectedRecords, shard.snapshotColdPartitions().records());
+
+        shard.replay(offset + 1, producerId, producerEpoch, expectedRecords.get(0));
+ assertNotNull(shard.getShareStateMapValue(key));
+
+ CoordinatorRecord record2 = CoordinatorRecord.record(
+ new ShareUpdateKey()
+ .setGroupId(GROUP_ID)
+ .setTopicId(TOPIC_ID)
+ .setPartition(0),
+ new ApiMessageAndVersion(
+ new ShareUpdateValue()
+ .setSnapshotEpoch(0)
+ .setLeaderEpoch(leaderEpoch)
+ .setStateBatches(List.of(
+ new ShareUpdateValue.StateBatch()
+ .setFirstOffset(0)
+ .setLastOffset(10)
+ .setDeliveryCount((short) 1)
+ .setDeliveryState((byte) 0))),
+ (short) 0
+ )
+ );
+
+ shard.replay(offset + 2, producerId, producerEpoch, record2);
+
+ TIME.sleep(sleep);
+
+ assertNotNull(shard.getShareStateMapValue(key));
+        assertEquals(timestamp + sleep, shard.getShareStateMapValue(key).writeTimestamp()); // No snapshot since update has no time info.
+ }
+
+ @Test
+ public void testSnapshotColdPartitionsDoesNotPerpetuallySnapshot() {
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+ MetadataImage image = mock(MetadataImage.class);
+ shard.onNewMetadataImage(image, null);
+ int offset = 0;
+ int producerId = 0;
+ short producerEpoch = 0;
+ int leaderEpoch = 0;
+
+ long timestamp = TIME.milliseconds();
+
+ CoordinatorRecord record1 = CoordinatorRecord.record(
+ new ShareSnapshotKey()
+ .setGroupId(GROUP_ID)
+ .setTopicId(TOPIC_ID)
+ .setPartition(0),
+ new ApiMessageAndVersion(
+ new ShareSnapshotValue()
+ .setSnapshotEpoch(0)
+ .setStateEpoch(0)
+ .setLeaderEpoch(leaderEpoch)
+ .setCreateTimestamp(timestamp)
+ .setWriteTimestamp(timestamp)
+ .setStateBatches(List.of(
+ new ShareSnapshotValue.StateBatch()
+ .setFirstOffset(0)
+ .setLastOffset(10)
+ .setDeliveryCount((short) 1)
+ .setDeliveryState((byte) 0))),
+ (short) 0
+ )
+ );
+
+ shard.replay(offset, producerId, producerEpoch, record1);
+        assertNotNull(shard.getShareStateMapValue(SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 0)));
+
+ long sleep = 12000;
+ TIME.sleep(sleep);
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+ CoordinatorRecord.record(
+ new ShareSnapshotKey()
+ .setGroupId(GROUP_ID)
+ .setTopicId(TOPIC_ID)
+ .setPartition(0),
+ new ApiMessageAndVersion(
+ new ShareSnapshotValue()
+ .setSnapshotEpoch(1)
+ .setStateEpoch(0)
+ .setLeaderEpoch(leaderEpoch)
+ .setCreateTimestamp(timestamp)
+ .setWriteTimestamp(timestamp + sleep)
+ .setStateBatches(List.of(
+ new ShareSnapshotValue.StateBatch()
+ .setFirstOffset(0)
+ .setLastOffset(10)
+ .setDeliveryCount((short) 1)
+ .setDeliveryState((byte) 0))),
+ (short) 0
+ )
+ )
+ );
+
+        assertEquals(expectedRecords, shard.snapshotColdPartitions().records());
+
+        shard.replay(offset + 1, producerId, producerEpoch, expectedRecords.get(0));
+        assertNotNull(shard.getShareStateMapValue(SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 0)));
+
+        // Since all existing snapshots are already snapshotted, no new records will be created.
+ TIME.sleep(12000);
+
+ assertEquals(0, shard.snapshotColdPartitions().records().size());
+ }
+
+ @Test
+ public void testSnapshotColdPartitionsPartialEligiblePartitions() {
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+ MetadataImage image = mock(MetadataImage.class);
+ shard.onNewMetadataImage(image, null);
+ int offset = 0;
+ int producerId = 0;
+ short producerEpoch = 0;
+ int leaderEpoch = 0;
+        SharePartitionKey key0 = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 0);
+        SharePartitionKey key1 = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 1);
+
+ long timestamp = TIME.milliseconds();
+ int record1SnapshotEpoch = 0;
+
+ CoordinatorRecord record1 = CoordinatorRecord.record(
+ new ShareSnapshotKey()
+ .setGroupId(GROUP_ID)
+ .setTopicId(TOPIC_ID)
+ .setPartition(0),
+ new ApiMessageAndVersion(
+ new ShareSnapshotValue()
+ .setSnapshotEpoch(record1SnapshotEpoch)
+ .setStateEpoch(0)
+ .setLeaderEpoch(leaderEpoch)
+ .setCreateTimestamp(timestamp)
+ .setWriteTimestamp(timestamp)
+ .setStateBatches(List.of(
+ new ShareSnapshotValue.StateBatch()
+ .setFirstOffset(0)
+ .setLastOffset(10)
+ .setDeliveryCount((short) 1)
+ .setDeliveryState((byte) 0))),
+ (short) 0
+ )
+ );
+
+ long delta = 15000; // 15 seconds
+
+ CoordinatorRecord record2 = CoordinatorRecord.record(
+ new ShareSnapshotKey()
+ .setGroupId(GROUP_ID)
+ .setTopicId(TOPIC_ID)
+ .setPartition(1),
+ new ApiMessageAndVersion(
+ new ShareSnapshotValue()
+ .setSnapshotEpoch(0)
+ .setStateEpoch(0)
+ .setLeaderEpoch(leaderEpoch)
+ .setCreateTimestamp(timestamp + delta)
+ .setWriteTimestamp(timestamp + delta)
+ .setStateBatches(List.of(
+ new ShareSnapshotValue.StateBatch()
+ .setFirstOffset(0)
+ .setLastOffset(10)
+ .setDeliveryCount((short) 1)
+ .setDeliveryState((byte) 0))),
+ (short) 0
+ )
+ );
+
+ shard.replay(offset, producerId, producerEpoch, record1);
+ shard.replay(offset + 1, producerId, producerEpoch, record2);
+
+ assertNotNull(shard.getShareStateMapValue(key0));
+ assertNotNull(shard.getShareStateMapValue(key1));
+        assertEquals(timestamp, shard.getShareStateMapValue(key0).writeTimestamp());
+        assertEquals(timestamp + delta, shard.getShareStateMapValue(key1).writeTimestamp());
+
+ long sleep = 12000;
+ TIME.sleep(sleep); // Record 1 is eligible now.
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+ CoordinatorRecord.record(
+ new ShareSnapshotKey()
+ .setGroupId(GROUP_ID)
+ .setTopicId(TOPIC_ID)
+ .setPartition(0),
+ new ApiMessageAndVersion(
+ new ShareSnapshotValue()
+ .setSnapshotEpoch(record1SnapshotEpoch + 1)
+ .setStateEpoch(0)
+ .setLeaderEpoch(leaderEpoch)
+ .setCreateTimestamp(timestamp)
+ .setWriteTimestamp(timestamp + sleep)
+ .setStateBatches(List.of(
+ new ShareSnapshotValue.StateBatch()
+ .setFirstOffset(0)
+ .setLastOffset(10)
+ .setDeliveryCount((short) 1)
+ .setDeliveryState((byte) 0))),
+ (short) 0
+ )
+ )
+ );
+
+        List<CoordinatorRecord> records = shard.snapshotColdPartitions().records();
+ assertEquals(expectedRecords, records);
+
+ shard.replay(offset + 2, producerId, producerEpoch, records.get(0));
+
+        assertEquals(timestamp + delta, shard.getShareStateMapValue(key1).writeTimestamp());
+        assertEquals(timestamp + sleep, shard.getShareStateMapValue(key0).writeTimestamp());
+ }
+
private static ShareGroupOffset groupOffset(ApiMessage record) {
if (record instanceof ShareSnapshotValue) {
return ShareGroupOffset.fromRecord((ShareSnapshotValue) record);
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
index eab6f2966ac..853bc119432 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
@@ -50,6 +50,7 @@ public class ShareCoordinatorTestConfig {
configs.put(ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG, "10");
         configs.put(ShareCoordinatorConfig.STATE_TOPIC_COMPRESSION_CODEC_CONFIG, String.valueOf(CompressionType.NONE.id));
         configs.put(ShareCoordinatorConfig.STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, "30000"); // 30 seconds
+        configs.put(ShareCoordinatorConfig.COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG, "10000"); // 10 seconds
return configs;
}