This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 847968e530c KAFKA-19281: Add share enable flag to periodic jobs.
(#19721)
847968e530c is described below
commit 847968e530c1876fca89c2a6827d6b90e1755904
Author: Sushant Mahajan <[email protected]>
AuthorDate: Thu May 15 18:35:06 2025 +0530
KAFKA-19281: Add share enable flag to periodic jobs. (#19721)
* We have a few periodic timer tasks in `ShareCoordinatorService` which
run continuously.
* With the recent introduction of share group enabled config at feature
level, we would like these jobs to stop when the aforementioned feature
is disabled.
* In this PR, we have added the functionality to make that possible.
* Additionally the service has been supplemented with addition of a
static share group config supplier.
* New test has been added.
Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal
<[email protected]>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 1 +
.../coordinator/share/ShareCoordinatorService.java | 57 +++++-
.../share/ShareCoordinatorServiceTest.java | 228 +++++++++++++++++----
3 files changed, 245 insertions(+), 41 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index f8b52a728c6..3a7fd798801 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -660,6 +660,7 @@ class BrokerServer(
.withWriter(writer)
.withCoordinatorRuntimeMetrics(new
ShareCoordinatorRuntimeMetrics(metrics))
.withCoordinatorMetrics(new ShareCoordinatorMetrics(metrics))
+ .withShareGroupEnabledConfigSupplier(() =>
config.shareGroupConfig.isShareGroupEnabled)
.build()
}
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index 3c47bf86fa0..e26aac124ff 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -54,6 +54,7 @@ import
org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.util.FutureUtils;
@@ -76,6 +77,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
+import java.util.function.Supplier;
import static
org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper.handleOperationException;
@@ -91,6 +93,8 @@ public class ShareCoordinatorService implements
ShareCoordinator {
private final Timer timer;
private final PartitionWriter writer;
private final Map<TopicPartition, Long> lastPrunedOffsets;
+ private final Supplier<Boolean> shareGroupConfigEnabledSupplier;
+ private volatile boolean shouldRunPeriodicJob;
public static class Builder {
private final int nodeId;
@@ -99,7 +103,7 @@ public class ShareCoordinatorService implements
ShareCoordinator {
private CoordinatorLoader<CoordinatorRecord> loader;
private Time time;
private Timer timer;
-
+ private Supplier<Boolean> shareGroupConfigEnabledSupplier;
private ShareCoordinatorMetrics coordinatorMetrics;
private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics;
@@ -138,6 +142,11 @@ public class ShareCoordinatorService implements
ShareCoordinator {
return this;
}
+ public Builder withShareGroupEnabledConfigSupplier(Supplier<Boolean>
shareGroupConfigEnabledSupplier) {
+ this.shareGroupConfigEnabledSupplier =
shareGroupConfigEnabledSupplier;
+ return this;
+ }
+
public ShareCoordinatorService build() {
if (config == null) {
throw new IllegalArgumentException("Config must be set.");
@@ -160,6 +169,9 @@ public class ShareCoordinatorService implements
ShareCoordinator {
if (coordinatorRuntimeMetrics == null) {
throw new IllegalArgumentException("Coordinator runtime
metrics must be set.");
}
+ if (shareGroupConfigEnabledSupplier == null) {
+ throw new IllegalArgumentException("Share group enabled config
enabled supplier must be set.");
+ }
String logPrefix = String.format("ShareCoordinator id=%d", nodeId);
LogContext logContext = new LogContext(String.format("[%s] ",
logPrefix));
@@ -202,7 +214,8 @@ public class ShareCoordinatorService implements
ShareCoordinator {
coordinatorMetrics,
time,
timer,
- writer
+ writer,
+ shareGroupConfigEnabledSupplier
);
}
}
@@ -214,7 +227,8 @@ public class ShareCoordinatorService implements
ShareCoordinator {
ShareCoordinatorMetrics shareCoordinatorMetrics,
Time time,
Timer timer,
- PartitionWriter writer
+ PartitionWriter writer,
+ Supplier<Boolean> shareGroupConfigEnabledSupplier
) {
this.log = logContext.logger(ShareCoordinatorService.class);
this.config = config;
@@ -224,6 +238,7 @@ public class ShareCoordinatorService implements
ShareCoordinator {
this.timer = timer;
this.writer = writer;
this.lastPrunedOffsets = new ConcurrentHashMap<>();
+ this.shareGroupConfigEnabledSupplier = shareGroupConfigEnabledSupplier;
}
@Override
@@ -265,7 +280,6 @@ public class ShareCoordinatorService implements
ShareCoordinator {
log.info("Starting up.");
numPartitions = shareGroupTopicPartitionCount.getAsInt();
- setupPeriodicJobs();
log.info("Startup complete.");
}
@@ -274,11 +288,15 @@ public class ShareCoordinatorService implements
ShareCoordinator {
setupSnapshotColdPartitions();
}
- private void setupRecordPruning() {
+ // Visibility for tests
+ void setupRecordPruning() {
log.debug("Scheduling share-group state topic prune job.");
timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs())
{
@Override
public void run() {
+ if (!shouldRunPeriodicJob) {
+ return;
+ }
List<CompletableFuture<Void>> futures = new ArrayList<>();
runtime.activeTopicPartitions().forEach(tp ->
futures.add(performRecordPruning(tp)));
@@ -349,11 +367,15 @@ public class ShareCoordinatorService implements
ShareCoordinator {
return fut;
}
- private void setupSnapshotColdPartitions() {
+ // Visibility for tests
+ void setupSnapshotColdPartitions() {
log.debug("Scheduling cold share-partition snapshotting.");
timer.add(new
TimerTask(config.shareCoordinatorColdPartitionSnapshotIntervalMs()) {
@Override
public void run() {
+ if (!shouldRunPeriodicJob) {
+ return;
+ }
List<CompletableFuture<Void>> futures =
runtime.scheduleWriteAllOperation(
"snapshot-cold-partitions",
Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
@@ -1075,6 +1097,18 @@ public class ShareCoordinatorService implements
ShareCoordinator {
public void onNewMetadataImage(MetadataImage newImage, MetadataDelta
delta) {
throwIfNotActive();
this.runtime.onNewMetadataImage(newImage, delta);
+ boolean enabled = isShareGroupsEnabled(newImage);
+ // enabled shouldRunJob result (XOR)
+ // 0 0 no op on flag, do not call jobs
+ // 0 1 disable flag, do not call jobs
=> action
+ // 1 0 enable flag, call jobs as they are not
recursing => action
+ // 1 1 no op on flag, do not call jobs
+ if (enabled ^ shouldRunPeriodicJob) {
+ shouldRunPeriodicJob = enabled;
+ if (enabled) {
+ setupPeriodicJobs();
+ }
+ }
}
TopicPartition topicPartitionFor(SharePartitionKey key) {
@@ -1090,4 +1124,15 @@ public class ShareCoordinatorService implements
ShareCoordinator {
throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
}
}
+
+ private boolean isShareGroupsEnabled(MetadataImage image) {
+ return shareGroupConfigEnabledSupplier.get() ||
ShareVersion.fromFeatureLevel(
+
image.features().finalizedVersions().getOrDefault(ShareVersion.FEATURE_NAME,
(short) 0)
+ ).supportsShareGroups();
+ }
+
+ // Visibility for tests
+ boolean shouldRunPeriodicJob() {
+ return shouldRunPeriodicJob;
+ }
}
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 952464efd60..542d9cd8d0d 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -45,6 +45,9 @@ import
org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.MockTime;
@@ -52,6 +55,7 @@ import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.Timer;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import java.time.Duration;
import java.util.HashSet;
@@ -68,17 +72,21 @@ import java.util.concurrent.TimeoutException;
import static
org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyShort;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+@SuppressWarnings("ClassFanOutComplexity")
class ShareCoordinatorServiceTest {
@SuppressWarnings("unchecked")
@@ -99,7 +107,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
new MockTimer(),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
service.startup(() -> 1);
@@ -122,7 +131,8 @@ class ShareCoordinatorServiceTest {
coordinatorMetrics,
time,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
service.startup(() -> 1);
@@ -232,7 +242,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
service.startup(() -> 1);
@@ -334,7 +345,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
service.startup(() -> 1);
@@ -418,7 +430,8 @@ class ShareCoordinatorServiceTest {
coordinatorMetrics,
time,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
service.startup(() -> 1);
@@ -500,7 +513,8 @@ class ShareCoordinatorServiceTest {
coordinatorMetrics,
time,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
service.startup(() -> 1);
@@ -582,7 +596,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
service.startup(() -> 1);
@@ -629,7 +644,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
service.startup(() -> 1);
@@ -676,7 +692,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
service.startup(() -> 1);
@@ -723,7 +740,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
service.startup(() -> 1);
@@ -770,7 +788,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
service.startup(() -> 1);
@@ -817,7 +836,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
String groupId = "group1";
@@ -896,7 +916,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
String groupId = "group1";
@@ -959,7 +980,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
String groupId = "group1";
@@ -1022,7 +1044,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
String groupId = "group1";
@@ -1083,7 +1106,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
String groupId = "group1";
@@ -1143,7 +1167,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
service.startup(() -> 1);
@@ -1193,7 +1218,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
service.startup(() -> 1);
@@ -1236,7 +1262,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
service.startup(() -> 1);
@@ -1279,7 +1306,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
service.startup(() -> 1);
@@ -1321,7 +1349,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
service.startup(() -> 1);
@@ -1362,7 +1391,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
service.startup(() -> 1);
@@ -1391,7 +1421,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class)
+ mock(PartitionWriter.class),
+ () -> true
);
String groupId = "group1";
@@ -1449,10 +1480,12 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer
+ writer,
+ () -> true
));
service.startup(() -> 1);
+ service.onNewMetadataImage(mock(MetadataImage.class),
mock(MetadataDelta.class));
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1542,10 +1575,12 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer
+ writer,
+ () -> true
));
service.startup(() -> 2);
+ service.onNewMetadataImage(mock(MetadataImage.class),
mock(MetadataDelta.class));
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1602,10 +1637,12 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer
+ writer,
+ () -> true
));
service.startup(() -> 1);
+ service.onNewMetadataImage(mock(MetadataImage.class),
mock(MetadataDelta.class));
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1653,10 +1690,12 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer
+ writer,
+ () -> true
));
service.startup(() -> 1);
+ service.onNewMetadataImage(mock(MetadataImage.class),
mock(MetadataDelta.class));
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1702,10 +1741,13 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer
+ writer,
+ () -> true
));
service.startup(() -> 1);
+ service.onNewMetadataImage(mock(MetadataImage.class),
mock(MetadataDelta.class));
+
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1763,10 +1805,12 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer
+ writer,
+ () -> true
));
service.startup(() -> 1);
+ service.onNewMetadataImage(mock(MetadataImage.class),
mock(MetadataDelta.class));
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1837,10 +1881,12 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer
+ writer,
+ () -> true
));
service.startup(() -> 1);
+ service.onNewMetadataImage(mock(MetadataImage.class),
mock(MetadataDelta.class));
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1889,7 +1935,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer
+ writer,
+ () -> true
));
when(runtime.scheduleWriteAllOperation(
@@ -1899,6 +1946,7 @@ class ShareCoordinatorServiceTest {
)).thenReturn(List.of(CompletableFuture.completedFuture(null)));
service.startup(() -> 1);
+ service.onNewMetadataImage(mock(MetadataImage.class),
mock(MetadataDelta.class));
verify(runtime, times(0))
.scheduleWriteOperation(
eq("snapshot-cold-partitions"),
@@ -1951,10 +1999,12 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer
+ writer,
+ () -> true
));
service.startup(() -> 2);
+ service.onNewMetadataImage(mock(MetadataImage.class),
mock(MetadataDelta.class));
verify(runtime, times(0))
.scheduleWriteAllOperation(
eq("snapshot-cold-partitions"),
@@ -1980,6 +2030,111 @@ class ShareCoordinatorServiceTest {
service.shutdown();
}
+ @Test
+ public void testPeriodicJobsDoNotRunWhenShareGroupsDisabled() throws
InterruptedException {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ PartitionWriter writer = mock(PartitionWriter.class);
+ MockTime time = new MockTime();
+ MockTimer timer = spy(new MockTimer(time));
+
+ Metrics metrics = new Metrics();
+
+ ShareCoordinatorService service = spy(new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.testConfig(),
+ runtime,
+ new ShareCoordinatorMetrics(metrics),
+ time,
+ timer,
+ writer,
+ () -> false // So that the feature config is used.
+ ));
+
+ // Prune job.
+ when(runtime.scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any()
+ )).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
+ // Snapshot job.
+ when(runtime.scheduleWriteAllOperation(
+ eq("snapshot-cold-partitions"),
+ any(),
+ any()
+ )).thenReturn(List.of());
+
+ assertFalse(service.shouldRunPeriodicJob());
+
+ service.startup(() -> 1);
+
+ MetadataImage mockedImage = mock(MetadataImage.class,
RETURNS_DEEP_STUBS);
+
+ // Feature disabled on start.
+
when(mockedImage.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
anyShort())).thenReturn((short) 0);
+ service.onNewMetadataImage(mockedImage, mock(MetadataDelta.class));
// Jobs will not execute as feature is OFF in image.
+
+ verify(timer, times(0)).add(any()); // Timer task not added.
+ verify(runtime, times(0)).scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any()
+ );
+ verify(runtime, times(0)).scheduleWriteAllOperation(
+ eq("snapshot-cold-partitions"),
+ any(),
+ any()
+ );
+ assertFalse(service.shouldRunPeriodicJob());
+
+ // Enable feature.
+ Mockito.reset(mockedImage);
+
when(mockedImage.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
anyShort())).thenReturn((short) 1);
+ service.onNewMetadataImage(mockedImage, mock(MetadataDelta.class));
// Jobs will execute as feature is ON in image.
+
+ verify(timer, times(2)).add(any()); // Timer task added twice (prune,
snapshot).
+ timer.advanceClock(30001L);
+ verify(runtime, times(1)).scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any()
+ );
+ verify(runtime, times(1)).scheduleWriteAllOperation(
+ eq("snapshot-cold-partitions"),
+ any(),
+ any()
+ );
+ assertTrue(service.shouldRunPeriodicJob());
+
+ // Disable feature
+ Mockito.reset(mockedImage);
+
when(mockedImage.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
anyShort())).thenReturn((short) 0);
+ service.onNewMetadataImage(mockedImage, mock(MetadataDelta.class));
// Jobs will not execute as feature is on in image.
+ timer.advanceClock(30001L);
+
+ verify(timer, times(4)).add(any()); // Tasks added but will return
immediately.
+ verify(runtime, times(1)).scheduleWriteOperation(
+ eq("write-state-record-prune"),
+ any(),
+ any(),
+ any()
+ );
+ verify(runtime, times(1)).scheduleWriteAllOperation(
+ eq("snapshot-cold-partitions"),
+ any(),
+ any()
+ );
+ assertFalse(service.shouldRunPeriodicJob());
+
+ timer.advanceClock(30001L);
+ verify(timer, times(4)).add(any()); // No new additions.
+
+ service.shutdown();
+ }
+
@Test
public void testShareStateTopicConfigs() {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
@@ -1995,7 +2150,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer
+ writer,
+ () -> true
));
List<String> propNames = List.of(
@@ -2025,7 +2181,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer
+ writer,
+ () -> true
));
service.startup(() -> 3);
@@ -2065,7 +2222,8 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer
+ writer,
+ () -> true
));
service.startup(() -> 3);