This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 6567deb00 [server] Fix ReplicaFetcherThread keeps throwing 
OutOfOrderSequenceException because of writer id expire (#1639)
6567deb00 is described below

commit 6567deb001e03cdf5a42a05ae8c5ee92735a25f0
Author: Liebing <[email protected]>
AuthorDate: Mon Sep 15 10:16:13 2025 +0800

    [server] Fix ReplicaFetcherThread keeps throwing 
OutOfOrderSequenceException because of writer id expire (#1639)
---
 .../org/apache/fluss/server/log/LogTablet.java     | 17 +++--
 .../apache/fluss/server/log/WriterAppendInfo.java  | 67 ++++++++++++++----
 .../fluss/server/log/WriterStateManagerTest.java   |  5 +-
 .../apache/fluss/server/replica/ReplicaTest.java   | 38 ++++++++++
 .../fluss/server/replica/ReplicaTestBase.java      |  2 +
 .../replica/fetcher/ReplicaFetcherThreadTest.java  | 80 +++++++++++++++++++++-
 6 files changed, 185 insertions(+), 24 deletions(-)

diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index 8eb19362f..fc0e7f27f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -640,7 +640,7 @@ public final class LogTablet {
             // now that we have valid records, offsets assigned, we need to 
validate the idempotent
             // state of the writers and collect some metadata.
             Either<WriterStateEntry.BatchMetadata, 
Collection<WriterAppendInfo>> validateResult =
-                    analyzeAndValidateWriterState(validRecords);
+                    analyzeAndValidateWriterState(validRecords, 
appendAsLeader);
 
             if (validateResult.isLeft()) {
                 // have duplicated batch metadata, skip the append and update 
append info.
@@ -1003,7 +1003,7 @@ public final class LogTablet {
 
     /** Returns either the duplicated batch metadata (left) or the updated 
writers (right). */
     private Either<WriterStateEntry.BatchMetadata, 
Collection<WriterAppendInfo>>
-            analyzeAndValidateWriterState(MemoryLogRecords records) {
+            analyzeAndValidateWriterState(MemoryLogRecords records, boolean 
isAppendAsLeader) {
         Map<Long, WriterAppendInfo> updatedWriters = new HashMap<>();
 
         for (LogRecordBatch batch : records.batches()) {
@@ -1020,14 +1020,15 @@ public final class LogTablet {
                 }
 
                 // update write append info.
-                updateWriterAppendInfo(writerStateManager, batch, 
updatedWriters);
+                updateWriterAppendInfo(writerStateManager, batch, 
updatedWriters, isAppendAsLeader);
             }
         }
 
         return Either.right(updatedWriters.values());
     }
 
-    void removeExpiredWriter(long currentTimeMs) {
+    @VisibleForTesting
+    public void removeExpiredWriter(long currentTimeMs) {
         synchronized (lock) {
             writerStateManager.removeExpiredWriters(currentTimeMs);
         }
@@ -1107,14 +1108,16 @@ public final class LogTablet {
     private static void updateWriterAppendInfo(
             WriterStateManager writerStateManager,
             LogRecordBatch batch,
-            Map<Long, WriterAppendInfo> writers) {
+            Map<Long, WriterAppendInfo> writers,
+            boolean isAppendAsLeader) {
         long writerId = batch.writerId();
         // update writers.
         WriterAppendInfo appendInfo =
                 writers.computeIfAbsent(writerId, id -> 
writerStateManager.prepareUpdate(writerId));
         appendInfo.append(
                 batch,
-                
writerStateManager.isWriterInBatchExpired(System.currentTimeMillis(), batch));
+                
writerStateManager.isWriterInBatchExpired(System.currentTimeMillis(), batch),
+                isAppendAsLeader);
     }
 
     static void rebuildWriterState(
@@ -1227,7 +1230,7 @@ public final class LogTablet {
         Map<Long, WriterAppendInfo> loadedWriters = new HashMap<>();
         for (LogRecordBatch batch : records.batches()) {
             if (batch.hasWriterId()) {
-                updateWriterAppendInfo(writerStateManager, batch, 
loadedWriters);
+                updateWriterAppendInfo(writerStateManager, batch, 
loadedWriters, true);
             }
         }
         loadedWriters.values().forEach(writerStateManager::update);
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java
index cd625b155..cc11cf17f 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java
@@ -44,13 +44,15 @@ public class WriterAppendInfo {
         return writerId;
     }
 
-    public void append(LogRecordBatch batch, boolean isWriterInBatchExpired) {
+    public void append(
+            LogRecordBatch batch, boolean isWriterInBatchExpired, boolean 
isAppendAsLeader) {
         LogOffsetMetadata firstOffsetMetadata = new 
LogOffsetMetadata(batch.baseLogOffset());
         appendDataBatch(
                 batch.batchSequence(),
                 firstOffsetMetadata,
                 batch.lastLogOffset(),
                 isWriterInBatchExpired,
+                isAppendAsLeader,
                 batch.commitTimestamp());
     }
 
@@ -59,8 +61,9 @@ public class WriterAppendInfo {
             LogOffsetMetadata firstOffsetMetadata,
             long lastOffset,
             boolean isWriterInBatchExpired,
+            boolean isAppendAsLeader,
             long batchTimestamp) {
-        maybeValidateDataBatch(batchSequence, isWriterInBatchExpired, 
lastOffset);
+        maybeValidateDataBatch(batchSequence, isWriterInBatchExpired, 
lastOffset, isAppendAsLeader);
         updatedEntry.addBath(
                 batchSequence,
                 lastOffset,
@@ -69,13 +72,16 @@ public class WriterAppendInfo {
     }
 
     private void maybeValidateDataBatch(
-            int appendFirstSeq, boolean isWriterInBatchExpired, long 
lastOffset) {
+            int appendFirstSeq,
+            boolean isWriterInBatchExpired,
+            long lastOffset,
+            boolean isAppendAsLeader) {
         int currentLastSeq =
                 !updatedEntry.isEmpty()
                         ? updatedEntry.lastBatchSequence()
                         : currentEntry.lastBatchSequence();
         // must be in sequence, even for the first batch should start from 0
-        if (!inSequence(currentLastSeq, appendFirstSeq, 
isWriterInBatchExpired)) {
+        if (!inSequence(currentLastSeq, appendFirstSeq, 
isWriterInBatchExpired, isAppendAsLeader)) {
             throw new OutOfOrderSequenceException(
                     String.format(
                             "Out of order batch sequence for writer %s at 
offset %s in "
@@ -93,16 +99,53 @@ public class WriterAppendInfo {
      * three scenarios will be judged as in sequence:
      *
      * <ul>
-     *   <li>If lastBatchSeq equals NO_BATCH_SEQUENCE, we need to check 
whether the committed
-     *       timestamp of the next batch under the current writerId has 
expired. If it has expired,
-     *       we consider this a special case caused by writerId expiration, 
for this case, to ensure
-     *       the correctness of follower sync, we still treat it as in 
sequence.
-     *   <li>nextBatchSeq == lastBatchSeq + 1L
-     *   <li>lastBatchSeq reaches its maximum value
+     *   <li>1. If lastBatchSeq equals NO_BATCH_SEQUENCE, the following two 
scenarios will be judged
+     *       as in sequence:
+     *       <ul>
+     *         <li>1.1 If the committed timestamp of the next batch under the 
current writerId has
+     *             expired, we consider this a special case caused by writerId 
expiration, for this
+     *             case, to ensure the correctness of follower sync, we still 
treat it as in
+     *             sequence.
+     *         <li>1.2 If the append request is from the follower, we consider 
this is a special
+     *             case caused by inconsistent expiration of writerId between 
the leader and
+     *             follower. To prevent continuous fetch failures on the 
follower side, we still
+     *             treat it as in sequence.
+     *       </ul>
+     *   <li>2. nextBatchSeq == lastBatchSeq + 1L
+     *   <li>3. lastBatchSeq reaches its maximum value
      * </ul>
+     *
+     * <p>For case 1.2, here is a detailed example: The expiration of a writer 
is triggered
+     * asynchronously by the {@code PeriodicWriterIdExpirationCheck} thread at 
intervals defined by
+     * {@code server.writer-id.expiration-check-interval}, which can result in 
slight differences in
+     * the actual expiration times of the same writer on the leader replica 
and follower replicas.
+     * This slight difference leads to a dreadful corner case. Imagine the 
following scenario (set
+     * {@code server.writer-id.expiration-check-interval}: 10min, {@code
+     * server.writer-id.expiration-time}: 12h):
+     *
+     * <pre>{@code
+     * Step     Time         Action of Leader                  Action of 
Follower
+     * 1        00:03:38     receive batch 0 of writer 101
+     * 2        00:03:38                                       fetch batch 0 
of writer 101
+     * 3        12:05:00                                       remove state of 
writer 101
+     * 4        12:10:02     receive batch 1 of writer 101
+     * 5        12:10:02                                       fetch batch 1 
of writer 101
+     * 6        12:11:00     remove state of writer 101
+     * }</pre>
+     *
+     * <p>In step 3, the follower removes the state of writer 101 first, since 
it has been more than
+     * 12 hours since writer 101's last batch write, making it safe to remove. 
However, since the
+     * expiration of writer 101 has not yet occurred on the leader, and a new 
batch 1 is received at
+     * this time, it is successfully written on the leader. At this point, the 
fetcher pulls batch 1
+     * from the leader, but since the state of writer 101 has already been 
cleaned up, an {@link
+     * OutOfOrderSequenceException} will occur during the write if we don't 
treat it as in sequence.
      */
-    private boolean inSequence(int lastBatchSeq, int nextBatchSeq, boolean 
isWriterInBatchExpired) {
-        return (lastBatchSeq == NO_BATCH_SEQUENCE && isWriterInBatchExpired)
+    private boolean inSequence(
+            int lastBatchSeq,
+            int nextBatchSeq,
+            boolean isWriterInBatchExpired,
+            boolean isAppendAsLeader) {
+        return (lastBatchSeq == NO_BATCH_SEQUENCE && (isWriterInBatchExpired 
|| !isAppendAsLeader))
                 || nextBatchSeq == lastBatchSeq + 1L
                 || (nextBatchSeq == 0 && lastBatchSeq == Integer.MAX_VALUE);
     }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/WriterStateManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/log/WriterStateManagerTest.java
index b0ac3cd2b..94b2ccc5c 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/log/WriterStateManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/log/WriterStateManagerTest.java
@@ -126,14 +126,14 @@ public class WriterStateManagerTest {
     void testPrepareUpdateDoesNotMutate() {
         WriterAppendInfo appendInfo = stateManager.prepareUpdate(writerId);
         appendInfo.appendDataBatch(
-                0, new LogOffsetMetadata(15L), 20L, false, 
System.currentTimeMillis());
+                0, new LogOffsetMetadata(15L), 20L, false, true, 
System.currentTimeMillis());
         assertThat(stateManager.lastEntry(writerId)).isNotPresent();
         stateManager.update(appendInfo);
         assertThat(stateManager.lastEntry(writerId)).isPresent();
 
         WriterAppendInfo nextAppendInfo = stateManager.prepareUpdate(writerId);
         nextAppendInfo.appendDataBatch(
-                1, new LogOffsetMetadata(26L), 30L, false, 
System.currentTimeMillis());
+                1, new LogOffsetMetadata(26L), 30L, false, true, 
System.currentTimeMillis());
         assertThat(stateManager.lastEntry(writerId)).isPresent();
 
         WriterStateEntry lastEntry = stateManager.lastEntry(writerId).get();
@@ -521,6 +521,7 @@ public class WriterStateManagerTest {
                 new LogOffsetMetadata(offset),
                 offset,
                 isWriterInBatchExpired,
+                true,
                 lastTimestamp);
         stateManager.update(appendInfo);
         stateManager.updateMapEndOffset(offset + 1);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
index 045bd3525..bc1365ab1 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.server.replica;
 
 import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.exception.OutOfOrderSequenceException;
 import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
@@ -48,6 +49,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -76,10 +78,12 @@ import static 
org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecor
 import static org.apache.fluss.testutils.DataTestUtils.genKvRecordBatch;
 import static org.apache.fluss.testutils.DataTestUtils.genKvRecords;
 import static 
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
+import static 
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsWithWriterId;
 import static org.apache.fluss.testutils.DataTestUtils.getKeyValuePairs;
 import static org.apache.fluss.testutils.LogRecordsAssert.assertThatLogRecords;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link Replica}. */
 final class ReplicaTest extends ReplicaTestBase {
@@ -130,6 +134,40 @@ final class ReplicaTest extends ReplicaTestBase {
         assertLogRecordsEquals(DATA1_ROW_TYPE, 
logReadInfo.getFetchedData().getRecords(), DATA1);
     }
 
+    @Test
+    void testAppendRecordsWithOutOfOrderBatchSequence() throws Exception {
+        Replica logReplica =
+                makeLogReplica(DATA1_PHYSICAL_TABLE_PATH, new 
TableBucket(DATA1_TABLE_ID, 1));
+        makeLogReplicaAsLeader(logReplica);
+
+        long writerId = 101L;
+
+        // 1. append a batch with batchSequence = 0
+        
logReplica.appendRecordsToLeader(genMemoryLogRecordsWithWriterId(DATA1, 
writerId, 0, 0), 0);
+
+        // manually advance time and remove the expired writer; the state of writer 
101 will be removed
+        manualClock.advanceTime(Duration.ofHours(12));
+        manualClock.advanceTime(Duration.ofSeconds(1));
+        
assertThat(logReplica.getLogTablet().writerStateManager().activeWriters().size())
+                .isEqualTo(1);
+        
logReplica.getLogTablet().removeExpiredWriter(manualClock.milliseconds());
+        
assertThat(logReplica.getLogTablet().writerStateManager().activeWriters().size())
+                .isEqualTo(0);
+
+        // 2. try to append an out-of-order batch as leader, which will throw
+        // OutOfOrderSequenceException
+        assertThatThrownBy(
+                        () ->
+                                logReplica.appendRecordsToLeader(
+                                        genMemoryLogRecordsWithWriterId(DATA1, 
writerId, 2, 10), 0))
+                .isInstanceOf(OutOfOrderSequenceException.class);
+        assertThat(logReplica.getLocalLogEndOffset()).isEqualTo(10);
+
+        // 3. try to append an out-of-order batch as follower, which should succeed
+        
logReplica.appendRecordsToFollower(genMemoryLogRecordsWithWriterId(DATA1, 
writerId, 2, 10));
+        assertThat(logReplica.getLocalLogEndOffset()).isEqualTo(20);
+    }
+
     @Test
     void testPartialPutRecordsToLeader() throws Exception {
         Replica kvReplica =
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
index 36862c4e5..88e7ec065 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
@@ -174,6 +174,8 @@ public class ReplicaTestBase {
         conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, 
MemorySize.parse("512b"));
         conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, 
MemorySize.parse("1kb"));
 
+        conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME, 
Duration.ofHours(12));
+
         scheduler = new FlussScheduler(2);
         scheduler.startup();
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
index a982ff058..38c4e075c 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
@@ -46,7 +46,7 @@ import org.apache.fluss.server.zk.data.LeaderAndIsr;
 import org.apache.fluss.server.zk.data.TableRegistration;
 import org.apache.fluss.testutils.common.AllCallbackWrapper;
 import org.apache.fluss.utils.clock.Clock;
-import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.clock.ManualClock;
 import org.apache.fluss.utils.concurrent.FlussScheduler;
 import org.apache.fluss.utils.concurrent.Scheduler;
 
@@ -86,6 +86,7 @@ public class ReplicaFetcherThreadTest {
             new AllCallbackWrapper<>(new ZooKeeperExtension());
 
     private static ZooKeeperClient zkClient;
+    private ManualClock manualClock;
     private @TempDir File tempDir;
     private TableBucket tb;
     private final int leaderServerId = 1;
@@ -106,6 +107,7 @@ public class ReplicaFetcherThreadTest {
     @BeforeEach
     public void setup() throws Exception {
         ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot();
+        manualClock = new ManualClock(System.currentTimeMillis());
         Configuration conf = new Configuration();
         tb = new TableBucket(DATA1_TABLE_ID, 0);
         leaderRM = createReplicaManager(leaderServerId);
@@ -277,6 +279,77 @@ public class ReplicaFetcherThreadTest {
                 () -> 
assertThat(followerReplica.getLocalLogEndOffset()).isEqualTo(120L));
     }
 
+    @Test
+    void testAppendAsFollowerThrowOutOfOrderSequenceException() throws 
Exception {
+        Replica followerReplica = followerRM.getReplicaOrException(tb);
+
+        long writerId = 101;
+        CompletableFuture<List<ProduceLogResultForBucket>> future;
+
+        // 1. append the first batch to leader with (writerId=101L, batchSequence=0 
offset=0L)
+        future = new CompletableFuture<>();
+        leaderRM.appendRecordsToLog(
+                1000,
+                1,
+                Collections.singletonMap(
+                        tb, genMemoryLogRecordsWithWriterId(DATA1, writerId, 
0, 0)),
+                future::complete);
+        assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 0L, 10L));
+
+        // 2. append the first batch to follower with (writerId=101L, 
batchSequence=0 offset=0L) to
+        // mock that the follower has already fetched one batch.
+        followerReplica.appendRecordsToFollower(
+                genMemoryLogRecordsWithWriterId(DATA1, writerId, 0, 0));
+        assertThat(followerReplica.getLocalLogEndOffset()).isEqualTo(10L);
+
+        // advance time
+        manualClock.advanceTime(Duration.ofHours(13));
+
+        // 3. append the second batch to leader with (writerId=101L, 
batchSequence=1 offset=10L)
+        future = new CompletableFuture<>();
+        leaderRM.appendRecordsToLog(
+                1000,
+                1,
+                Collections.singletonMap(
+                        tb, genMemoryLogRecordsWithWriterId(DATA1, writerId, 
1, 0)),
+                future::complete);
+        assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 10L, 20L));
+
+        // 3. mock remove expired writer, writerId=101 will be removed.
+        
assertThat(followerReplica.getLogTablet().writerStateManager().activeWriters().size())
+                .isEqualTo(1);
+        
followerReplica.getLogTablet().removeExpiredWriter(manualClock.milliseconds());
+        
assertThat(followerReplica.getLogTablet().writerStateManager().activeWriters().size())
+                .isEqualTo(0);
+
+        // 4. begin fetcher thread.
+        followerFetcher.addBuckets(
+                Collections.singletonMap(
+                        tb,
+                        new InitialFetchStatus(
+                                DATA1_TABLE_ID, DATA1_TABLE_PATH, leader.id(), 
10L)));
+        followerFetcher.start();
+        // fetcher will force append the second batch to follower
+        retry(
+                Duration.ofSeconds(20),
+                () -> 
assertThat(followerReplica.getLocalLogEndOffset()).isEqualTo(20L));
+
+        // 5. mock new batch to leader with (writerId=101L, batchSequence=2 
offset=20L)
+        future = new CompletableFuture<>();
+        leaderRM.appendRecordsToLog(
+                1000,
+                1,
+                Collections.singletonMap(
+                        tb, genMemoryLogRecordsWithWriterId(DATA1, writerId, 
2, 0)),
+                future::complete);
+        assertThat(future.get()).containsOnly(new 
ProduceLogResultForBucket(tb, 20L, 30L));
+        // now fetcher will work well since the state of writerId=101 is 
established
+        retry(
+                Duration.ofSeconds(20),
+                () -> 
assertThat(followerReplica.getLocalLogEndOffset()).isEqualTo(30L));
+    }
+
     private void registerTableInZkClient() throws Exception {
         ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot();
         zkClient.registerTable(
@@ -319,6 +392,7 @@ public class ReplicaFetcherThreadTest {
     private ReplicaManager createReplicaManager(int serverId) throws Exception 
{
         Configuration conf = new Configuration();
         conf.setString(ConfigOptions.DATA_DIR, tempDir.getAbsolutePath() + 
"/server-" + serverId);
+        conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME, 
Duration.ofHours(12));
         Scheduler scheduler = new FlussScheduler(2);
         scheduler.startup();
 
@@ -327,7 +401,7 @@ public class ReplicaFetcherThreadTest {
                         conf,
                         zkClient,
                         scheduler,
-                        SystemClock.getInstance(),
+                        manualClock,
                         TestingMetricGroups.TABLET_SERVER_METRICS);
         logManager.startup();
         ReplicaManager replicaManager =
@@ -341,7 +415,7 @@ public class ReplicaFetcherThreadTest {
                         new TabletServerMetadataCache(new 
MetadataManager(null, conf), null),
                         RpcClient.create(conf, 
TestingClientMetricGroup.newInstance(), false),
                         TestingMetricGroups.TABLET_SERVER_METRICS,
-                        SystemClock.getInstance());
+                        manualClock);
         replicaManager.startup();
         return replicaManager;
     }

Reply via email to