This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 67789544e8a IGNITE-28293 Introduce registry for replica request 
handlers (#7822)
67789544e8a is described below

commit 67789544e8aa7103aed264ded11a3985da6e5996
Author: Ivan Zlenko <[email protected]>
AuthorDate: Mon Mar 23 12:46:12 2026 +0500

    IGNITE-28293 Introduce registry for replica request handlers (#7822)
---
 ...ilablePartitionsRecoveryByFilterUpdateTest.java |   2 +
 .../replicator/PartitionReplicaListener.java       |  63 +++++----
 .../handlers/BuildIndexReplicaRequestHandler.java  |  11 +-
 .../handlers/ReadOnlyReplicaRequestHandler.java    |  39 +++++
 .../replicator/handlers/ReplicaRequestHandler.java |  39 +++++
 .../handlers/ReplicaRequestHandlers.java           | 157 +++++++++++++++++++++
 .../handlers/ScanCloseRequestHandler.java          |  70 +++++++++
 .../handlers/ReplicaRequestHandlersTest.java       | 114 +++++++++++++++
 8 files changed, 460 insertions(+), 35 deletions(-)

diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
index 438e9dbaacc..773f5610b67 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
@@ -254,6 +254,7 @@ public class 
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
      * @throws Exception If failed.
      */
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-28316";)
     void testNodesWaitForLastNodeFromChainToComeBackOnlineAfterMajorityStops() 
throws Exception {
         for (int i = 1; i < 8; i++) {
             startNode(i, CUSTOM_NODES_CONFIG);
@@ -315,6 +316,7 @@ public class 
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
      * @throws Exception If failed.
      */
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-28316";)
     void 
testNodesWaitForNodesFromGracefulChainToComeBackOnlineAfterMajorityStops() 
throws Exception {
         for (int i = 1; i < 8; i++) {
             startNode(i, CUSTOM_NODES_CONFIG);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index c817d48540d..9bf053157e9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -114,6 +114,7 @@ import 
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApp
 import 
org.apache.ignite.internal.partition.replicator.TableAwareReplicaRequestPreProcessor;
 import 
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
 import 
org.apache.ignite.internal.partition.replicator.exception.OperationLockException;
+import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import org.apache.ignite.internal.partition.replicator.network.TimedBinaryRow;
 import 
org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessage;
@@ -123,7 +124,6 @@ import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCom
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandV2Builder;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
-import 
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectMultiRowReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectSingleRowReplicaRequest;
@@ -139,7 +139,6 @@ import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadW
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSingleRowReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSwapRowReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
-import 
org.apache.ignite.internal.partition.replicator.network.replication.ScanCloseReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
 import 
org.apache.ignite.internal.partition.replicator.schemacompat.IncompatibleSchemaVersionException;
 import 
org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
@@ -184,6 +183,10 @@ import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
 import org.apache.ignite.internal.table.distributed.TableUtils;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.replicator.handlers.BuildIndexReplicaRequestHandler;
+import 
org.apache.ignite.internal.table.distributed.replicator.handlers.ReadOnlyReplicaRequestHandler;
+import 
org.apache.ignite.internal.table.distributed.replicator.handlers.ReplicaRequestHandler;
+import 
org.apache.ignite.internal.table.distributed.replicator.handlers.ReplicaRequestHandlers;
+import 
org.apache.ignite.internal.table.distributed.replicator.handlers.ScanCloseRequestHandler;
 import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
 import org.apache.ignite.internal.tx.DelayedAckException;
 import org.apache.ignite.internal.tx.Lock;
@@ -335,8 +338,8 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
     private final ReliableCatalogVersions reliableCatalogVersions;
     private final ReplicationRaftCommandApplicator raftCommandApplicator;
 
-    // Replica request handlers.
-    private final BuildIndexReplicaRequestHandler 
buildIndexReplicaRequestHandler;
+    /** Registry of replica request handlers. */
+    private final ReplicaRequestHandlers requestHandlers;
 
     /**
      * The constructor.
@@ -428,7 +431,19 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         reliableCatalogVersions = new 
ReliableCatalogVersions(schemaSyncService, catalogService);
         raftCommandApplicator = new 
ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
 
-        buildIndexReplicaRequestHandler = new 
BuildIndexReplicaRequestHandler(indexMetaStorage, raftCommandApplicator);
+        ReplicaRequestHandlers.Builder handlersBuilder = new 
ReplicaRequestHandlers.Builder();
+
+        handlersBuilder.addHandler(
+                PartitionReplicationMessageGroup.GROUP_TYPE,
+                PartitionReplicationMessageGroup.SCAN_CLOSE_REPLICA_REQUEST,
+                new ScanCloseRequestHandler(remotelyTriggeredResourceRegistry, 
replicationGroupId));
+
+        handlersBuilder.addHandler(
+                PartitionReplicationMessageGroup.GROUP_TYPE,
+                PartitionReplicationMessageGroup.BUILD_INDEX_REPLICA_REQUEST,
+                new BuildIndexReplicaRequestHandler(indexMetaStorage, 
raftCommandApplicator));
+
+        requestHandlers = handlersBuilder.build();
     }
 
     @Override
@@ -511,6 +526,21 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             ReplicaPrimacy replicaPrimacy,
             @Nullable HybridTimestamp opStartTsIfDirectRo
     ) {
+        ReplicaRequestHandler<ReplicaRequest> handler = 
requestHandlers.handler(request.groupType(), request.messageType());
+
+        if (handler != null) {
+            return handler.handle(request, replicaPrimacy);
+        }
+
+        ReadOnlyReplicaRequestHandler<ReplicaRequest> roHandler =
+                requestHandlers.roHandler(request.groupType(), 
request.messageType());
+
+        if (roHandler != null) {
+            assert opStartTsIfDirectRo != null;
+
+            return roHandler.handle(request, opStartTsIfDirectRo);
+        }
+
         if (request instanceof ReadWriteSingleRowReplicaRequest) {
             var req = (ReadWriteSingleRowReplicaRequest) request;
 
@@ -590,10 +620,6 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                             releaseTxLocks(req.transactionId());
                         }
                     });
-        } else if (request instanceof ScanCloseReplicaRequest) {
-            processScanCloseAction((ScanCloseReplicaRequest) request);
-
-            return nullCompletedFuture();
         } else if (request instanceof TableWriteIntentSwitchReplicaRequest) {
             return 
processTableWriteIntentSwitchAction((TableWriteIntentSwitchReplicaRequest) 
request);
         } else if (request instanceof ReadOnlySingleRowPkReplicaRequest) {
@@ -602,8 +628,6 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             return 
processReadOnlyMultiEntryAction((ReadOnlyMultiRowPkReplicaRequest) request, 
replicaPrimacy.isPrimary());
         } else if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest) 
{
             return 
processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)
 request, replicaPrimacy.isPrimary());
-        } else if (request instanceof BuildIndexReplicaRequest) {
-            return 
buildIndexReplicaRequestHandler.handle((BuildIndexReplicaRequest) request);
         } else if (request instanceof ReadOnlyDirectSingleRowReplicaRequest) {
             return 
processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest) 
request, opStartTsIfDirectRo);
         } else if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) {
@@ -920,23 +944,6 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         });
     }
 
-    /**
-     * Processes scan close request.
-     *
-     * @param request Scan close request operation.
-     */
-    private void processScanCloseAction(ScanCloseReplicaRequest request) {
-        UUID txId = request.transactionId();
-
-        FullyQualifiedResourceId cursorId = cursorId(txId, request.scanId());
-
-        try {
-            remotelyTriggeredResourceRegistry.close(cursorId);
-        } catch (IgniteException e) {
-            throw wrapCursorCloseException(e);
-        }
-    }
-
     /**
      * Closes a cursor if the batch is not fully retrieved.
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
index 7d37a4922f1..3ab29fcea2e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
@@ -21,6 +21,7 @@ import static 
org.apache.ignite.internal.partition.replicator.index.MetaIndexSta
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
 import 
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
 import org.apache.ignite.internal.partition.replicator.index.IndexMeta;
 import 
org.apache.ignite.internal.partition.replicator.index.MetaIndexStatusChange;
@@ -32,7 +33,7 @@ import 
org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 /**
  * Handler for {@link BuildIndexReplicaRequest}.
  */
-public class BuildIndexReplicaRequestHandler {
+public class BuildIndexReplicaRequestHandler implements 
ReplicaRequestHandler<BuildIndexReplicaRequest> {
     /** Factory to create RAFT command messages. */
     private static final PartitionReplicationMessagesFactory 
PARTITION_REPLICATION_MESSAGES_FACTORY =
             new PartitionReplicationMessagesFactory();
@@ -53,12 +54,8 @@ public class BuildIndexReplicaRequestHandler {
         this.commandApplicator = commandApplicator;
     }
 
-    /**
-     * Handles {@link BuildIndexReplicaRequest}.
-     *
-     * @param request Request to handle.
-     */
-    public CompletableFuture<?> handle(BuildIndexReplicaRequest request) {
+    @Override
+    public CompletableFuture<?> handle(BuildIndexReplicaRequest request, 
ReplicaPrimacy replicaPrimacy) {
         IndexMeta indexMeta = indexMetaStorage.indexMeta(request.indexId());
 
         if (indexMeta == null || indexMeta.isDropped()) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReadOnlyReplicaRequestHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReadOnlyReplicaRequestHandler.java
new file mode 100644
index 00000000000..d6cb42b6991
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReadOnlyReplicaRequestHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator.handlers;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+
+/**
+ * Handler for read-only replica requests that require an operation start 
timestamp.
+ *
+ * @param <T> Type of the request this handler processes.
+ */
+@FunctionalInterface
+public interface ReadOnlyReplicaRequestHandler<T extends ReplicaRequest> {
+    /**
+     * Handles the given read-only request.
+     *
+     * @param request Request to handle.
+     * @param opStartTs Operation start timestamp.
+     * @return Future with the result.
+     */
+    CompletableFuture<?> handle(T request, HybridTimestamp opStartTs);
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandler.java
new file mode 100644
index 00000000000..53ce8104b39
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator.handlers;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+
+/**
+ * Handler for a specific type of {@link ReplicaRequest}.
+ *
+ * @param <T> Type of the request this handler processes.
+ */
+@FunctionalInterface
+public interface ReplicaRequestHandler<T extends ReplicaRequest> {
+    /**
+     * Handles the given request.
+     *
+     * @param request Request to handle.
+     * @param replicaPrimacy Replica primacy information.
+     * @return Future with the result.
+     */
+    CompletableFuture<?> handle(T request, ReplicaPrimacy replicaPrimacy);
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandlers.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandlers.java
new file mode 100644
index 00000000000..6017848b745
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandlers.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator.handlers;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Registry of replica request handlers keyed by message group and message 
type.
+ *
+ * <p>Contains two separate registries: one for {@link ReplicaRequestHandler} 
and one for {@link ReadOnlyReplicaRequestHandler}.
+ */
+public class ReplicaRequestHandlers {
+    private final Map<MessageId, ReplicaRequestHandler<?>> handlers;
+
+    private final Map<MessageId, ReadOnlyReplicaRequestHandler<?>> roHandlers;
+
+    private ReplicaRequestHandlers(
+            Map<MessageId, ReplicaRequestHandler<?>> handlers,
+            Map<MessageId, ReadOnlyReplicaRequestHandler<?>> roHandlers
+    ) {
+        this.handlers = handlers;
+        this.roHandlers = roHandlers;
+    }
+
+    /**
+     * Returns a handler for the given message group and type, or {@code null} 
if no handler is registered.
+     *
+     * @param messageGroup Message group identifier.
+     * @param messageType Message type identifier.
+     * @return Handler, or {@code null} if not found.
+     */
+    public @Nullable ReplicaRequestHandler<ReplicaRequest> handler(short 
messageGroup, short messageType) {
+        return (ReplicaRequestHandler<ReplicaRequest>) handlers.get(new 
MessageId(messageGroup, messageType));
+    }
+
+    /**
+     * Returns a read-only handler for the given message group and type, or 
{@code null} if no handler is registered.
+     *
+     * @param messageGroup Message group identifier.
+     * @param messageType Message type identifier.
+     * @return Handler, or {@code null} if not found.
+     */
+    public @Nullable ReadOnlyReplicaRequestHandler<ReplicaRequest> roHandler(
+            short messageGroup,
+            short messageType
+    ) {
+        return (ReadOnlyReplicaRequestHandler<ReplicaRequest>) 
roHandlers.get(new MessageId(messageGroup, messageType));
+    }
+
+    private static final class MessageId {
+        final short messageGroup;
+        final short messageType;
+
+        MessageId(short messageGroup, short messageType) {
+            this.messageGroup = messageGroup;
+            this.messageType = messageType;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            MessageId messageId = (MessageId) o;
+            return messageGroup == messageId.messageGroup && messageType == 
messageId.messageType;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = 1;
+
+            result = 31 * result + messageGroup;
+            result = 31 * result + messageType;
+
+            return result;
+        }
+    }
+
+    /**
+     * Builder for {@link ReplicaRequestHandlers}.
+     */
+    public static class Builder {
+        private final Map<MessageId, ReplicaRequestHandler<?>> handlers = new 
HashMap<>();
+
+        private final Map<MessageId, ReadOnlyReplicaRequestHandler<?>> 
roHandlers = new HashMap<>();
+
+        /**
+         * Registers a handler for the given message group and message type.
+         *
+         * @param messageGroup Message group identifier.
+         * @param messageType Message type identifier.
+         * @param handler Handler.
+         * @throws IllegalArgumentException If a handler for the given message 
id is already registered.
+         */
+        public void addHandler(
+                short messageGroup,
+                short messageType,
+                ReplicaRequestHandler<?> handler
+        ) {
+            MessageId id = new MessageId(messageGroup, messageType);
+
+            ReplicaRequestHandler<?> oldHandler = handlers.put(id, handler);
+
+            assert oldHandler == null : "Handler already exists 
[messageGroup=" + messageGroup + ", messageType=" + messageType + "].";
+        }
+
+        /**
+         * Registers a read-only handler for the given message group and 
message type.
+         *
+         * @param messageGroup Message group identifier.
+         * @param messageType Message type identifier.
+         * @param handler Handler.
+         * @throws IllegalArgumentException If a handler for the given message 
id is already registered.
+         */
+        public void addRoHandler(
+                short messageGroup,
+                short messageType,
+                ReadOnlyReplicaRequestHandler<?> handler
+        ) {
+            MessageId id = new MessageId(messageGroup, messageType);
+
+            ReadOnlyReplicaRequestHandler<?> oldHandler = roHandlers.put(id, 
handler);
+
+            assert oldHandler == null : "RO handler already exists 
[messageGroup=" + messageGroup + ", messageType=" + messageType + "].";
+        }
+
+        /**
+         * Builds the registry.
+         *
+         * @return Immutable {@link ReplicaRequestHandlers} instance.
+         */
+        public ReplicaRequestHandlers build() {
+            return new ReplicaRequestHandlers(Map.copyOf(handlers), 
Map.copyOf(roHandlers));
+        }
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ScanCloseRequestHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ScanCloseRequestHandler.java
new file mode 100644
index 00000000000..392e9b833ef
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ScanCloseRequestHandler.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator.handlers;
+
+import static java.lang.String.format;
+import static 
org.apache.ignite.internal.table.distributed.replicator.RemoteResourceIds.cursorId;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.CURSOR_CLOSE_ERR;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.ScanCloseReplicaRequest;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.replicator.exception.ReplicationException;
+import org.apache.ignite.internal.tx.impl.FullyQualifiedResourceId;
+import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
+import org.apache.ignite.lang.IgniteException;
+
+/**
+ * Handler for {@link ScanCloseReplicaRequest}.
+ */
+public class ScanCloseRequestHandler implements 
ReplicaRequestHandler<ScanCloseReplicaRequest> {
+    private final RemotelyTriggeredResourceRegistry 
remotelyTriggeredResourceRegistry;
+
+    private final ZonePartitionId replicationGroupId;
+
+    /**
+     * Constructor.
+     *
+     * @param remotelyTriggeredResourceRegistry Resource registry.
+     * @param replicationGroupId Replication group id.
+     */
+    public ScanCloseRequestHandler(
+            RemotelyTriggeredResourceRegistry 
remotelyTriggeredResourceRegistry,
+            ZonePartitionId replicationGroupId
+    ) {
+        this.remotelyTriggeredResourceRegistry = 
remotelyTriggeredResourceRegistry;
+        this.replicationGroupId = replicationGroupId;
+    }
+
+    @Override
+    public CompletableFuture<?> handle(ScanCloseReplicaRequest request, 
ReplicaPrimacy replicaPrimacy) {
+        FullyQualifiedResourceId cursorId = cursorId(request.transactionId(), 
request.scanId());
+
+        try {
+            remotelyTriggeredResourceRegistry.close(cursorId);
+        } catch (IgniteException e) {
+            String message = format("Close cursor exception [replicaGrpId=%s, 
msg=%s]", replicationGroupId, e.getMessage());
+
+            throw new ReplicationException(CURSOR_CLOSE_ERR, message, e);
+        }
+
+        return nullCompletedFuture();
+    }
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandlersTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandlersTest.java
new file mode 100644
index 00000000000..0ad0cf52962
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandlersTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator.handlers;
+
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link ReplicaRequestHandlers}.
+ */
+class ReplicaRequestHandlersTest {
+    private static final short GROUP_A = 1;
+
+    private static final short GROUP_B = 2;
+
+    private static final short TYPE_1 = 10;
+
+    private static final short TYPE_2 = 20;
+
+    @Test
+    void handlerIsFoundByGroupAndType() {
+        ReplicaRequestHandler<?> handler = (request, replicaPrimacy) -> 
nullCompletedFuture();
+
+        ReplicaRequestHandlers.Builder builder = new 
ReplicaRequestHandlers.Builder();
+        builder.addHandler(GROUP_A, TYPE_1, handler);
+        ReplicaRequestHandlers handlers = builder.build();
+
+        assertThat(handlers.handler(GROUP_A, TYPE_1), is(notNullValue()));
+    }
+
+    @Test
+    void handlerReturnsNullForUnregisteredType() {
+        ReplicaRequestHandlers.Builder builder = new 
ReplicaRequestHandlers.Builder();
+        builder.addHandler(GROUP_A, TYPE_1, (request, replicaPrimacy) -> 
nullCompletedFuture());
+        ReplicaRequestHandlers handlers = builder.build();
+
+        assertThat(handlers.handler(GROUP_A, TYPE_2), is(nullValue()));
+        assertThat(handlers.handler(GROUP_B, TYPE_1), is(nullValue()));
+    }
+
+    @Test
+    void roHandlerIsFoundByGroupAndType() {
+        ReadOnlyReplicaRequestHandler<?> handler = (request, opStartTs) -> 
nullCompletedFuture();
+
+        ReplicaRequestHandlers.Builder builder = new 
ReplicaRequestHandlers.Builder();
+        builder.addRoHandler(GROUP_A, TYPE_1, handler);
+        ReplicaRequestHandlers handlers = builder.build();
+
+        assertThat(handlers.roHandler(GROUP_A, TYPE_1), is(notNullValue()));
+    }
+
+    @Test
+    void roHandlerReturnsNullForUnregisteredType() {
+        ReplicaRequestHandlers.Builder builder = new 
ReplicaRequestHandlers.Builder();
+        builder.addRoHandler(GROUP_A, TYPE_1, (request, opStartTs) -> 
nullCompletedFuture());
+        ReplicaRequestHandlers handlers = builder.build();
+
+        assertThat(handlers.roHandler(GROUP_A, TYPE_2), is(nullValue()));
+        assertThat(handlers.roHandler(GROUP_B, TYPE_1), is(nullValue()));
+    }
+
+    @Test
+    void handlerAndRoHandlerRegistriesAreIndependent() {
+        ReplicaRequestHandlers.Builder builder = new 
ReplicaRequestHandlers.Builder();
+        builder.addHandler(GROUP_A, TYPE_1, (request, replicaPrimacy) -> 
nullCompletedFuture());
+        builder.addRoHandler(GROUP_A, TYPE_2, (request, opStartTs) -> 
nullCompletedFuture());
+        ReplicaRequestHandlers handlers = builder.build();
+
+        assertThat(handlers.handler(GROUP_A, TYPE_1), is(notNullValue()));
+        assertThat(handlers.roHandler(GROUP_A, TYPE_1), is(nullValue()));
+
+        assertThat(handlers.roHandler(GROUP_A, TYPE_2), is(notNullValue()));
+        assertThat(handlers.handler(GROUP_A, TYPE_2), is(nullValue()));
+    }
+
+    @Test
+    void duplicateHandlerRegistrationThrows() {
+        ReplicaRequestHandlers.Builder builder = new 
ReplicaRequestHandlers.Builder();
+        builder.addHandler(GROUP_A, TYPE_1, (request, replicaPrimacy) -> 
nullCompletedFuture());
+
+        assertThrows(AssertionError.class,
+                () -> builder.addHandler(GROUP_A, TYPE_1, (request, 
replicaPrimacy) -> nullCompletedFuture()));
+    }
+
+    @Test
+    void duplicateRoHandlerRegistrationThrows() {
+        ReplicaRequestHandlers.Builder builder = new 
ReplicaRequestHandlers.Builder();
+        builder.addRoHandler(GROUP_A, TYPE_1, (request, opStartTs) -> 
nullCompletedFuture());
+
+        assertThrows(AssertionError.class,
+                () -> builder.addRoHandler(GROUP_A, TYPE_1, (request, 
opStartTs) -> nullCompletedFuture()));
+    }
+}

Reply via email to