This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new fbd86ec34f9 Subscription: fix nack tablet response & periodically
report the state of prefetching queue & improve logs for subscription providers
(#14822) (#14836)
fbd86ec34f9 is described below
commit fbd86ec34f99decd015aca1ef07f5a6dee3c5095
Author: VGalaxies <[email protected]>
AuthorDate: Mon Feb 17 09:59:31 2025 +0800
Subscription: fix nack tablet response & periodically report the state of
prefetching queue & improve logs for subscription providers (#14822) (#14836)
* fix nack tablet response
* improve log for consumer
* report state for SubscriptionPrefetchingQueue
* fix log
---
.../consumer/SubscriptionConsumer.java | 4 +-
.../consumer/SubscriptionProviders.java | 48 ++++++++++++++--------
.../broker/SubscriptionPrefetchingQueue.java | 11 +++++
.../response/SubscriptionEventTabletResponse.java | 5 ---
4 files changed, 45 insertions(+), 23 deletions(-)
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 58dc7c7462d..e9349ba77fd 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -382,7 +382,9 @@ abstract class SubscriptionConsumer implements AutoCloseable {
} catch (final Exception ignored) {
}
throw new SubscriptionConnectionException(
- String.format("Failed to handshake with subscription provider %s", provider), e);
+ String.format(
+ "Failed to handshake with subscription provider %s because of %s", provider, e),
+ e);
}
// update consumer id and consumer group id if not exist
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
index 4fb93430bc8..3d09cc3516a 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
@@ -85,7 +85,8 @@ final class SubscriptionProviders {
try {
defaultProvider = consumer.constructProviderAndHandshake(endPoint);
} catch (final Exception e) {
- LOGGER.warn("Failed to create connection with {}", endPoint, e);
+ LOGGER.warn(
+ "{} failed to create connection with {} because of {}", consumer, endPoint, e, e);
continue; // try next endpoint
}
defaultDataNodeId = defaultProvider.getDataNodeId();
@@ -95,8 +96,9 @@ final class SubscriptionProviders {
try {
allEndPoints =
defaultProvider.getSessionConnection().fetchAllEndPoints();
} catch (final Exception e) {
- LOGGER.warn("Failed to fetch all endpoints from {}, will retry later...", endPoint, e);
- break; // retry later
+ LOGGER.warn(
+ "{} failed to fetch all endpoints from {} because of {}", consumer, endPoint, e, e);
+ break;
}
for (final Map.Entry<Integer, TEndPoint> entry :
allEndPoints.entrySet()) {
@@ -109,8 +111,12 @@ final class SubscriptionProviders {
provider = consumer.constructProviderAndHandshake(entry.getValue());
} catch (final Exception e) {
LOGGER.warn(
- "Failed to create connection with {}, will retry later...", entry.getValue(), e);
- continue; // retry later
+ "{} failed to create connection with {} because of {}",
+ consumer,
+ entry.getValue(),
+ e,
+ e);
+ continue;
}
addProvider(entry.getKey(), provider);
}
@@ -134,7 +140,7 @@ final class SubscriptionProviders {
try {
provider.close();
} catch (final Exception e) {
- LOGGER.warn(e.getMessage());
+ LOGGER.warn("Failed to close subscription provider {} because of {}", provider, e, e);
}
}
subscriptionProviders.clear();
@@ -241,8 +247,10 @@ final class SubscriptionProviders {
provider.setAvailable();
} catch (final Exception e) {
LOGGER.warn(
- "something unexpected happened when sending heartbeat to subscription provider {}, set subscription provider unavailable",
+ "{} failed to sending heartbeat to subscription provider {} because of {}, set subscription provider unavailable",
+ consumer,
provider,
+ e,
e);
provider.setUnavailable();
}
@@ -269,7 +277,7 @@ final class SubscriptionProviders {
try {
openProviders(consumer);
} catch (final Exception e) {
- LOGGER.warn("something unexpected happened when syncing subscription endpoints...", e);
+ LOGGER.warn("Failed to open providers for consumer {} because of {}", consumer, e, e);
return;
}
}
@@ -278,8 +286,8 @@ final class SubscriptionProviders {
try {
allEndPoints = consumer.fetchAllEndPointsWithRedirection();
} catch (final Exception e) {
- LOGGER.warn("Failed to fetch all endpoints, will retry later...", e);
- return; // retry later
+ LOGGER.warn("Failed to fetch all endpoints for consumer {} because of {}", consumer, e, e);
+ return;
}
// add new providers or handshake existing providers
@@ -293,8 +301,8 @@ final class SubscriptionProviders {
newProvider = consumer.constructProviderAndHandshake(endPoint);
} catch (final Exception e) {
LOGGER.warn(
- "Failed to create connection with endpoint {}, will retry later...", endPoint, e);
- continue; // retry later
+ "{} failed to create connection with {} because of {}", consumer, endPoint, e, e);
+ continue;
}
addProvider(entry.getKey(), newProvider);
} else {
@@ -304,8 +312,10 @@ final class SubscriptionProviders {
provider.setAvailable();
} catch (final Exception e) {
LOGGER.warn(
- "something unexpected happened when sending heartbeat to subscription provider {}, set subscription provider unavailable",
+ "{} failed to sending heartbeat to subscription provider {} because of {}, set subscription provider unavailable",
+ consumer,
provider,
+ e,
e);
provider.setUnavailable();
}
@@ -315,8 +325,10 @@ final class SubscriptionProviders {
closeAndRemoveProvider(entry.getKey());
} catch (final Exception e) {
LOGGER.warn(
- "Exception occurred when closing and removing subscription provider with data node id {}",
- entry.getKey(),
+ "Exception occurred when {} closing and removing subscription provider {} because of {}",
+ consumer,
+ provider,
+ e,
e);
}
}
@@ -331,8 +343,10 @@ final class SubscriptionProviders {
closeAndRemoveProvider(dataNodeId);
} catch (final Exception e) {
LOGGER.warn(
- "Exception occurred when closing and removing subscription provider with data node id {}",
- dataNodeId,
+ "Exception occurred when {} closing and removing subscription provider {} because of {}",
+ consumer,
+ provider,
+ e,
e);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index cf78aff6d4a..ca9bf096efa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -97,6 +97,9 @@ public abstract class SubscriptionPrefetchingQueue {
private final SubscriptionPrefetchingQueueStates states;
+ private static final long STATE_REPORT_INTERVAL_IN_MS = 10_000L;
+ private long lastStateReportTimestamp = System.currentTimeMillis();
+
private volatile boolean isCompleted = false;
private volatile boolean isClosed = false;
@@ -259,6 +262,7 @@ public abstract class SubscriptionPrefetchingQueue {
if (isClosed()) {
return false;
}
+ reportStateIfNeeded();
// TODO: more refined behavior (prefetch/serialize/...) control
if (states.shouldPrefetch()) {
tryPrefetch();
@@ -274,6 +278,13 @@ public abstract class SubscriptionPrefetchingQueue {
}
}
+ private void reportStateIfNeeded() {
+ if (System.currentTimeMillis() - lastStateReportTimestamp > STATE_REPORT_INTERVAL_IN_MS) {
+ LOGGER.info("Subscription: SubscriptionPrefetchingQueue state {}", this);
+ lastStateReportTimestamp = System.currentTimeMillis();
+ }
+ }
+
@SafeVarargs
private final void remapInFlightEventsSnapshot(
final RemappingFunction<SubscriptionEvent>... functions) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
index cd93cd350ba..c6150536f21 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
@@ -116,11 +116,6 @@ public class SubscriptionEventTabletResponse extends SubscriptionEventExtendable
@Override
public synchronized void nack() {
- if (nextOffset.get() == 1) {
- // do nothing if with complete tablets
- return;
- }
-
cleanUp();
// should not reset the iterator of batch when init