Caideyipi commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r2922136478
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java:
##########
@@ -662,6 +666,45 @@ private TPipeSubscribeResp handlePipeSubscribeCloseInternal(final PipeSubscribeC
return PipeSubscribeCloseResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS);
}
+ private TPipeSubscribeResp handlePipeSubscribeSeek(final PipeSubscribeSeekReq req) {
+ try {
+ return handlePipeSubscribeSeekInternal(req);
+ } catch (final Exception e) {
+ LOGGER.warn("Exception occurred when seeking with request {}", req, e);
+ final String exceptionMessage =
+ String.format(
+ "Subscription: something unexpected happened when seeking with request %s: %s",
+ req, e);
+ return PipeSubscribeSeekResp.toTPipeSubscribeResp(
+ RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_SEEK_ERROR, exceptionMessage));
+ }
+ }
+
+ private TPipeSubscribeResp handlePipeSubscribeSeekInternal(final PipeSubscribeSeekReq req) {
+ // check consumer config thread local
+ final ConsumerConfig consumerConfig = consumerConfigThreadLocal.get();
+ if (Objects.isNull(consumerConfig)) {
+ LOGGER.warn(
+ "Subscription: missing consumer config when handling PipeSubscribeSeekReq: {}", req);
+ return SUBSCRIPTION_MISSING_CUSTOMER_RESP;
Review Comment:
Typo in the constant name? It should probably read MISSING_CONSUMER_RESP ("CONSUMER", not "CUSTOMER").
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java:
##########
@@ -389,6 +389,13 @@ public class CommonConfig {
private long subscriptionMetaSyncerInitialSyncDelayMinutes = 3;
private long subscriptionMetaSyncerSyncIntervalMinutes = 3;
+ private int subscriptionConsensusBatchMaxDelayInMs = 50;
+ private long subscriptionConsensusBatchMaxSizeInBytes = 8 * MB;
+ private int subscriptionConsensusBatchMaxTabletCount = 64;
+ private int subscriptionConsensusBatchMaxWalEntries = 128;
+
+ private long subscriptionConsensusWalRetentionSizeInBytes = 512 * MB;
Review Comment:
Could we use "throttleThreshold" as the default value here, or use it directly instead of adding a separate setting?
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java:
##########
@@ -872,17 +955,41 @@ void checkAndUpdateIndex() {
}
/**
- * If there is only one replica, set it to Long.MAX_VALUE. If there are multiple replicas, get the
- * latest SafelyDeletedSearchIndex again. This enables wal to be deleted in a timely manner.
+ * Computes and updates the safe-to-delete WAL search index based on replication progress and
+ * subscription WAL retention policy. When no subscriptions exist, WAL is cleaned normally.
*/
- void checkAndUpdateSafeDeletedSearchIndex() {
+ public void checkAndUpdateSafeDeletedSearchIndex() {
if (configuration.isEmpty()) {
logger.error(
"Configuration is empty, which is unexpected. Safe deleted search index won't be updated this time.");
- } else if (configuration.size() == 1) {
+ return;
+ }
+
+ final boolean hasSubscriptions = !subscriptionQueues.isEmpty();
+ final long retentionSizeLimit =
+ config.getReplication().getSubscriptionWalRetentionSizeInBytes();
+
+ if (configuration.size() == 1 && !hasSubscriptions) {
+ // Single replica, no subscription consumers => delete all WAL freely
consensusReqReader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
} else {
- consensusReqReader.setSafelyDeletedSearchIndex(getMinFlushedSyncIndex());
+ final long replicationIndex =
+ configuration.size() > 1 ? getMinFlushedSyncIndex() : Long.MAX_VALUE;
+
+ // Subscription WAL retention: if subscriptions exist and retention is configured,
+ // prevent WAL deletion when total WAL size is within the retention limit.
+ long subscriptionRetentionBound = Long.MAX_VALUE;
+ if (hasSubscriptions && retentionSizeLimit > 0) {
+ final long totalWalSize = consensusReqReader.getTotalSize();
+ if (totalWalSize <= retentionSizeLimit) {
+ // WAL size is within retention limit — preserve all WAL for subscribers
+ subscriptionRetentionBound = ConsensusReqReader.DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
Review Comment:
This seems a bit coarse:
1. WAL cleaning is all-or-nothing. There is no bound such as "retain only the most recent retention-size amount of data".
2. The retention logic does not take the commit indexes of the subscriptions into account.
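To illustrate both points, here is a minimal sketch of a finer-grained bound. It is not the PR's code: `WalRetentionSketch`, `WalFile`, `safeDeleteIndex`, and the per-file size list are all hypothetical, and it assumes that WAL files can be enumerated oldest-first with their last search index and size, that at least one subscription exists, and that entries with an index at or below the returned value may be deleted.

```java
import java.util.List;

/** Hypothetical sketch only; none of these names exist in IoTDB. */
final class WalRetentionSketch {

  /** One WAL file: the largest search index it holds and its size on disk (assumed metadata). */
  static final class WalFile {
    final long lastSearchIndex;
    final long sizeInBytes;

    WalFile(long lastSearchIndex, long sizeInBytes) {
      this.lastSearchIndex = lastSearchIndex;
      this.sizeInBytes = sizeInBytes;
    }
  }

  /**
   * Returns the highest search index that may be deleted.
   *
   * @param filesOldestFirst WAL files ordered from oldest to newest
   * @param retentionSizeInBytes size budget retained for subscribers
   * @param replicationIndex bound from replication (Long.MAX_VALUE for a single replica)
   * @param minSubscriptionCommitIndex smallest commit index across all subscriptions
   */
  static long safeDeleteIndex(
      List<WalFile> filesOldestFirst,
      long retentionSizeInBytes,
      long replicationIndex,
      long minSubscriptionCommitIndex) {
    // Point 1: walk from newest to oldest and keep files while they fit in the budget.
    // The first file that does not fit, and everything older, is deletable by size.
    long retained = 0;
    long sizeBound = Long.MIN_VALUE; // nothing deletable by size unless the budget overflows
    for (int i = filesOldestFirst.size() - 1; i >= 0; i--) {
      WalFile file = filesOldestFirst.get(i);
      if (retained + file.sizeInBytes > retentionSizeInBytes) {
        sizeBound = file.lastSearchIndex;
        break;
      }
      retained += file.sizeInBytes;
    }

    // Point 2: entries already committed by every subscription need no retention,
    // so the subscription bound is the larger of the two indexes.
    long subscriptionBound = Math.max(sizeBound, minSubscriptionCommitIndex);

    // Replication still has the final say on what is safe to delete.
    return Math.min(replicationIndex, subscriptionBound);
  }
}
```

With this shape, data outside the retention window is released incrementally, and a subscription that has caught up no longer pins WAL it has already consumed.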
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java:
##########
@@ -30,7 +30,7 @@ public class SubscriptionConfig {
private static final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
public boolean getSubscriptionEnabled() {
- return false;
+ return true; // TODO: make it configurable after subscription is stable
Review Comment:
+1