Caideyipi commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r2923211250
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java:
##########
@@ -757,6 +805,41 @@ public long getSearchIndex() {
return searchIndex.get();
}
+ public ConsensusReqReader getConsensusReqReader() {
+ return consensusReqReader;
+ }
+
+ /**
+ * Registers a subscription pending queue for real-time in-memory data delivery. When {@link
+ * #write(IConsensusRequest)} succeeds, the IndexedConsensusRequest is offered to all registered
+ * subscription queues, enabling subscription consumers to receive data without waiting for WAL
+ * flush.
+ *
+ * @param queue the blocking queue to receive IndexedConsensusRequest entries
+ */
+ public void registerSubscriptionQueue(final
BlockingQueue<IndexedConsensusRequest> queue) {
+ subscriptionQueues.add(queue);
+ // Immediately re-evaluate the safe delete index with new subscription
awareness
+ checkAndUpdateSafeDeletedSearchIndex();
+ logger.info(
+ "Registered subscription queue for group {}, "
+ + "total subscription queues: {}, currentSearchIndex={}, this={}",
+ consensusGroupId,
+ subscriptionQueues.size(),
+ searchIndex.get(),
+ System.identityHashCode(this));
+ }
+
+ public void unregisterSubscriptionQueue(final
BlockingQueue<IndexedConsensusRequest> queue) {
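Review Comment:
deregister — i.e. consider naming this method `deregisterSubscriptionQueue` rather than `unregisterSubscriptionQueue`, as the counterpart of `registerSubscriptionQueue`.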
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -200,41 +356,119 @@ public void bindPrefetchingQueue(final
SubscriptionSinkSubtask subtask) {
prefetchingQueueCount.invalidate();
}
- public void updateCompletedTopicNames(final String consumerGroupId, final
String topicName) {
- final SubscriptionBroker broker =
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
+ public void bindConsensusPrefetchingQueue(
+ final String consumerGroupId,
+ final String topicName,
+ final String consensusGroupId,
+ final IoTConsensusServerImpl serverImpl,
+ final ConsensusLogToTabletConverter converter,
+ final ConsensusSubscriptionCommitManager commitManager,
+ final long startSearchIndex) {
+ consumerGroupIdToConsensusBroker
+ .compute(
+ consumerGroupId,
+ (id, broker) -> {
+ if (Objects.isNull(broker)) {
+ LOGGER.info(
+ "Subscription: consensus broker bound to consumer group
[{}] does not exist, create new for binding consensus prefetching queue",
+ consumerGroupId);
+ return new ConsensusSubscriptionBroker(consumerGroupId);
+ }
+ return broker;
+ })
+ .bindConsensusPrefetchingQueue(
+ topicName, consensusGroupId, serverImpl, converter, commitManager,
startSearchIndex);
+ prefetchingQueueCount.invalidate();
+ }
+
+ public void unbindConsensusPrefetchingQueue(
+ final String consumerGroupId, final String topicName) {
+ final ConsensusSubscriptionBroker broker =
+ consumerGroupIdToConsensusBroker.get(consumerGroupId);
if (Objects.isNull(broker)) {
LOGGER.warn(
- "Subscription: broker bound to consumer group [{}] does not exist",
consumerGroupId);
+ "Subscription: consensus broker bound to consumer group [{}] does
not exist",
+ consumerGroupId);
+ return;
+ }
+ broker.unbindConsensusPrefetchingQueue(topicName);
+ prefetchingQueueCount.invalidate();
+ }
+
+ public void unbindByRegion(final String regionId) {
+ int totalClosed = 0;
+ for (final ConsensusSubscriptionBroker broker :
consumerGroupIdToConsensusBroker.values()) {
+ totalClosed += broker.unbindByRegion(regionId);
+ }
+ if (totalClosed > 0) {
+ prefetchingQueueCount.invalidate();
+ LOGGER.info(
+ "Subscription: unbound {} consensus prefetching queue(s) for removed
region [{}]",
+ totalClosed,
+ regionId);
+ }
+ }
+
+ public void updateCompletedTopicNames(final String consumerGroupId, final
String topicName) {
+ final SubscriptionBroker pipeBroker =
consumerGroupIdToPipeBroker.get(consumerGroupId);
+ if (Objects.isNull(pipeBroker)) {
+ LOGGER.warn(
+ "Subscription: pipe broker bound to consumer group [{}] does not
exist", consumerGroupId);
return;
}
- broker.updateCompletedTopicNames(topicName);
+ pipeBroker.updateCompletedTopicNames(topicName);
}
public void unbindPrefetchingQueue(final String consumerGroupId, final
String topicName) {
- final SubscriptionBroker broker =
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
- if (Objects.isNull(broker)) {
+ // Try consensus broker first
+ final ConsensusSubscriptionBroker consensusBroker =
+ consumerGroupIdToConsensusBroker.get(consumerGroupId);
+ if (Objects.nonNull(consensusBroker) &&
consensusBroker.hasQueue(topicName)) {
+ consensusBroker.removeQueue(topicName);
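Review Comment:
unbind? — should this be named/called "unbind" rather than `removeQueue`, for consistency with `unbindConsensusPrefetchingQueue` and the enclosing `unbindPrefetchingQueue`?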
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]