This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.4
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2937a005a0b288914f5e078c99841e0c77979bba
Author: VGalaxies <[email protected]>
AuthorDate: Wed May 7 11:26:22 2025 +0800

    Subscription: detect outdated subscription event for tsfile message (#15430)
    
    (cherry picked from commit 9997004babb4bba11ef0e6b0ddd38c287a3800d9)
---
 .../base/AbstractSubscriptionConsumer.java         | 22 ++++++++++------------
 1 file changed, 10 insertions(+), 12 deletions(-)

diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
index 83ff755ddfb..88fab21ef4f 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
@@ -723,7 +723,7 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
     final Path filePath = getFilePath(commitContext, topicName, fileName, 
true, true);
     final File file = filePath.toFile();
     try (final RandomAccessFile fileWriter = new RandomAccessFile(file, "rw")) 
{
-      return Optional.of(pollFileInternal(commitContext, fileName, file, 
fileWriter, timer));
+      return pollFileInternal(commitContext, fileName, file, fileWriter, 
timer);
     } catch (final Exception e) {
       if (!(e instanceof SubscriptionPollTimeoutException)) {
         inFlightFilesCommitContextSet.remove(commitContext);
@@ -736,7 +736,7 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
     }
   }
 
-  private SubscriptionMessage pollFileInternal(
+  private Optional<SubscriptionMessage> pollFileInternal(
       final SubscriptionCommitContext commitContext,
       final String rawFileName,
       final File file,
@@ -769,13 +769,10 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
       final List<SubscriptionPollResponse> responses =
           pollFileInternal(commitContext, writingOffset, timer.remainingMs());
 
-      // It's agreed that the server will always return at least one response, 
even in case of
-      // failure.
+      // If responses is empty, it means that some outdated subscription 
events may be being polled,
+      // so just return.
       if (responses.isEmpty()) {
-        final String errorMessage =
-            String.format("SubscriptionConsumer %s poll empty response", this);
-        LOGGER.warn(errorMessage);
-        throw new SubscriptionRuntimeNonCriticalException(errorMessage);
+        return Optional.empty();
       }
 
       // only one SubscriptionEvent polled currently
@@ -884,10 +881,11 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
 
             // generate subscription message
             inFlightFilesCommitContextSet.remove(commitContext);
-            return new SubscriptionMessage(
-                commitContext,
-                file.getAbsolutePath(),
-                ((FileSealPayload) payload).getDatabaseName());
+            return Optional.of(
+                new SubscriptionMessage(
+                    commitContext,
+                    file.getAbsolutePath(),
+                    ((FileSealPayload) payload).getDatabaseName()));
           }
         case ERROR:
           {

Reply via email to