Caideyipi commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r2922758960


##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java:
##########
@@ -1373,6 +1412,44 @@ private void unsubscribeWithRedirection(final Set<String> topicNames)
     throw new SubscriptionRuntimeCriticalException(errorMessage);
   }
 
+  /**
+   * Sends seek request to ALL available providers. Unlike subscribe/unsubscribe, seek must reach
+   * every node because data regions for the topic may be distributed across different nodes.
+   */
+  private void seekWithRedirection(
+      final String topicName, final short seekType, final long timestamp)
+      throws SubscriptionException {
+    final List<AbstractSubscriptionProvider> providers = this.providers.getAllAvailableProviders();
+    if (providers.isEmpty()) {
+      throw new SubscriptionConnectionException(
+          String.format(
+              "Cluster has no available subscription providers when %s seek topic %s",
+              this, topicName));
+    }
+    boolean anySuccess = false;
+    for (final AbstractSubscriptionProvider provider : providers) {
+      try {
+        provider.seek(topicName, seekType, timestamp);
+        anySuccess = true;
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "{} failed to seek topic {} from subscription provider {}, continuing with other providers...",
+            this,
+            topicName,
+            provider,
+            e);
+      }
+    }
+    if (!anySuccess) {

Review Comment:
   Should the exception be thrown only when no provider succeeds ("anySuccess") or whenever
   any provider fails ("allSuccess"), given that "seek must reach every node"?
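
   For illustration, here is a minimal sketch of what the "allSuccess" policy could look like.
   It is not the PR's code: `Provider` is a hypothetical stand-in for
   `AbstractSubscriptionProvider`, and `IllegalStateException` stands in for whichever
   subscription exception the client would actually throw. The loop still visits every
   provider, as the Javadoc requires, but collects failures and reports them together.

```java
import java.util.ArrayList;
import java.util.List;

final class SeekAllSketch {

  /** Hypothetical stand-in for AbstractSubscriptionProvider; only seek is modeled. */
  interface Provider {
    void seek(String topicName, short seekType, long timestamp) throws Exception;
  }

  /** Tries seek on every provider and returns the failures (empty means all succeeded). */
  static List<Exception> seekAll(
      final List<Provider> providers,
      final String topicName,
      final short seekType,
      final long timestamp) {
    final List<Exception> failures = new ArrayList<>();
    for (final Provider provider : providers) {
      try {
        provider.seek(topicName, seekType, timestamp);
      } catch (final Exception e) {
        // Keep going so that every reachable provider is still repositioned.
        failures.add(e);
      }
    }
    return failures;
  }

  /** "allSuccess" policy: a single failed provider is surfaced to the caller. */
  static void seekOrThrow(
      final List<Provider> providers,
      final String topicName,
      final short seekType,
      final long timestamp) {
    final List<Exception> failures = seekAll(providers, topicName, seekType, timestamp);
    if (failures.isEmpty()) {
      return;
    }
    // Assumption: an unchecked exception is acceptable here; the real client
    // would more likely use one of its own subscription exception types.
    final IllegalStateException error =
        new IllegalStateException(
            String.format(
                "seek on topic %s failed on %d of %d providers",
                topicName, failures.size(), providers.size()));
    failures.forEach(error::addSuppressed);
    throw error;
  }
}
```

   With "anySuccess", a partially applied seek is only logged, so some data regions may keep
   their old position without the caller knowing. With the variant above the caller at least
   learns that the seek was incomplete and can retry.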



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java:
##########
@@ -103,15 +107,41 @@ protected boolean executeFromValidate(final ConfigNodeProcedureEnv env)
     alterConsumerGroupProcedure =
         new AlterConsumerGroupProcedure(updatedConsumerGroupMeta, subscriptionInfo);
 
-    // Construct CreatePipeProcedureV2s
+    // Construct CreatePipeProcedureV2s (for non-consensus topics)
     for (final String topicName : subscribeReq.getTopicNames()) {
+      final TopicMeta topicMeta = subscriptionInfo.get().deepCopyTopicMeta(topicName);
+
+      // Check if this topic should use consensus subscription: mode is live, format is Tablet
+      final String topicMode =
+          topicMeta
+              .getConfig()
+              .getStringOrDefault(TopicConstant.MODE_KEY, TopicConstant.MODE_DEFAULT_VALUE);
+      final String topicFormat =
+          topicMeta
+              .getConfig()
+              .getStringOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE);
+      final boolean isConsensusBasedTopic =
+          TopicConstant.MODE_LIVE_VALUE.equalsIgnoreCase(topicMode)
+              && !TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equalsIgnoreCase(topicFormat);

Review Comment:
   Should the combination MODE_LIVE + TS_FILE_HANDLER throw an exception here?
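
   To make the question concrete, here is a small sketch of the two behaviours. It works on
   a plain `Map` rather than `TopicMeta`, and the string literals are assumed placeholders
   for the `TopicConstant` fields in the diff, not verified values. `isConsensusBased`
   mirrors the predicate in the hunk above; `rejectLiveTsFile` is a hypothetical extra check
   that would fail fast instead of silently routing the topic to the pipe-based path.

```java
import java.util.Map;

final class TopicRoutingSketch {

  // Assumed literal values standing in for the TopicConstant fields used in the diff.
  static final String MODE_KEY = "mode";
  static final String MODE_LIVE = "live"; // also assumed to be the default mode
  static final String FORMAT_KEY = "format";
  static final String FORMAT_DEFAULT = "SessionDataSetsHandler";
  static final String FORMAT_TS_FILE_HANDLER = "TsFileHandler";

  static boolean isLive(final Map<String, String> config) {
    return MODE_LIVE.equalsIgnoreCase(config.getOrDefault(MODE_KEY, MODE_LIVE));
  }

  static boolean isTsFileFormat(final Map<String, String> config) {
    return FORMAT_TS_FILE_HANDLER.equalsIgnoreCase(
        config.getOrDefault(FORMAT_KEY, FORMAT_DEFAULT));
  }

  /** Same predicate as the diff: live mode and any format other than TsFileHandler. */
  static boolean isConsensusBased(final Map<String, String> config) {
    return isLive(config) && !isTsFileFormat(config);
  }

  /** Hypothetical stricter variant: reject live + TsFileHandler outright. */
  static void rejectLiveTsFile(final Map<String, String> config) {
    if (isLive(config) && isTsFileFormat(config)) {
      throw new IllegalArgumentException(
          "live mode with TsFileHandler format is not supported by consensus subscription");
    }
  }
}
```

   Whether rejecting is right depends on whether live + TsFileHandler topics are still meant
   to be served by the existing pipe-based procedures. If they are, the silent fallback in
   the diff is intentional and a comment saying so would be enough.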



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
