This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch mergemaster0808 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 11bb7b78edc2e8980a78c37e57e1013781e9489c Author: V_Galaxy <[email protected]> AuthorDate: Mon Jul 29 15:19:05 2024 +0800 Subscription: support payload size control fallback strategy & fix issue where subscription events cannot be auto recycled & fix issue where the reference count of tablet events for tsfile topic cannot decrease to zero (#13053) (cherry picked from commit 376ed3c495797e118c3ebfac9b6abef14c65941d) --- .../consumer/SubscriptionPushConsumer.java | 2 +- .../db/subscription/event/SubscriptionEvent.java | 6 ++-- .../batch/SubscriptionPipeTsFileEventBatch.java | 7 ++++ .../receiver/SubscriptionReceiverV1.java | 41 +++++++++++++++++----- .../apache/iotdb/commons/conf/CommonConfig.java | 11 ++++++ .../iotdb/commons/conf/CommonDescriptor.java | 5 +++ .../subscription/config/SubscriptionConfig.java | 5 +++ 7 files changed, 64 insertions(+), 13 deletions(-) diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java index 69cf13f9bef..e2dfe797457 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java @@ -184,7 +184,7 @@ public class SubscriptionPushConsumer extends SubscriptionConsumer { final ConsumeResult consumeResult; try { consumeResult = consumeListener.onReceive(message); - if (consumeResult.equals(ConsumeResult.SUCCESS)) { + if (Objects.equals(consumeResult, ConsumeResult.SUCCESS)) { messagesToAck.add(message); } else { LOGGER.warn("Consumer listener result failure when consuming message: {}", message); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java index aa68a522b22..bf0e409fdd4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java @@ -58,6 +58,7 @@ public class SubscriptionEvent { private final SubscriptionCommitContext commitContext; // all responses have the same commit context + // lastPolledConsumerId is not used as a criterion for determining pollability private String lastPolledConsumerId; private long lastPolledTimestamp; private long committedTimestamp; @@ -163,9 +164,6 @@ public class SubscriptionEvent { if (lastPolledTimestamp == INVALID_TIMESTAMP) { return true; } - if (Objects.nonNull(lastPolledConsumerId)) { - return false; - } // Recycle events that may not be able to be committed, i.e., those that have been polled but // not committed within a certain period of time. return System.currentTimeMillis() - lastPolledTimestamp @@ -176,7 +174,7 @@ public class SubscriptionEvent { // reset current response index currentResponseIndex = 0; - lastPolledConsumerId = null; + // reset lastPolledTimestamp makes this event pollable lastPolledTimestamp = INVALID_TIMESTAMP; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java index c36f9ca3964..a307cfc9568 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.subscription.event.batch; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch; import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; @@ -61,6 +62,12 @@ public class SubscriptionPipeTsFileEventBatch { } if (Objects.nonNull(event)) { batch.onEvent(event); + if (event instanceof EnrichedEvent) { + ((EnrichedEvent) event) + .decreaseReferenceCount( + SubscriptionPipeTsFileEventBatch.class.getName(), + false); // missing releaseLastEvent decreases reference count + } } if (batch.shouldEmit()) { final List<SubscriptionEvent> events = generateSubscriptionEvents(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java index c198609aa30..5ca6d5de56e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.consensus.ConfigRegionId; +import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq; import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq; @@ -77,12 +78,18 @@ import java.util.List; import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; public class SubscriptionReceiverV1 implements SubscriptionReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionReceiverV1.class); + private static final long POLL_PAYLOAD_MAX_SIZE = + SubscriptionConfig.getInstance().getSubscriptionPollPayloadMaxSize(); + + private static final double POLL_PAYLOAD_SIZE_THRESHOLD = POLL_PAYLOAD_MAX_SIZE * 0.75; + private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance(); @@ -358,23 +365,44 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver { } // generate response + final AtomicLong totalSize = new AtomicLong(); return PipeSubscribePollResp.toTPipeSubscribeResp( RpcUtils.SUCCESS_STATUS, - events.parallelStream() + events.stream() .map( (event) -> { + final SubscriptionCommitContext commitContext = event.getCommitContext(); final SubscriptionPollResponse response = event.getCurrentResponse(); if (Objects.isNull(response)) { - throw new SubscriptionException("null response"); + LOGGER.warn( + "Subscription: consumer {} poll null response for event {} with request: {}", + consumerConfig, + event, + req.getRequest()); + // nack + SubscriptionAgent.broker() + .commit(consumerConfig, Collections.singletonList(commitContext), true); + return null; } - final SubscriptionCommitContext commitContext = response.getCommitContext(); + try { final ByteBuffer byteBuffer = event.getCurrentResponseByteBuffer(); + + // payload size control + final long size = byteBuffer.limit(); + if (totalSize.get() + size > POLL_PAYLOAD_SIZE_THRESHOLD) { + throw new SubscriptionException( + String.format( + "payload size will exceed the threshold %s", + POLL_PAYLOAD_SIZE_THRESHOLD)); + } + totalSize.getAndAdd(size); + SubscriptionPrefetchingQueueMetrics.getInstance() .mark( SubscriptionPrefetchingQueue.generatePrefetchingQueueId( commitContext.getConsumerGroupId(), commitContext.getTopicName()), - byteBuffer.limit()); + size); event.resetResponseByteBuffer(false); LOGGER.info( "Subscription: consumer {} poll {} successfully with request: {}", @@ -391,10 +419,7 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver { e); // nack SubscriptionAgent.broker() - .commit( - consumerConfig, - Collections.singletonList(response.getCommitContext()), - true); + .commit(consumerConfig, Collections.singletonList(commitContext), true); return null; } }) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 45785f045b9..abcf0679dca 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -269,6 +269,9 @@ public class CommonConfig { private long subscriptionReadFileBufferSize = 8 * MB; private long subscriptionTsFileDeduplicationWindowSeconds = 120; // 120s + // default to SessionConfig.DEFAULT_MAX_FRAME_SIZE + private long subscriptionPollPayloadMaxSize = 64 * MB; + /** Whether to use persistent schema mode. */ private String schemaEngineMode = "Memory"; @@ -1231,6 +1234,14 @@ public class CommonConfig { subscriptionTsFileDeduplicationWindowSeconds; } + public long getSubscriptionPollPayloadMaxSize() { + return subscriptionPollPayloadMaxSize; + } + + public void setSubscriptionPollPayloadMaxSize(long subscriptionPollPayloadMaxSize) { + this.subscriptionPollPayloadMaxSize = subscriptionPollPayloadMaxSize; + } + public String getSchemaEngineMode() { return schemaEngineMode; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 31275dcaee9..864df3d75d7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -651,6 +651,11 @@ public class CommonDescriptor { properties.getProperty( "subscription_ts_file_deduplication_window_seconds", String.valueOf(config.getSubscriptionTsFileDeduplicationWindowSeconds())))); + config.setSubscriptionPollPayloadMaxSize( + Long.parseLong( + properties.getProperty( + "subscription_poll_payload_max_size", + String.valueOf(config.getSubscriptionPollPayloadMaxSize())))); } public void loadRetryProperties(Properties properties) throws IOException { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java index 0c73c6a23d7..7bac5e181c8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java @@ -75,6 +75,10 @@ public class SubscriptionConfig { return COMMON_CONFIG.getSubscriptionTsFileDeduplicationWindowSeconds(); } + public long getSubscriptionPollPayloadMaxSize() { + return COMMON_CONFIG.getSubscriptionPollPayloadMaxSize(); + } + /////////////////////////////// Utils /////////////////////////////// private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionConfig.class); @@ -106,6 +110,7 @@ public class SubscriptionConfig { LOGGER.info( "SubscriptionTsFileDeduplicationWindowSeconds: {}", getSubscriptionTsFileDeduplicationWindowSeconds()); + LOGGER.info("SubscriptionPollPayloadMaxSize: {}", getSubscriptionPollPayloadMaxSize()); } /////////////////////////////// Singleton ///////////////////////////////
