This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mergemaster0808
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 11bb7b78edc2e8980a78c37e57e1013781e9489c
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Jul 29 15:19:05 2024 +0800

    Subscription: support payload size control fallback strategy & fix issue where subscription events cannot be auto recycled & fix issue where the reference count of tablet events for tsfile topic cannot decrease to zero (#13053)
    
    (cherry picked from commit 376ed3c495797e118c3ebfac9b6abef14c65941d)
---
 .../consumer/SubscriptionPushConsumer.java         |  2 +-
 .../db/subscription/event/SubscriptionEvent.java   |  6 ++--
 .../batch/SubscriptionPipeTsFileEventBatch.java    |  7 ++++
 .../receiver/SubscriptionReceiverV1.java           | 41 +++++++++++++++++-----
 .../apache/iotdb/commons/conf/CommonConfig.java    | 11 ++++++
 .../iotdb/commons/conf/CommonDescriptor.java       |  5 +++
 .../subscription/config/SubscriptionConfig.java    |  5 +++
 7 files changed, 64 insertions(+), 13 deletions(-)

diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index 69cf13f9bef..e2dfe797457 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -184,7 +184,7 @@ public class SubscriptionPushConsumer extends 
SubscriptionConsumer {
           final ConsumeResult consumeResult;
           try {
             consumeResult = consumeListener.onReceive(message);
-            if (consumeResult.equals(ConsumeResult.SUCCESS)) {
+            if (Objects.equals(consumeResult, ConsumeResult.SUCCESS)) {
               messagesToAck.add(message);
             } else {
               LOGGER.warn("Consumer listener result failure when consuming 
message: {}", message);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index aa68a522b22..bf0e409fdd4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -58,6 +58,7 @@ public class SubscriptionEvent {
   private final SubscriptionCommitContext
       commitContext; // all responses have the same commit context
 
+  // lastPolledConsumerId is not used as a criterion for determining 
pollability
   private String lastPolledConsumerId;
   private long lastPolledTimestamp;
   private long committedTimestamp;
@@ -163,9 +164,6 @@ public class SubscriptionEvent {
     if (lastPolledTimestamp == INVALID_TIMESTAMP) {
       return true;
     }
-    if (Objects.nonNull(lastPolledConsumerId)) {
-      return false;
-    }
     // Recycle events that may not be able to be committed, i.e., those that 
have been polled but
     // not committed within a certain period of time.
     return System.currentTimeMillis() - lastPolledTimestamp
@@ -176,7 +174,7 @@ public class SubscriptionEvent {
     // reset current response index
     currentResponseIndex = 0;
 
-    lastPolledConsumerId = null;
+    // reset lastPolledTimestamp makes this event pollable
     lastPolledTimestamp = INVALID_TIMESTAMP;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index c36f9ca3964..a307cfc9568 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.subscription.event.batch;
 
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
 import 
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -61,6 +62,12 @@ public class SubscriptionPipeTsFileEventBatch {
     }
     if (Objects.nonNull(event)) {
       batch.onEvent(event);
+      if (event instanceof EnrichedEvent) {
+        ((EnrichedEvent) event)
+            .decreaseReferenceCount(
+                SubscriptionPipeTsFileEventBatch.class.getName(),
+                false); // missing releaseLastEvent decreases reference count
+      }
     }
     if (batch.shouldEmit()) {
       final List<SubscriptionEvent> events = generateSubscriptionEvents();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index c198609aa30..5ca6d5de56e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.consensus.ConfigRegionId;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
@@ -77,12 +78,18 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 public class SubscriptionReceiverV1 implements SubscriptionReceiver {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionReceiverV1.class);
 
+  private static final long POLL_PAYLOAD_MAX_SIZE =
+      SubscriptionConfig.getInstance().getSubscriptionPollPayloadMaxSize();
+
+  private static final double POLL_PAYLOAD_SIZE_THRESHOLD = 
POLL_PAYLOAD_MAX_SIZE * 0.75;
+
   private static final IClientManager<ConfigRegionId, ConfigNodeClient> 
CONFIG_NODE_CLIENT_MANAGER =
       ConfigNodeClientManager.getInstance();
 
@@ -358,23 +365,44 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
       }
 
       // generate response
+      final AtomicLong totalSize = new AtomicLong();
       return PipeSubscribePollResp.toTPipeSubscribeResp(
           RpcUtils.SUCCESS_STATUS,
-          events.parallelStream()
+          events.stream()
               .map(
                   (event) -> {
+                    final SubscriptionCommitContext commitContext = 
event.getCommitContext();
                     final SubscriptionPollResponse response = 
event.getCurrentResponse();
                     if (Objects.isNull(response)) {
-                      throw new SubscriptionException("null response");
+                      LOGGER.warn(
+                          "Subscription: consumer {} poll null response for 
event {} with request: {}",
+                          consumerConfig,
+                          event,
+                          req.getRequest());
+                      // nack
+                      SubscriptionAgent.broker()
+                          .commit(consumerConfig, 
Collections.singletonList(commitContext), true);
+                      return null;
                     }
-                    final SubscriptionCommitContext commitContext = 
response.getCommitContext();
+
                     try {
                       final ByteBuffer byteBuffer = 
event.getCurrentResponseByteBuffer();
+
+                      // payload size control
+                      final long size = byteBuffer.limit();
+                      if (totalSize.get() + size > 
POLL_PAYLOAD_SIZE_THRESHOLD) {
+                        throw new SubscriptionException(
+                            String.format(
+                                "payload size will exceed the threshold %s",
+                                POLL_PAYLOAD_SIZE_THRESHOLD));
+                      }
+                      totalSize.getAndAdd(size);
+
                       SubscriptionPrefetchingQueueMetrics.getInstance()
                           .mark(
                               
SubscriptionPrefetchingQueue.generatePrefetchingQueueId(
                                   commitContext.getConsumerGroupId(), 
commitContext.getTopicName()),
-                              byteBuffer.limit());
+                              size);
                       event.resetResponseByteBuffer(false);
                       LOGGER.info(
                           "Subscription: consumer {} poll {} successfully with 
request: {}",
@@ -391,10 +419,7 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
                           e);
                       // nack
                       SubscriptionAgent.broker()
-                          .commit(
-                              consumerConfig,
-                              
Collections.singletonList(response.getCommitContext()),
-                              true);
+                          .commit(consumerConfig, 
Collections.singletonList(commitContext), true);
                       return null;
                     }
                   })
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 45785f045b9..abcf0679dca 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -269,6 +269,9 @@ public class CommonConfig {
   private long subscriptionReadFileBufferSize = 8 * MB;
   private long subscriptionTsFileDeduplicationWindowSeconds = 120; // 120s
 
+  // default to SessionConfig.DEFAULT_MAX_FRAME_SIZE
+  private long subscriptionPollPayloadMaxSize = 64 * MB;
+
   /** Whether to use persistent schema mode. */
   private String schemaEngineMode = "Memory";
 
@@ -1231,6 +1234,14 @@ public class CommonConfig {
         subscriptionTsFileDeduplicationWindowSeconds;
   }
 
+  public long getSubscriptionPollPayloadMaxSize() {
+    return subscriptionPollPayloadMaxSize;
+  }
+
+  public void setSubscriptionPollPayloadMaxSize(long 
subscriptionPollPayloadMaxSize) {
+    this.subscriptionPollPayloadMaxSize = subscriptionPollPayloadMaxSize;
+  }
+
   public String getSchemaEngineMode() {
     return schemaEngineMode;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 31275dcaee9..864df3d75d7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -651,6 +651,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "subscription_ts_file_deduplication_window_seconds",
                 
String.valueOf(config.getSubscriptionTsFileDeduplicationWindowSeconds()))));
+    config.setSubscriptionPollPayloadMaxSize(
+        Long.parseLong(
+            properties.getProperty(
+                "subscription_poll_payload_max_size",
+                String.valueOf(config.getSubscriptionPollPayloadMaxSize()))));
   }
 
   public void loadRetryProperties(Properties properties) throws IOException {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
index 0c73c6a23d7..7bac5e181c8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
@@ -75,6 +75,10 @@ public class SubscriptionConfig {
     return COMMON_CONFIG.getSubscriptionTsFileDeduplicationWindowSeconds();
   }
 
+  public long getSubscriptionPollPayloadMaxSize() {
+    return COMMON_CONFIG.getSubscriptionPollPayloadMaxSize();
+  }
+
   /////////////////////////////// Utils ///////////////////////////////
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionConfig.class);
@@ -106,6 +110,7 @@ public class SubscriptionConfig {
     LOGGER.info(
         "SubscriptionTsFileDeduplicationWindowSeconds: {}",
         getSubscriptionTsFileDeduplicationWindowSeconds());
+    LOGGER.info("SubscriptionPollPayloadMaxSize: {}", 
getSubscriptionPollPayloadMaxSize());
   }
 
   /////////////////////////////// Singleton ///////////////////////////////

Reply via email to