This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c39061ae458 Subscription: Drain TsFile batches before termination (#17926)
c39061ae458 is described below

commit c39061ae4586f1b3c25f96e61d09f62a3326a2eb
Author: VGalaxies <[email protected]>
AuthorDate: Tue Jun 16 09:50:01 2026 +0800

    Subscription: Drain TsFile batches before termination (#17926)
    
    * Subscription: drain tsfile batches before termination
    
    * Subscription: address termination drain races
    
    * Subscription: add tsfile termination drain IT
    
    * Subscription: fix CI quality warnings
    
    * Subscription: address review comments
---
 .../dual/tablemodel/IoTDBSubscriptionTopicIT.java  |  86 +++++++++
 .../apache/iotdb/db/i18n/DataNodeMiscMessages.java |   5 +
 .../apache/iotdb/db/i18n/DataNodeMiscMessages.java |   4 +
 .../broker/SubscriptionPrefetchingQueue.java       | 199 +++++++++++----------
 .../event/batch/SubscriptionPipeEventBatch.java    |  23 ++-
 .../event/batch/SubscriptionPipeEventBatches.java  |  29 +++
 6 files changed, 246 insertions(+), 100 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionTopicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionTopicIT.java
index f3c0dd29f46..c71a7b2319b 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionTopicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionTopicIT.java
@@ -56,6 +56,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.LockSupport;
 import java.util.function.Consumer;
 
@@ -93,6 +94,8 @@ public class IoTDBSubscriptionTopicIT extends AbstractSubscriptionDualIT {
         .getConfig()
         .getCommonConfig()
         .setPipeMetaSyncerSyncIntervalMinutes(1)
+        .setSubscriptionPrefetchTsFileBatchMaxDelayInMs(600_000)
+        .setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(64 * 1024 * 1024)
         .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
   }
@@ -382,6 +385,89 @@ public class IoTDBSubscriptionTopicIT extends AbstractSubscriptionDualIT {
     }
   }
 
+  @Test
+  public void testTsFileSnapshotDrainsPendingBatchBeforeTermination() throws 
Exception {
+    TableModelUtils.createDataBaseAndTable(senderEnv, "test1", "test1");
+    TableModelUtils.createDataBaseAndTable(receiverEnv, "test1", "test1");
+
+    // Insert historical data and create a closed TsFile before snapshot 
subscription starts.
+    TableModelUtils.insertData("test1", "test1", 0, 10, senderEnv);
+
+    final String topicName = "topic_drain_tsfile_batch_before_termination";
+    final String host = senderEnv.getIP();
+    final int port = Integer.parseInt(senderEnv.getPort());
+    try (final ISubscriptionTableSession session =
+        new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+      final Properties config = new Properties();
+      config.put(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+      config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_SNAPSHOT_VALUE);
+      config.put(TopicConstant.DATABASE_KEY, "test1");
+      config.put(TopicConstant.TABLE_KEY, "test1");
+      // Force TsFile parsing so the snapshot data is buffered in 
SubscriptionPipeTsFileEventBatch.
+      config.put(TopicConstant.START_TIME_KEY, 1);
+      session.createTopic(topicName, config);
+    }
+    assertTopicCount(1);
+
+    final AtomicBoolean isClosed = new AtomicBoolean(false);
+    final AtomicReference<Throwable> consumerFailure = new AtomicReference<>();
+    final Thread thread =
+        new Thread(
+            () -> {
+              try (final ISubscriptionTablePullConsumer consumer =
+                      new SubscriptionTablePullConsumerBuilder()
+                          .host(host)
+                          .port(port)
+                          .consumerId("c_drain")
+                          .consumerGroupId("cg_drain")
+                          .autoCommit(false)
+                          .build();
+                  final ITableSession session = 
receiverEnv.getTableSessionConnection()) {
+                consumer.open();
+                consumer.subscribe(topicName);
+                while (!isClosed.get()) {
+                  final List<SubscriptionMessage> messages =
+                      
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+                  insertData(messages, session);
+                  consumer.commitSync(messages);
+                }
+              } catch (final Throwable e) {
+                consumerFailure.set(e);
+              } finally {
+                LOGGER.info("draining consumer exiting...");
+              }
+            },
+            String.format("%s - draining consumer", 
testName.getDisplayName()));
+    thread.start();
+
+    try {
+      final Consumer<String> handleFailure =
+          o -> {
+            TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+            TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+          };
+      AWAIT.untilAsserted(
+          () -> {
+            Assert.assertNull(consumerFailure.get());
+
+            try (final SyncConfigNodeIServiceClient client =
+                (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+              final TShowSubscriptionResp showSubscriptionResp =
+                  client.showSubscription(new TShowSubscriptionReq());
+              Assert.assertEquals(
+                  RpcUtils.SUCCESS_STATUS.getCode(), 
showSubscriptionResp.status.getCode());
+              Assert.assertNotNull(showSubscriptionResp.subscriptionInfoList);
+              Assert.assertEquals(0, 
showSubscriptionResp.subscriptionInfoList.size());
+            }
+
+            TableModelUtils.assertData("test1", "test1", 1, 10, receiverEnv, 
handleFailure);
+          });
+    } finally {
+      isClosed.set(true);
+      thread.join();
+    }
+  }
+
   /////////////////////////////// utility ///////////////////////////////
 
   private void assertTopicCount(final int count) throws Exception {
diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
index 8eb26f05e0a..2f0a53db5c4 100644
--- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
@@ -779,6 +779,11 @@ public final class DataNodeMiscMessages {
       "Exception {} occurred when {} execute receiver subtask";
   public static final String EXCEPTION_CONSTRUCT_TABLET_ITERATOR =
       "Exception {} occurred when {} construct ToTabletIterator";
+  public static final String 
EXCEPTION_EMIT_EVENTS_BEFORE_COMMIT_TERMINATE_EVENT =
+      "Subscription: SubscriptionPrefetchingQueue {} failed to emit remaining 
events before "
+          + "committing PipeTerminateEvent {}";
+  public static final String COMMIT_TERMINATE_EVENT =
+      "Subscription: SubscriptionPrefetchingQueue {} commit PipeTerminateEvent 
{}";
 
   // 
---------------------------------------------------------------------------
   // consensus – BaseStateMachine
diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
index 69e4370b664..c5625447212 100644
--- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
@@ -778,6 +778,10 @@ public final class DataNodeMiscMessages {
       "异常 {} 在 {} 执行接收子任务时发生";
   public static final String EXCEPTION_CONSTRUCT_TABLET_ITERATOR =
       "异常 {} 在 {} 构造 ToTabletIterator 时发生";
+  public static final String 
EXCEPTION_EMIT_EVENTS_BEFORE_COMMIT_TERMINATE_EVENT =
+      "订阅:SubscriptionPrefetchingQueue {} 在提交 PipeTerminateEvent {} 前封存剩余事件失败";
+  public static final String COMMIT_TERMINATE_EVENT =
+      "订阅:SubscriptionPrefetchingQueue {} 提交 PipeTerminateEvent {}";
 
   // 
---------------------------------------------------------------------------
   // consensus – BaseStateMachine
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 086fde0fbcc..8fc7227fd15 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -117,6 +117,7 @@ public abstract class SubscriptionPrefetchingQueue {
   private volatile TsFileInsertionEvent currentTsFileInsertionEvent;
   private volatile RetryableEvent<TabletInsertionEvent> 
currentTabletInsertionEvent;
   private volatile SubscriptionTsFileToTabletIterator currentToTabletIterator;
+  private PipeTerminateEvent currentTerminateEvent;
 
   public SubscriptionPrefetchingQueue(
       final String brokerId,
@@ -175,6 +176,10 @@ public abstract class SubscriptionPrefetchingQueue {
           .clearReferenceCount(this.getClass().getName());
       currentTabletInsertionEvent = null;
     }
+    if (Objects.nonNull(currentTerminateEvent)) {
+      currentTerminateEvent.clearReferenceCount(this.getClass().getName());
+      currentTerminateEvent = null;
+    }
   }
 
   /////////////////////////////////  lock  /////////////////////////////////
@@ -236,44 +241,8 @@ public abstract class SubscriptionPrefetchingQueue {
       onEvent();
     }
 
-    final long size = prefetchingQueue.size();
-    long count = 0;
-
-    SubscriptionEvent event;
     try {
-      while (count++ < size // limit control
-          && Objects.nonNull(
-              event =
-                  prefetchingQueue.poll(
-                      
SubscriptionConfig.getInstance().getSubscriptionPollMaxBlockingTimeMs(),
-                      TimeUnit.MILLISECONDS))) {
-        if (event.isCommitted()) {
-          LOGGER.warn(
-              "Subscription: SubscriptionPrefetchingQueue {} poll committed 
event {} from prefetching queue (broken invariant), remove it",
-              this,
-              event);
-          // no need to update inFlightEvents
-          continue;
-        }
-
-        if (!event.pollable()) {
-          LOGGER.warn(
-              "Subscription: SubscriptionPrefetchingQueue {} poll non-pollable 
event {} from prefetching queue (broken invariant), nack and remove it",
-              this,
-              event);
-          event.nack(); // now pollable
-          // no need to update inFlightEvents and prefetchingQueue
-          continue;
-        }
-
-        // This operation should be performed before updating inFlightEvents 
to prevent multiple
-        // consumers from consuming the same event.
-        event.recordLastPolledTimestamp(); // now non-pollable
-
-        inFlightEvents.put(new Pair<>(consumerId, event.getCommitContext()), 
event);
-        event.recordLastPolledConsumerId(consumerId);
-        return event;
-      }
+      return pollPrefetchedEvent(consumerId);
     } catch (final InterruptedException e) {
       Thread.currentThread().interrupt();
       LOGGER.warn(
@@ -307,40 +276,8 @@ public abstract class SubscriptionPrefetchingQueue {
           onEvent();
         }
 
-        final long size = prefetchingQueue.size();
-        long count = 0;
-
-        while (count++ < size // limit control
-            && Objects.nonNull(
-                event =
-                    prefetchingQueue.poll(
-                        
SubscriptionConfig.getInstance().getSubscriptionPollMaxBlockingTimeMs(),
-                        TimeUnit.MILLISECONDS))) {
-          if (event.isCommitted()) {
-            LOGGER.warn(
-                "Subscription: SubscriptionPrefetchingQueue {} poll committed 
event {} from prefetching queue (broken invariant), remove it",
-                this,
-                event);
-            // no need to update inFlightEvents
-            continue;
-          }
-
-          if (!event.pollable()) {
-            LOGGER.warn(
-                "Subscription: SubscriptionPrefetchingQueue {} poll 
non-pollable event {} from prefetching queue (broken invariant), nack and 
remove it",
-                this,
-                event);
-            event.nack(); // now pollable
-            // no need to update inFlightEvents and prefetchingQueue
-            continue;
-          }
-
-          // This operation should be performed before updating inFlightEvents 
to prevent multiple
-          // consumers from consuming the same event.
-          event.recordLastPolledTimestamp(); // now non-pollable
-
-          inFlightEvents.put(new Pair<>(consumerId, event.getCommitContext()), 
event);
-          event.recordLastPolledConsumerId(consumerId);
+        event = pollPrefetchedEvent(consumerId);
+        if (Objects.nonNull(event)) {
           return event;
         }
       } catch (final InterruptedException e) {
@@ -356,6 +293,49 @@ public abstract class SubscriptionPrefetchingQueue {
     return null;
   }
 
+  private synchronized SubscriptionEvent pollPrefetchedEvent(final String 
consumerId)
+      throws InterruptedException {
+    final long size = prefetchingQueue.size();
+    long count = 0;
+
+    SubscriptionEvent event;
+    while (count++ < size // limit control
+        && Objects.nonNull(
+            event =
+                prefetchingQueue.poll(
+                    
SubscriptionConfig.getInstance().getSubscriptionPollMaxBlockingTimeMs(),
+                    TimeUnit.MILLISECONDS))) {
+      if (event.isCommitted()) {
+        LOGGER.warn(
+            "Subscription: SubscriptionPrefetchingQueue {} poll committed 
event {} from prefetching queue (broken invariant), remove it",
+            this,
+            event);
+        // no need to update inFlightEvents
+        continue;
+      }
+
+      if (!event.pollable()) {
+        LOGGER.warn(
+            "Subscription: SubscriptionPrefetchingQueue {} poll non-pollable 
event {} from prefetching queue (broken invariant), nack and remove it",
+            this,
+            event);
+        event.nack(); // now pollable
+        // no need to update inFlightEvents and prefetchingQueue
+        continue;
+      }
+
+      // This operation should be performed before updating inFlightEvents to 
prevent multiple
+      // consumers from consuming the same event.
+      event.recordLastPolledTimestamp(); // now non-pollable
+
+      inFlightEvents.put(new Pair<>(consumerId, event.getCommitContext()), 
event);
+      event.recordLastPolledConsumerId(consumerId);
+      return event;
+    }
+
+    return null;
+  }
+
   /////////////////////////////// prefetch ///////////////////////////////
 
   public boolean executePrefetch() {
@@ -366,7 +346,12 @@ public abstract class SubscriptionPrefetchingQueue {
       }
       reportStateIfNeeded();
       // TODO: more refined behavior (prefetch/serialize/...) control
-      if (states.shouldPrefetch()) {
+      if (hasCurrentTerminateEvent()) {
+        tryCommitCurrentTerminateEventIfPresent();
+        remapInFlightEventsSnapshot(
+            committedCleaner, pollableNacker, responsePrefetcher, 
responseSerializer);
+        return true;
+      } else if (states.shouldPrefetch()) {
         tryPrefetch();
         remapInFlightEventsSnapshot(
             committedCleaner, pollableNacker, responsePrefetcher, 
responseSerializer);
@@ -467,6 +452,10 @@ public abstract class SubscriptionPrefetchingQueue {
    * {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty.
    */
   private synchronized void tryPrefetch() {
+    if (Objects.nonNull(currentTerminateEvent) && 
!tryCommitCurrentTerminateEvent()) {
+      return;
+    }
+
     while (!inputPendingQueue.isEmpty() || 
Objects.nonNull(currentTabletInsertionEvent)) {
       if (Objects.nonNull(currentTabletInsertionEvent)) {
         final RetryableState state = 
onRetryableTabletInsertionEvent(currentTabletInsertionEvent);
@@ -497,16 +486,10 @@ public abstract class SubscriptionPrefetchingQueue {
       }
 
       if (event instanceof PipeTerminateEvent) {
-        final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
-        // add mark completed hook
-        terminateEvent.addOnCommittedHook(this::markCompleted);
-        // commit directly
-        ((PipeTerminateEvent) event)
-            
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
-        LOGGER.info(
-            "Subscription: SubscriptionPrefetchingQueue {} commit 
PipeTerminateEvent {}",
-            this,
-            terminateEvent);
+        currentTerminateEvent = (PipeTerminateEvent) event;
+        if (!tryCommitCurrentTerminateEvent()) {
+          return;
+        }
         continue;
       }
 
@@ -549,6 +532,11 @@ public abstract class SubscriptionPrefetchingQueue {
   }
 
   private synchronized void tryPrefetchV2() {
+    if (Objects.nonNull(currentTerminateEvent)) {
+      tryCommitCurrentTerminateEvent();
+      return;
+    }
+
     if (!prefetchingQueue.isEmpty()) {
       return;
     }
@@ -613,16 +601,8 @@ public abstract class SubscriptionPrefetchingQueue {
     }
 
     if (event instanceof PipeTerminateEvent) {
-      final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
-      // add mark completed hook
-      terminateEvent.addOnCommittedHook(this::markCompleted);
-      // commit directly
-      ((PipeTerminateEvent) event)
-          
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
-      LOGGER.info(
-          "Subscription: SubscriptionPrefetchingQueue {} commit 
PipeTerminateEvent {}",
-          this,
-          terminateEvent);
+      currentTerminateEvent = (PipeTerminateEvent) event;
+      tryCommitCurrentTerminateEvent();
       return;
     }
 
@@ -731,6 +711,41 @@ public abstract class SubscriptionPrefetchingQueue {
     return batches.onEvent(this::prefetchEvent);
   }
 
+  private synchronized boolean hasCurrentTerminateEvent() {
+    return Objects.nonNull(currentTerminateEvent);
+  }
+
+  private synchronized void tryCommitCurrentTerminateEventIfPresent() {
+    if (Objects.nonNull(currentTerminateEvent)) {
+      tryCommitCurrentTerminateEvent();
+    }
+  }
+
+  private synchronized boolean tryCommitCurrentTerminateEvent() {
+    try {
+      batches.emitAll(this::prefetchEvent);
+    } catch (final Exception e) {
+      LOGGER.warn(
+          
DataNodeMiscMessages.EXCEPTION_EMIT_EVENTS_BEFORE_COMMIT_TERMINATE_EVENT,
+          this,
+          currentTerminateEvent,
+          e);
+      return false;
+    }
+
+    if (!prefetchingQueue.isEmpty() || !inFlightEvents.isEmpty()) {
+      return false;
+    }
+
+    // Add mark completed hook only when all subscription events have been 
consumed.
+    currentTerminateEvent.addOnCommittedHook(this::markCompleted);
+    currentTerminateEvent.decreaseReferenceCount(
+        SubscriptionPrefetchingQueue.class.getName(), true);
+    LOGGER.info(DataNodeMiscMessages.COMMIT_TERMINATE_EVENT, this, 
currentTerminateEvent);
+    currentTerminateEvent = null;
+    return true;
+  }
+
   /////////////////////////////// commit ///////////////////////////////
 
   /**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
index d25573add6b..0707d979d6a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
@@ -71,14 +71,7 @@ public abstract class SubscriptionPipeEventBatch {
   protected synchronized boolean onEvent(final Consumer<SubscriptionEvent> 
consumer)
       throws Exception {
     if (shouldEmit() && !enrichedEvents.isEmpty()) {
-      if (Objects.isNull(events)) {
-        events = generateSubscriptionEvents();
-      }
-      if (Objects.nonNull(events)) {
-        events.forEach(consumer);
-        return true;
-      }
-      return false;
+      return emit(consumer);
     }
     return false;
   }
@@ -101,6 +94,20 @@ public abstract class SubscriptionPipeEventBatch {
     return onEvent(consumer);
   }
 
+  protected synchronized boolean emit(final Consumer<SubscriptionEvent> 
consumer) throws Exception {
+    if (enrichedEvents.isEmpty()) {
+      return false;
+    }
+    if (Objects.isNull(events)) {
+      events = generateSubscriptionEvents();
+    }
+    if (Objects.nonNull(events)) {
+      events.forEach(consumer);
+      return true;
+    }
+    return false;
+  }
+
   /////////////////////////////// utility ///////////////////////////////
 
   protected abstract void onTabletInsertionEvent(final TabletInsertionEvent 
event);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
index 467f788f797..0cb81ca43e6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
@@ -142,6 +142,35 @@ public class SubscriptionPipeEventBatches {
     return hasNew.get();
   }
 
+  public boolean emitAll(final Consumer<SubscriptionEvent> consumer) throws 
Exception {
+    final AtomicBoolean hasNew = new AtomicBoolean(false);
+    Exception exception = null;
+    for (final int regionId : ImmutableList.copyOf(regionIdToBatch.keySet())) {
+      try {
+        segmentLock.lock(regionId);
+        final SubscriptionPipeEventBatch batch = regionIdToBatch.get(regionId);
+        if (Objects.isNull(batch)) {
+          continue;
+        }
+        if (batch.emit(consumer)) {
+          hasNew.set(true);
+          regionIdToBatch.remove(regionId);
+        }
+      } catch (final Exception e) {
+        LOGGER.warn(
+            DataNodeMiscMessages.EXCEPTION_SEALING_EVENTS, 
regionIdToBatch.get(regionId), e);
+        exception = e;
+      } finally {
+        segmentLock.unlock(regionId);
+      }
+    }
+
+    if (Objects.nonNull(exception)) {
+      throw exception;
+    }
+    return hasNew.get();
+  }
+
   public void cleanUp() {
     regionIdToBatch.values().forEach(batch -> batch.cleanUp(true));
     regionIdToBatch.clear();

Reply via email to