This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new fbd86ec34f9 Subscription: fix nack tablet response & periodically 
report the state of prefetching queue & improve logs for subscription providers 
(#14822) (#14836)
fbd86ec34f9 is described below

commit fbd86ec34f99decd015aca1ef07f5a6dee3c5095
Author: VGalaxies <[email protected]>
AuthorDate: Mon Feb 17 09:59:31 2025 +0800

    Subscription: fix nack tablet response & periodically report the state of 
prefetching queue & improve logs for subscription providers (#14822) (#14836)
    
    * fix nack tablet response
    
    * improve log for consumer
    
    * report state for SubscriptionPrefetchingQueue
    
    * fix log
---
 .../consumer/SubscriptionConsumer.java             |  4 +-
 .../consumer/SubscriptionProviders.java            | 48 ++++++++++++++--------
 .../broker/SubscriptionPrefetchingQueue.java       | 11 +++++
 .../response/SubscriptionEventTabletResponse.java  |  5 ---
 4 files changed, 45 insertions(+), 23 deletions(-)

diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 58dc7c7462d..e9349ba77fd 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -382,7 +382,9 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
       } catch (final Exception ignored) {
       }
       throw new SubscriptionConnectionException(
-          String.format("Failed to handshake with subscription provider %s", 
provider), e);
+          String.format(
+              "Failed to handshake with subscription provider %s because of 
%s", provider, e),
+          e);
     }
 
     // update consumer id and consumer group id if not exist
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
index 4fb93430bc8..3d09cc3516a 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
@@ -85,7 +85,8 @@ final class SubscriptionProviders {
       try {
         defaultProvider = consumer.constructProviderAndHandshake(endPoint);
       } catch (final Exception e) {
-        LOGGER.warn("Failed to create connection with {}", endPoint, e);
+        LOGGER.warn(
+            "{} failed to create connection with {} because of {}", consumer, 
endPoint, e, e);
         continue; // try next endpoint
       }
       defaultDataNodeId = defaultProvider.getDataNodeId();
@@ -95,8 +96,9 @@ final class SubscriptionProviders {
       try {
         allEndPoints = 
defaultProvider.getSessionConnection().fetchAllEndPoints();
       } catch (final Exception e) {
-        LOGGER.warn("Failed to fetch all endpoints from {}, will retry 
later...", endPoint, e);
-        break; // retry later
+        LOGGER.warn(
+            "{} failed to fetch all endpoints from {} because of {}", 
consumer, endPoint, e, e);
+        break;
       }
 
       for (final Map.Entry<Integer, TEndPoint> entry : 
allEndPoints.entrySet()) {
@@ -109,8 +111,12 @@ final class SubscriptionProviders {
           provider = consumer.constructProviderAndHandshake(entry.getValue());
         } catch (final Exception e) {
           LOGGER.warn(
-              "Failed to create connection with {}, will retry later...", 
entry.getValue(), e);
-          continue; // retry later
+              "{} failed to create connection with {} because of {}",
+              consumer,
+              entry.getValue(),
+              e,
+              e);
+          continue;
         }
         addProvider(entry.getKey(), provider);
       }
@@ -134,7 +140,7 @@ final class SubscriptionProviders {
       try {
         provider.close();
       } catch (final Exception e) {
-        LOGGER.warn(e.getMessage());
+        LOGGER.warn("Failed to close subscription provider {} because of {}", 
provider, e, e);
       }
     }
     subscriptionProviders.clear();
@@ -241,8 +247,10 @@ final class SubscriptionProviders {
         provider.setAvailable();
       } catch (final Exception e) {
         LOGGER.warn(
-            "something unexpected happened when sending heartbeat to 
subscription provider {}, set subscription provider unavailable",
+            "{} failed to sending heartbeat to subscription provider {} 
because of {}, set subscription provider unavailable",
+            consumer,
             provider,
+            e,
             e);
         provider.setUnavailable();
       }
@@ -269,7 +277,7 @@ final class SubscriptionProviders {
       try {
         openProviders(consumer);
       } catch (final Exception e) {
-        LOGGER.warn("something unexpected happened when syncing subscription 
endpoints...", e);
+        LOGGER.warn("Failed to open providers for consumer {} because of {}", 
consumer, e, e);
         return;
       }
     }
@@ -278,8 +286,8 @@ final class SubscriptionProviders {
     try {
       allEndPoints = consumer.fetchAllEndPointsWithRedirection();
     } catch (final Exception e) {
-      LOGGER.warn("Failed to fetch all endpoints, will retry later...", e);
-      return; // retry later
+      LOGGER.warn("Failed to fetch all endpoints for consumer {} because of 
{}", consumer, e, e);
+      return;
     }
 
     // add new providers or handshake existing providers
@@ -293,8 +301,8 @@ final class SubscriptionProviders {
           newProvider = consumer.constructProviderAndHandshake(endPoint);
         } catch (final Exception e) {
           LOGGER.warn(
-              "Failed to create connection with endpoint {}, will retry 
later...", endPoint, e);
-          continue; // retry later
+              "{} failed to create connection with {} because of {}", 
consumer, endPoint, e, e);
+          continue;
         }
         addProvider(entry.getKey(), newProvider);
       } else {
@@ -304,8 +312,10 @@ final class SubscriptionProviders {
           provider.setAvailable();
         } catch (final Exception e) {
           LOGGER.warn(
-              "something unexpected happened when sending heartbeat to 
subscription provider {}, set subscription provider unavailable",
+              "{} failed to sending heartbeat to subscription provider {} 
because of {}, set subscription provider unavailable",
+              consumer,
               provider,
+              e,
               e);
           provider.setUnavailable();
         }
@@ -315,8 +325,10 @@ final class SubscriptionProviders {
             closeAndRemoveProvider(entry.getKey());
           } catch (final Exception e) {
             LOGGER.warn(
-                "Exception occurred when closing and removing subscription 
provider with data node id {}",
-                entry.getKey(),
+                "Exception occurred when {} closing and removing subscription 
provider {} because of {}",
+                consumer,
+                provider,
+                e,
                 e);
           }
         }
@@ -331,8 +343,10 @@ final class SubscriptionProviders {
           closeAndRemoveProvider(dataNodeId);
         } catch (final Exception e) {
           LOGGER.warn(
-              "Exception occurred when closing and removing subscription 
provider with data node id {}",
-              dataNodeId,
+              "Exception occurred when {} closing and removing subscription 
provider {} because of {}",
+              consumer,
+              provider,
+              e,
               e);
         }
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index cf78aff6d4a..ca9bf096efa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -97,6 +97,9 @@ public abstract class SubscriptionPrefetchingQueue {
 
   private final SubscriptionPrefetchingQueueStates states;
 
+  private static final long STATE_REPORT_INTERVAL_IN_MS = 10_000L;
+  private long lastStateReportTimestamp = System.currentTimeMillis();
+
   private volatile boolean isCompleted = false;
   private volatile boolean isClosed = false;
 
@@ -259,6 +262,7 @@ public abstract class SubscriptionPrefetchingQueue {
       if (isClosed()) {
         return false;
       }
+      reportStateIfNeeded();
       // TODO: more refined behavior (prefetch/serialize/...) control
       if (states.shouldPrefetch()) {
         tryPrefetch();
@@ -274,6 +278,13 @@ public abstract class SubscriptionPrefetchingQueue {
     }
   }
 
+  private void reportStateIfNeeded() {
+    if (System.currentTimeMillis() - lastStateReportTimestamp > 
STATE_REPORT_INTERVAL_IN_MS) {
+      LOGGER.info("Subscription: SubscriptionPrefetchingQueue state {}", this);
+      lastStateReportTimestamp = System.currentTimeMillis();
+    }
+  }
+
   @SafeVarargs
   private final void remapInFlightEventsSnapshot(
       final RemappingFunction<SubscriptionEvent>... functions) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
index cd93cd350ba..c6150536f21 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
@@ -116,11 +116,6 @@ public class SubscriptionEventTabletResponse extends 
SubscriptionEventExtendable
 
   @Override
   public synchronized void nack() {
-    if (nextOffset.get() == 1) {
-      // do nothing if with complete tablets
-      return;
-    }
-
     cleanUp();
 
     // should not reset the iterator of batch when init

Reply via email to