Caideyipi commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r2922136478


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java:
##########
@@ -662,6 +666,45 @@ private TPipeSubscribeResp 
handlePipeSubscribeCloseInternal(final PipeSubscribeC
     return 
PipeSubscribeCloseResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS);
   }
 
+  private TPipeSubscribeResp handlePipeSubscribeSeek(final 
PipeSubscribeSeekReq req) {
+    try {
+      return handlePipeSubscribeSeekInternal(req);
+    } catch (final Exception e) {
+      LOGGER.warn("Exception occurred when seeking with request {}", req, e);
+      final String exceptionMessage =
+          String.format(
+              "Subscription: something unexpected happened when seeking with 
request %s: %s",
+              req, e);
+      return PipeSubscribeSeekResp.toTPipeSubscribeResp(
+          RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_SEEK_ERROR, 
exceptionMessage));
+    }
+  }
+
+  private TPipeSubscribeResp handlePipeSubscribeSeekInternal(final 
PipeSubscribeSeekReq req) {
+    // check consumer config thread local
+    final ConsumerConfig consumerConfig = consumerConfigThreadLocal.get();
+    if (Objects.isNull(consumerConfig)) {
+      LOGGER.warn(
+          "Subscription: missing consumer config when handling 
PipeSubscribeSeekReq: {}", req);
+      return SUBSCRIPTION_MISSING_CUSTOMER_RESP;

Review Comment:
   MISSING_CONSUMER_RESP?



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java:
##########
@@ -389,6 +389,13 @@ public class CommonConfig {
   private long subscriptionMetaSyncerInitialSyncDelayMinutes = 3;
   private long subscriptionMetaSyncerSyncIntervalMinutes = 3;
 
+  private int subscriptionConsensusBatchMaxDelayInMs = 50;
+  private long subscriptionConsensusBatchMaxSizeInBytes = 8 * MB;
+  private int subscriptionConsensusBatchMaxTabletCount = 64;
+  private int subscriptionConsensusBatchMaxWalEntries = 128;
+
+  private long subscriptionConsensusWalRetentionSizeInBytes = 512 * MB;

Review Comment:
   May consider "throttleThreshold" as default value or directly use that?



##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java:
##########
@@ -872,17 +955,41 @@ void checkAndUpdateIndex() {
   }
 
   /**
-   * If there is only one replica, set it to Long.MAX_VALUE. If there are 
multiple replicas, get the
-   * latest SafelyDeletedSearchIndex again. This enables wal to be deleted in 
a timely manner.
+   * Computes and updates the safe-to-delete WAL search index based on 
replication progress and
+   * subscription WAL retention policy. When no subscriptions exist, WAL is 
cleaned normally.
    */
-  void checkAndUpdateSafeDeletedSearchIndex() {
+  public void checkAndUpdateSafeDeletedSearchIndex() {
     if (configuration.isEmpty()) {
       logger.error(
           "Configuration is empty, which is unexpected. Safe deleted search 
index won't be updated this time.");
-    } else if (configuration.size() == 1) {
+      return;
+    }
+
+    final boolean hasSubscriptions = !subscriptionQueues.isEmpty();
+    final long retentionSizeLimit =
+        config.getReplication().getSubscriptionWalRetentionSizeInBytes();
+
+    if (configuration.size() == 1 && !hasSubscriptions) {
+      // Single replica, no subscription consumers => delete all WAL freely
       consensusReqReader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
     } else {
-      consensusReqReader.setSafelyDeletedSearchIndex(getMinFlushedSyncIndex());
+      final long replicationIndex =
+          configuration.size() > 1 ? getMinFlushedSyncIndex() : Long.MAX_VALUE;
+
+      // Subscription WAL retention: if subscriptions exist and retention is 
configured,
+      // prevent WAL deletion when total WAL size is within the retention 
limit.
+      long subscriptionRetentionBound = Long.MAX_VALUE;
+      if (hasSubscriptions && retentionSizeLimit > 0) {
+        final long totalWalSize = consensusReqReader.getTotalSize();
+        if (totalWalSize <= retentionSizeLimit) {
+          // WAL size is within retention limit — preserve all WAL for 
subscribers
+          subscriptionRetentionBound = 
ConsensusReqReader.DEFAULT_SAFELY_DELETED_SEARCH_INDEX;

Review Comment:
   Seemingly a little bit coarse?
   1. The wal clean is either "all" or "not", does not have a bound like "only 
reserve the retention size amount of data"
   2. The reserving logic does not consider the commit indexes of subscriptions



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java:
##########
@@ -30,7 +30,7 @@ public class SubscriptionConfig {
   private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
 
   public boolean getSubscriptionEnabled() {
-    return false;
+    return true; // TODO: make it configurable after subscription is stable

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to