Copilot commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r2870247778


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -109,89 +169,160 @@ public List<SubscriptionCommitContext> commit(
       final List<SubscriptionCommitContext> commitContexts,
       final boolean nack) {
     final String consumerGroupId = consumerConfig.getConsumerGroupId();
-    final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
-    if (Objects.isNull(broker)) {
+    final String consumerId = consumerConfig.getConsumerId();
+    final List<SubscriptionCommitContext> allSuccessful = new ArrayList<>();
+
+    final SubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
+    final ConsensusSubscriptionBroker consensusBroker =
+        consumerGroupIdToConsensusBroker.get(consumerGroupId);
+
+    if (Objects.isNull(pipeBroker) && Objects.isNull(consensusBroker)) {
       final String errorMessage =
-          String.format(
-              "Subscription: broker bound to consumer group [%s] does not 
exist", consumerGroupId);
+          String.format("Subscription: no broker bound to consumer group 
[%s]", consumerGroupId);
       LOGGER.warn(errorMessage);
       throw new SubscriptionException(errorMessage);
     }
-    final String consumerId = consumerConfig.getConsumerId();
-    return broker.commit(consumerId, commitContexts, nack);
+
+    // Partition commit contexts by which broker owns the topic.
+    final List<SubscriptionCommitContext> pipeContexts = new ArrayList<>();
+    final List<SubscriptionCommitContext> consensusContexts = new 
ArrayList<>();
+    for (final SubscriptionCommitContext ctx : commitContexts) {
+      final String topicName = ctx.getTopicName();
+      if (Objects.nonNull(consensusBroker) && 
consensusBroker.hasQueue(topicName)) {
+        consensusContexts.add(ctx);
+      } else {
+        pipeContexts.add(ctx);
+      }
+    }
+
+    if (Objects.nonNull(pipeBroker) && !pipeContexts.isEmpty()) {
+      allSuccessful.addAll(pipeBroker.commit(consumerId, pipeContexts, nack));
+    }
+    if (Objects.nonNull(consensusBroker) && !consensusContexts.isEmpty()) {
+      allSuccessful.addAll(consensusBroker.commit(consumerId, 
consensusContexts, nack));
+    }
+
+    return allSuccessful;
   }
 
   public boolean isCommitContextOutdated(final SubscriptionCommitContext 
commitContext) {
     final String consumerGroupId = commitContext.getConsumerGroupId();
-    final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
-    if (Objects.isNull(broker)) {
+    final String topicName = commitContext.getTopicName();
+
+    // Try consensus broker first
+    final ConsensusSubscriptionBroker consensusBroker =
+        consumerGroupIdToConsensusBroker.get(consumerGroupId);
+    if (Objects.nonNull(consensusBroker) && 
consensusBroker.hasQueue(topicName)) {
+      return consensusBroker.isCommitContextOutdated(commitContext);
+    }
+
+    // Fall back to pipe broker
+    final SubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
+    if (Objects.isNull(pipeBroker)) {
       return true;
     }
-    return broker.isCommitContextOutdated(commitContext);
+    return pipeBroker.isCommitContextOutdated(commitContext);
   }
 
   public List<String> fetchTopicNamesToUnsubscribe(
       final ConsumerConfig consumerConfig, final Set<String> topicNames) {
     final String consumerGroupId = consumerConfig.getConsumerGroupId();
-    final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
-    if (Objects.isNull(broker)) {
+
+    // Consensus-based subscription topics are unbounded streams, so they do 
not trigger
+    // auto-unsubscribe.
+    final ConsensusSubscriptionBroker consensusBroker =
+        consumerGroupIdToConsensusBroker.get(consumerGroupId);
+    final Set<String> pipeOnlyTopicNames;
+    if (Objects.nonNull(consensusBroker)) {
+      pipeOnlyTopicNames = new java.util.HashSet<>(topicNames);
+      pipeOnlyTopicNames.removeIf(consensusBroker::hasQueue);
+    } else {
+      pipeOnlyTopicNames = topicNames;
+    }
+
+    if (pipeOnlyTopicNames.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    final SubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
+    if (Objects.isNull(pipeBroker)) {
       return Collections.emptyList();
     }
-    return broker.fetchTopicNamesToUnsubscribe(topicNames);
+    return pipeBroker.fetchTopicNamesToUnsubscribe(pipeOnlyTopicNames);
   }
 
   /////////////////////////////// broker ///////////////////////////////
 
   public boolean isBrokerExist(final String consumerGroupId) {
-    return consumerGroupIdToSubscriptionBroker.containsKey(consumerGroupId);
+    return consumerGroupIdToPipeBroker.containsKey(consumerGroupId)
+        || consumerGroupIdToConsensusBroker.containsKey(consumerGroupId);
   }
 
   public void createBrokerIfNotExist(final String consumerGroupId) {
-    consumerGroupIdToSubscriptionBroker.computeIfAbsent(consumerGroupId, 
SubscriptionBroker::new);
-    LOGGER.info("Subscription: create broker bound to consumer group [{}]", 
consumerGroupId);
+    consumerGroupIdToPipeBroker.computeIfAbsent(consumerGroupId, 
SubscriptionBroker::new);
+    LOGGER.info("Subscription: create pipe broker bound to consumer group 
[{}]", consumerGroupId);
   }
 
   /**
    * @return {@code true} if drop broker success, {@code false} otherwise
    */
   public boolean dropBroker(final String consumerGroupId) {
     final AtomicBoolean dropped = new AtomicBoolean(false);
-    consumerGroupIdToSubscriptionBroker.compute(
+
+    // Drop pipe broker
+    consumerGroupIdToPipeBroker.compute(
         consumerGroupId,
         (id, broker) -> {
           if (Objects.isNull(broker)) {
+            dropped.set(true);

Review Comment:
   `dropBroker` sets `dropped=true` when the pipe broker entry is already null. 
This causes `dropBroker` to return true even when nothing was removed, and can 
also mask failure to drop a non-empty consensus broker (pipe broker null => 
dropped=true, consensus broker kept). Only mark `dropped` true when an existing 
broker entry is actually removed.
   ```suggestion
               // No existing broker to drop
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionCommitManager.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.broker.consensus;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Manages commit state for consensus-based subscriptions.
+ *
+ * <p>This manager tracks which events have been committed by consumers and 
maps commit IDs back to
+ * WAL search indices. It maintains the progress for each (consumerGroup, 
topic, region) triple and
+ * supports persistence and recovery.
+ *
+ * <p>Progress is tracked <b>per-region</b> because searchIndex is 
region-local — each DataRegion
+ * has its own independent WAL with its own searchIndex namespace. Using a 
single state per topic
+ * would cause TreeSet deduplication bugs when different regions emit the same 
searchIndex value.
+ *
+ * <p>Key responsibilities:
+ *
+ * <ul>
+ *   <li>Track the mapping from commitId to searchIndex
+ *   <li>Handle commit/ack from consumers
+ *   <li>Persist and recover progress state
+ * </ul>
+ */
+public class ConsensusSubscriptionCommitManager {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(ConsensusSubscriptionCommitManager.class);
+
+  private static final String PROGRESS_FILE_PREFIX = 
"consensus_subscription_progress_";
+  private static final String PROGRESS_FILE_SUFFIX = ".dat";
+
+  /** Key: "consumerGroupId_topicName_regionId" -> progress tracking state */
+  private final Map<String, ConsensusSubscriptionCommitState> commitStates =
+      new ConcurrentHashMap<>();
+
+  private final String persistDir;
+
+  private ConsensusSubscriptionCommitManager() {
+    this.persistDir =
+        IoTDBDescriptor.getInstance().getConfig().getSystemDir()
+            + File.separator
+            + "subscription"
+            + File.separator
+            + "consensus_progress";
+    final File dir = new File(persistDir);
+    if (!dir.exists()) {
+      dir.mkdirs();
+    }
+  }
+
+  /**
+   * Gets or creates the commit state for a specific (consumerGroup, topic, 
region) triple.
+   *
+   * @param consumerGroupId the consumer group ID
+   * @param topicName the topic name
+   * @param regionId the consensus group / data region ID string
+   * @return the commit state
+   */
+  public ConsensusSubscriptionCommitState getOrCreateState(
+      final String consumerGroupId, final String topicName, final String 
regionId) {
+    final String key = generateKey(consumerGroupId, topicName, regionId);
+    return commitStates.computeIfAbsent(
+        key,
+        k -> {
+          // Try to recover from persisted state
+          final ConsensusSubscriptionCommitState recovered = tryRecover(key);
+          if (recovered != null) {
+            return recovered;
+          }
+          return new ConsensusSubscriptionCommitState(new 
SubscriptionConsensusProgress(0L, 0L));
+        });
+  }
+
+  /**
+   * Records commitId to searchIndex mapping for later commit handling.
+   *
+   * @param consumerGroupId the consumer group ID
+   * @param topicName the topic name
+   * @param regionId the consensus group / data region ID string
+   * @param commitId the assigned commit ID
+   * @param searchIndex the WAL search index corresponding to this event
+   */
+  public void recordCommitMapping(
+      final String consumerGroupId,
+      final String topicName,
+      final String regionId,
+      final long commitId,
+      final long searchIndex) {
+    final ConsensusSubscriptionCommitState state =
+        getOrCreateState(consumerGroupId, topicName, regionId);
+    state.recordMapping(commitId, searchIndex);
+  }
+
+  /**
+   * Handles commit (ack) for an event. Updates the progress and potentially 
advances the committed
+   * search index.
+   *
+   * @param consumerGroupId the consumer group ID
+   * @param topicName the topic name
+   * @param regionId the consensus group / data region ID string
+   * @param commitId the committed event's commit ID
+   * @return true if commit handled successfully
+   */
+  public boolean commit(
+      final String consumerGroupId,
+      final String topicName,
+      final String regionId,
+      final long commitId) {
+    final String key = generateKey(consumerGroupId, topicName, regionId);
+    final ConsensusSubscriptionCommitState state = commitStates.get(key);
+    if (state == null) {
+      LOGGER.warn(
+          "ConsensusSubscriptionCommitManager: Cannot commit for unknown 
state, "
+              + "consumerGroupId={}, topicName={}, regionId={}, commitId={}",
+          consumerGroupId,
+          topicName,
+          regionId,
+          commitId);
+      return false;
+    }
+    final boolean success = state.commit(commitId);
+    if (success) {
+      // Periodically persist progress
+      persistProgressIfNeeded(key, state);
+    }
+    return success;
+  }
+
+  /**
+   * Gets the current committed search index for a specific region's state.
+   *
+   * @param consumerGroupId the consumer group ID
+   * @param topicName the topic name
+   * @param regionId the consensus group / data region ID string
+   * @return the committed search index, or -1 if no state exists
+   */
+  public long getCommittedSearchIndex(
+      final String consumerGroupId, final String topicName, final String 
regionId) {
+    final String key = generateKey(consumerGroupId, topicName, regionId);
+    final ConsensusSubscriptionCommitState state = commitStates.get(key);
+    if (state == null) {
+      return -1;
+    }
+    return state.getCommittedSearchIndex();
+  }
+
+  /**
+   * Removes state for a specific (consumerGroup, topic, region) triple.
+   *
+   * @param consumerGroupId the consumer group ID
+   * @param topicName the topic name
+   * @param regionId the consensus group / data region ID string
+   */
+  public void removeState(
+      final String consumerGroupId, final String topicName, final String 
regionId) {
+    final String key = generateKey(consumerGroupId, topicName, regionId);
+    commitStates.remove(key);
+    // Clean up persisted file
+    final File file = getProgressFile(key);
+    if (file.exists()) {
+      file.delete();
+    }
+  }
+
+  /**
+   * Removes all states for a given (consumerGroup, topic) pair across all 
regions. Used during
+   * subscription teardown when the individual regionIds may not be readily 
available.
+   *
+   * @param consumerGroupId the consumer group ID
+   * @param topicName the topic name
+   */
+  public void removeAllStatesForTopic(final String consumerGroupId, final 
String topicName) {
+    final String prefix = consumerGroupId + "_" + topicName + "_";
+    final Iterator<Map.Entry<String, ConsensusSubscriptionCommitState>> it =
+        commitStates.entrySet().iterator();
+    while (it.hasNext()) {
+      final Map.Entry<String, ConsensusSubscriptionCommitState> entry = 
it.next();
+      if (entry.getKey().startsWith(prefix)) {
+        it.remove();
+        final File file = getProgressFile(entry.getKey());
+        if (file.exists()) {
+          file.delete();
+        }
+      }
+    }
+  }
+
+  /** Persists all states. Should be called during graceful shutdown. */
+  public void persistAll() {
+    for (final Map.Entry<String, ConsensusSubscriptionCommitState> entry :
+        commitStates.entrySet()) {
+      persistProgress(entry.getKey(), entry.getValue());
+    }
+  }
+
+  // ======================== Helper Methods ========================
+
+  private String generateKey(
+      final String consumerGroupId, final String topicName, final String 
regionId) {
+    return consumerGroupId + "_" + topicName + "_" + regionId;
+  }
+
+  private File getProgressFile(final String key) {
+    return new File(persistDir, PROGRESS_FILE_PREFIX + key + 
PROGRESS_FILE_SUFFIX);
+  }
+
+  private ConsensusSubscriptionCommitState tryRecover(final String key) {
+    final File file = getProgressFile(key);
+    if (!file.exists()) {
+      return null;
+    }
+    try (final FileInputStream fis = new FileInputStream(file)) {
+      final byte[] bytes = new byte[(int) file.length()];
+      fis.read(bytes);

Review Comment:
   `tryRecover` reads the progress file with a single `fis.read(bytes)` call, 
which is not guaranteed to fill the buffer. This can lead to partial reads and 
corrupted/failed deserialization. Use a readFully loop / 
`DataInputStream.readFully`, and validate that the expected number of bytes was 
read before deserializing.
   ```suggestion
         int offset = 0;
         while (offset < bytes.length) {
           final int bytesRead = fis.read(bytes, offset, bytes.length - offset);
           if (bytesRead < 0) {
             break;
           }
           offset += bytesRead;
         }
         if (offset != bytes.length) {
           LOGGER.warn(
               "Failed to fully read consensus subscription progress from {}: 
expected {} bytes, read {} bytes",
               file,
               bytes.length,
               offset);
           return null;
         }
   ```



##########
example/session/pom.xml:
##########
@@ -40,4 +40,17 @@
             <version>${project.version}</version>
         </dependency>
     </dependencies>
+    <!-- TODO: remove below -->
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>11</source>
+                    <target>11</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>

Review Comment:
   This module overrides the example parent’s compiler level (example/pom.xml 
sets source/target=8) to Java 11. That can break building examples under the 
project’s expected Java version and makes the module inconsistent with other 
examples. Prefer inheriting the parent compiler settings (or updating the 
parent if the whole examples tree should move to 11) instead of overriding in 
this module.
   ```suggestion
   
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionSetupHandler.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.broker.consensus;
+
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixTreePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.iot.IoTConsensus;
+import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
+import org.apache.iotdb.rpc.subscription.config.TopicConfig;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Handles the setup and teardown of consensus-based subscription queues on 
DataNode. When a
+ * real-time subscription is detected, this handler finds the local 
IoTConsensus data regions,
+ * creates the appropriate converter, and binds prefetching queues to the 
subscription broker.
+ */
+public class ConsensusSubscriptionSetupHandler {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(ConsensusSubscriptionSetupHandler.class);
+
+  private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+
+  private ConsensusSubscriptionSetupHandler() {
+    // utility class
+  }
+
+  /**
+   * Ensures that the IoTConsensus new-peer callback is set, so that when a 
new DataRegion is
+   * created, all active consensus subscriptions are automatically bound to 
the new region.
+   */
+  public static void ensureNewRegionListenerRegistered() {
+    if (IoTConsensus.onNewPeerCreated != null) {
+      return;
+    }
+    IoTConsensus.onNewPeerCreated = 
ConsensusSubscriptionSetupHandler::onNewRegionCreated;
+    LOGGER.info(
+        "Set IoTConsensus.onNewPeerCreated callback for consensus subscription 
auto-binding");
+  }
+
+  /**
+   * Callback invoked when a new DataRegion (IoTConsensusServerImpl) is 
created locally. Queries
+   * existing subscription metadata to find all active consensus subscriptions 
and binds prefetching
+   * queues to the new region.
+   */
+  private static void onNewRegionCreated(
+      final ConsensusGroupId groupId, final IoTConsensusServerImpl serverImpl) 
{
+    if (!(groupId instanceof DataRegionId)) {
+      return;
+    }
+
+    // Query existing metadata keepers for all active subscriptions
+    final Map<String, java.util.Set<String>> allSubscriptions =
+        SubscriptionAgent.consumer().getAllSubscriptions();
+    if (allSubscriptions.isEmpty()) {
+      return;
+    }
+
+    final ConsensusSubscriptionCommitManager commitManager =
+        ConsensusSubscriptionCommitManager.getInstance();
+    final long startSearchIndex = serverImpl.getSearchIndex() + 1;
+
+    LOGGER.info(
+        "New DataRegion {} created, checking {} consumer group(s) for 
auto-binding, "
+            + "startSearchIndex={}",
+        groupId,
+        allSubscriptions.size(),
+        startSearchIndex);
+
+    for (final Map.Entry<String, java.util.Set<String>> groupEntry : 
allSubscriptions.entrySet()) {
+      final String consumerGroupId = groupEntry.getKey();
+      for (final String topicName : groupEntry.getValue()) {
+        if (!isConsensusBasedTopic(topicName)) {
+          continue;
+        }
+        try {
+          final Map<String, TopicConfig> topicConfigs =
+              
SubscriptionAgent.topic().getTopicConfigs(java.util.Collections.singleton(topicName));
+          final TopicConfig topicConfig = topicConfigs.get(topicName);
+          if (topicConfig == null) {
+            continue;
+          }
+
+          // Resolve the new DataRegion's actual database name
+          final DataRegion dataRegion =
+              StorageEngine.getInstance().getDataRegion((DataRegionId) 
groupId);
+          if (dataRegion == null) {
+            continue;
+          }
+          final String dbRaw = dataRegion.getDatabaseName();
+          final String dbTableModel = dbRaw.startsWith("root.") ? 
dbRaw.substring(5) : dbRaw;
+
+          // For table topics, skip if this region's database doesn't match 
the topic filter
+          if (topicConfig.isTableTopic()) {
+            final String topicDb =
+                topicConfig.getStringOrDefault(
+                    TopicConstant.DATABASE_KEY, 
TopicConstant.DATABASE_DEFAULT_VALUE);
+            if (topicDb != null
+                && !topicDb.isEmpty()
+                && !TopicConstant.DATABASE_DEFAULT_VALUE.equals(topicDb)
+                && !topicDb.equalsIgnoreCase(dbTableModel)) {
+              continue;
+            }
+          }
+
+          final String actualDbName = topicConfig.isTableTopic() ? 
dbTableModel : null;
+          final ConsensusLogToTabletConverter converter = 
buildConverter(topicConfig, actualDbName);
+
+          LOGGER.info(
+              "Auto-binding consensus queue for topic [{}] in group [{}] to 
new region {} (database={})",
+              topicName,
+              consumerGroupId,
+              groupId,
+              dbTableModel);
+
+          SubscriptionAgent.broker()
+              .bindConsensusPrefetchingQueue(
+                  consumerGroupId,
+                  topicName,
+                  groupId.toString(),
+                  serverImpl,
+                  converter,
+                  commitManager,
+                  startSearchIndex);
+        } catch (final Exception e) {
+          LOGGER.error(
+              "Failed to auto-bind topic [{}] in group [{}] to new region {}",
+              topicName,
+              consumerGroupId,
+              groupId,
+              e);
+        }
+      }
+    }
+  }
+
+  public static boolean isConsensusBasedTopic(final String topicName) {
+    try {
+      final String topicMode = 
SubscriptionAgent.topic().getTopicMode(topicName);
+      final String topicFormat = 
SubscriptionAgent.topic().getTopicFormat(topicName);
+      final boolean result =
+          TopicConstant.MODE_LIVE_VALUE.equalsIgnoreCase(topicMode)
+              && 
!TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equalsIgnoreCase(topicFormat);
+      LOGGER.info(
+          "isConsensusBasedTopic check for topic [{}]: mode={}, format={}, 
result={}",
+          topicName,
+          topicMode,
+          topicFormat,
+          result);
+      return result;
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Failed to check if topic [{}] is consensus-based, defaulting to 
false", topicName, e);
+      return false;
+    }
+  }
+
+  public static void setupConsensusSubscriptions(
+      final String consumerGroupId, final Set<String> topicNames) {
+    final IConsensus dataRegionConsensus = 
DataRegionConsensusImpl.getInstance();
+    if (!(dataRegionConsensus instanceof IoTConsensus)) {
+      LOGGER.warn(
+          "Data region consensus is not IoTConsensus (actual: {}), "
+              + "cannot set up consensus-based subscription for consumer group 
[{}]",
+          dataRegionConsensus.getClass().getSimpleName(),
+          consumerGroupId);
+      return;
+    }
+
+    // Ensure the new-region listener is registered (idempotent)
+    ensureNewRegionListenerRegistered();
+
+    final IoTConsensus ioTConsensus = (IoTConsensus) dataRegionConsensus;
+    final ConsensusSubscriptionCommitManager commitManager =
+        ConsensusSubscriptionCommitManager.getInstance();
+
+    LOGGER.info(
+        "Setting up consensus subscriptions for consumer group [{}], 
topics={}, "
+            + "total consensus groups={}",
+        consumerGroupId,
+        topicNames,
+        ioTConsensus.getAllConsensusGroupIds().size());
+
+    for (final String topicName : topicNames) {
+      if (!isConsensusBasedTopic(topicName)) {
+        continue;
+      }
+
+      try {
+        setupConsensusQueueForTopic(consumerGroupId, topicName, ioTConsensus, 
commitManager);
+      } catch (final Exception e) {
+        LOGGER.error(
+            "Failed to set up consensus subscription for topic [{}] in 
consumer group [{}]",
+            topicName,
+            consumerGroupId,
+            e);
+      }
+    }
+  }
+
+  /**
+   * Set up consensus queue for a single topic. Discovers all local data 
region consensus groups and
+   * binds a ConsensusReqReader-based prefetching queue to every matching 
region.
+   *
+   * <p>For table-model topics, only regions whose database matches the 
topic's {@code DATABASE_KEY}
+   * filter are bound. For tree-model topics, all local data regions are 
bound. Additionally, the
+   * {@link #onNewRegionCreated} callback ensures that regions created after 
this method runs are
+   * also automatically bound.
+   */
+  private static void setupConsensusQueueForTopic(
+      final String consumerGroupId,
+      final String topicName,
+      final IoTConsensus ioTConsensus,
+      final ConsensusSubscriptionCommitManager commitManager) {
+
+    // Get topic config for building the converter
+    final Map<String, TopicConfig> topicConfigs =
+        
SubscriptionAgent.topic().getTopicConfigs(java.util.Collections.singleton(topicName));
+    final TopicConfig topicConfig = topicConfigs.get(topicName);
+    if (topicConfig == null) {
+      LOGGER.warn(
+          "Topic config not found for topic [{}], cannot set up consensus 
queue", topicName);
+      return;
+    }
+
+    // Build the converter based on topic config (path pattern, time range, 
tree/table model)
+    LOGGER.info(
+        "Setting up consensus queue for topic [{}]: isTableTopic={}, 
config={}",
+        topicName,
+        topicConfig.isTableTopic(),
+        topicConfig.getAttribute());
+
+    // For table topics, extract the database filter from topic config
+    final String topicDatabaseFilter =
+        topicConfig.isTableTopic()
+            ? topicConfig.getStringOrDefault(
+                TopicConstant.DATABASE_KEY, 
TopicConstant.DATABASE_DEFAULT_VALUE)
+            : null;
+
+    final List<ConsensusGroupId> allGroupIds = 
ioTConsensus.getAllConsensusGroupIds();
+    LOGGER.info(
+        "Discovered {} consensus group(s) for topic [{}] in consumer group 
[{}]: {}",
+        allGroupIds.size(),
+        topicName,
+        consumerGroupId,
+        allGroupIds);
+    boolean bound = false;
+
+    for (final ConsensusGroupId groupId : allGroupIds) {
+      if (!(groupId instanceof DataRegionId)) {
+        continue;
+      }
+
+      final IoTConsensusServerImpl serverImpl = ioTConsensus.getImpl(groupId);
+      if (serverImpl == null) {
+        continue;
+      }
+
+      // Resolve the DataRegion's actual database name
+      final DataRegion dataRegion =
+          StorageEngine.getInstance().getDataRegion((DataRegionId) groupId);
+      if (dataRegion == null) {
+        continue;
+      }
+      final String dbRaw = dataRegion.getDatabaseName();
+      final String dbTableModel = dbRaw.startsWith("root.") ? 
dbRaw.substring(5) : dbRaw;
+
+      if (topicDatabaseFilter != null
+          && !topicDatabaseFilter.isEmpty()
+          && !TopicConstant.DATABASE_DEFAULT_VALUE.equals(topicDatabaseFilter)
+          && !topicDatabaseFilter.equalsIgnoreCase(dbTableModel)) {
+        LOGGER.info(
+            "Skipping region {} (database={}) for table topic [{}] 
(DATABASE_KEY={})",
+            groupId,
+            dbTableModel,
+            topicName,
+            topicDatabaseFilter);
+        continue;
+      }
+
+      final String actualDbName = topicConfig.isTableTopic() ? dbTableModel : 
null;
+      final ConsensusLogToTabletConverter converter = 
buildConverter(topicConfig, actualDbName);
+
+      final long startSearchIndex = serverImpl.getSearchIndex() + 1;
+

Review Comment:
   `startSearchIndex` is always initialized as `serverImpl.getSearchIndex() + 
1`, which ignores any persisted subscription progress in 
`ConsensusSubscriptionCommitManager`. After a DataNode restart (or queue 
rebind), this will skip unconsumed WAL entries and break the at-least-once / 
recovery semantics described in the PR. The start index should be derived from 
the recovered committed search index when state exists (e.g., `max(committed+1, 
subscribeStart)` depending on desired semantics).
   ```suggestion
         long startSearchIndex = serverImpl.getSearchIndex() + 1;
   
         // If we have recovered a committed search index for this group, 
advance the
         // startSearchIndex to at least committed + 1 to honor at-least-once 
semantics.
         if (commitManager != null) {
        try {
            final long committedSearchIndex =
                commitManager.getCommittedSearchIndex(
                    consumerGroupId, topicName, groupId.toString());
            if (committedSearchIndex >= 0) {
              startSearchIndex = Math.max(startSearchIndex, 
committedSearchIndex + 1);
            }
           } catch (Exception e) {
             LOGGER.warn(
                 "Failed to load committed search index for consensus group {} 
when binding "
                     + "subscription prefetching queue, fallback to 
startSearchIndex={}",
                 groupId,
                 startSearchIndex,
                 e);
           }
         }
   ```



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java:
##########
@@ -30,7 +30,7 @@ public class SubscriptionConfig {
   private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
 
   public boolean getSubscriptionEnabled() {
-    return false;
+    return true; // TODO: make it configurable after subscription is stable

Review Comment:
   `getSubscriptionEnabled()` is now hard-coded to always return true, which 
changes the default behavior globally (multiple code paths gate subscription 
operations on this flag). This should remain configurable (or default to the 
previous disabled state) to avoid unexpectedly enabling an unstable feature in 
production deployments.
   ```suggestion
       return false; // TODO: make it configurable or enable by default after subscription is stable
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/ConsensusSubscriptionBroker.java:
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.broker;
+
+import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
+import org.apache.iotdb.db.subscription.broker.consensus.ConsensusLogToTabletConverter;
+import org.apache.iotdb.db.subscription.broker.consensus.ConsensusPrefetchingQueue;
+import org.apache.iotdb.db.subscription.broker.consensus.ConsensusSubscriptionCommitManager;
+import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * Consensus-based subscription broker that reads data directly from IoTConsensus WAL. Each instance
+ * manages consensus prefetching queues for a single consumer group.
+ */
+public class ConsensusSubscriptionBroker implements ISubscriptionBroker {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusSubscriptionBroker.class);
+
+  private final String brokerId; // consumer group id
+
+  /** Maps topic name to a list of ConsensusPrefetchingQueues, one per data region. */
+  private final Map<String, List<ConsensusPrefetchingQueue>> topicNameToConsensusPrefetchingQueues;
+
+  /** Shared commit ID generators per topic. */
+  private final Map<String, AtomicLong> topicNameToCommitIdGenerator;
+
+  public ConsensusSubscriptionBroker(final String brokerId) {
+    this.brokerId = brokerId;
+    this.topicNameToConsensusPrefetchingQueues = new ConcurrentHashMap<>();
+    this.topicNameToCommitIdGenerator = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return topicNameToConsensusPrefetchingQueues.isEmpty();
+  }
+
+  @Override
+  public boolean hasQueue(final String topicName) {
+    final List<ConsensusPrefetchingQueue> queues =
+        topicNameToConsensusPrefetchingQueues.get(topicName);
+    return Objects.nonNull(queues)
+        && !queues.isEmpty()
+        && queues.stream().anyMatch(q -> !q.isClosed());
+  }
+
+  //////////////////////////// poll ////////////////////////////
+
+  @Override
+  public List<SubscriptionEvent> poll(
+      final String consumerId, final Set<String> topicNames, final long maxBytes) {
+    LOGGER.debug(
+        "ConsensusSubscriptionBroker [{}]: poll called, consumerId={}, topicNames={}, "
+            + "queueCount={}, maxBytes={}",
+        brokerId,
+        consumerId,
+        topicNames,
+        topicNameToConsensusPrefetchingQueues.size(),
+        maxBytes);
+
+    final List<SubscriptionEvent> eventsToPoll = new ArrayList<>();
+    final List<SubscriptionEvent> eventsToNack = new ArrayList<>();
+    long totalSize = 0;
+
+    for (final String topicName : topicNames) {
+      final List<ConsensusPrefetchingQueue> queues =
+          topicNameToConsensusPrefetchingQueues.get(topicName);
+      if (Objects.isNull(queues) || queues.isEmpty()) {
+        continue;
+      }
+
+      // Poll from all region queues for this topic
+      for (final ConsensusPrefetchingQueue consensusQueue : queues) {
+        if (consensusQueue.isClosed()) {
+          continue;
+        }
+
+        final SubscriptionEvent event = consensusQueue.poll(consumerId);
+        if (Objects.isNull(event)) {
+          continue;
+        }
+
+        final long currentSize;
+        try {
+          currentSize = event.getCurrentResponseSize();
+        } catch (final IOException e) {
+          eventsToNack.add(event);
+          continue;
+        }
+
+        eventsToPoll.add(event);
+        totalSize += currentSize;
+
+        if (totalSize + currentSize > maxBytes) {

Review Comment:
   The `maxBytes` limiting logic is incorrect: the event is added and 
`totalSize` is incremented, then the code checks `if (totalSize + currentSize > 
maxBytes)` which double-counts `currentSize` and can still return a batch 
exceeding `maxBytes`. Check the size *before* adding the event (or adjust the 
condition) and decide whether to stop/push back the event when it would exceed 
the limit.
   ```suggestion
           if (totalSize > maxBytes) {
   ```



##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java:
##########
@@ -879,10 +972,25 @@ void checkAndUpdateSafeDeletedSearchIndex() {
     if (configuration.isEmpty()) {
       logger.error(
          "Configuration is empty, which is unexpected. Safe deleted search index won't be updated this time.");
-    } else if (configuration.size() == 1) {
+      return;
+    }
+
+    // Compute the minimum search index that subscription consumers still need.
+    // WAL entries at or after this index must be preserved.
+    long minSubscriptionIndex = Long.MAX_VALUE;
+    for (final LongSupplier supplier : subscriptionSyncIndexSuppliers) {
+      minSubscriptionIndex = Math.min(minSubscriptionIndex, supplier.getAsLong());
+    }
+
+    if (configuration.size() == 1 && subscriptionSyncIndexSuppliers.isEmpty()) {
+      // Single replica, no subscription consumers => delete all WAL freely
       consensusReqReader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
     } else {
-      consensusReqReader.setSafelyDeletedSearchIndex(getMinFlushedSyncIndex());
+      // min(replication progress, subscription progress) — preserve WAL for both
+      final long replicationIndex =
+          configuration.size() > 1 ? getMinFlushedSyncIndex() : Long.MAX_VALUE;
+      consensusReqReader.setSafelyDeletedSearchIndex(
+          Math.min(replicationIndex, minSubscriptionIndex));

Review Comment:
   `checkAndUpdateSafeDeletedSearchIndex()` now computes a subscription-aware 
safe delete index, but other code paths (e.g., `LogDispatcher` periodically 
calling `reader.setSafelyDeletedSearchIndex(impl.getMinFlushedSyncIndex())`) 
can overwrite it with a replication-only value. That can allow WAL deletion 
past the slowest subscription consumer, causing data loss. Consider 
centralizing *all* updates to safelyDeletedSearchIndex through a single 
subscription-aware method (or make `getMinFlushedSyncIndex()` incorporate 
subscription progress).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to