This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 67789544e8a IGNITE-28293 Introduce registry for replica request
handlers (#7822)
67789544e8a is described below
commit 67789544e8aa7103aed264ded11a3985da6e5996
Author: Ivan Zlenko <[email protected]>
AuthorDate: Mon Mar 23 12:46:12 2026 +0500
IGNITE-28293 Introduce registry for replica request handlers (#7822)
---
...ilablePartitionsRecoveryByFilterUpdateTest.java | 2 +
.../replicator/PartitionReplicaListener.java | 63 +++++----
.../handlers/BuildIndexReplicaRequestHandler.java | 11 +-
.../handlers/ReadOnlyReplicaRequestHandler.java | 39 +++++
.../replicator/handlers/ReplicaRequestHandler.java | 39 +++++
.../handlers/ReplicaRequestHandlers.java | 157 +++++++++++++++++++++
.../handlers/ScanCloseRequestHandler.java | 70 +++++++++
.../handlers/ReplicaRequestHandlersTest.java | 114 +++++++++++++++
8 files changed, 460 insertions(+), 35 deletions(-)
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
index 438e9dbaacc..773f5610b67 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
@@ -254,6 +254,7 @@ public class
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
* @throws Exception If failed.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-28316")
void testNodesWaitForLastNodeFromChainToComeBackOnlineAfterMajorityStops()
throws Exception {
for (int i = 1; i < 8; i++) {
startNode(i, CUSTOM_NODES_CONFIG);
@@ -315,6 +316,7 @@ public class
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
* @throws Exception If failed.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-28316")
void
testNodesWaitForNodesFromGracefulChainToComeBackOnlineAfterMajorityStops()
throws Exception {
for (int i = 1; i < 8; i++) {
startNode(i, CUSTOM_NODES_CONFIG);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index c817d48540d..9bf053157e9 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -114,6 +114,7 @@ import
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApp
import
org.apache.ignite.internal.partition.replicator.TableAwareReplicaRequestPreProcessor;
import
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
import
org.apache.ignite.internal.partition.replicator.exception.OperationLockException;
+import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite.internal.partition.replicator.network.TimedBinaryRow;
import
org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessage;
@@ -123,7 +124,6 @@ import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCom
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandV2Builder;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
-import
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectMultiRowReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectSingleRowReplicaRequest;
@@ -139,7 +139,6 @@ import
org.apache.ignite.internal.partition.replicator.network.replication.ReadW
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSingleRowReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSwapRowReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
-import
org.apache.ignite.internal.partition.replicator.network.replication.ScanCloseReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import
org.apache.ignite.internal.partition.replicator.schemacompat.IncompatibleSchemaVersionException;
import
org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
@@ -184,6 +183,10 @@ import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
import org.apache.ignite.internal.table.distributed.TableUtils;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.replicator.handlers.BuildIndexReplicaRequestHandler;
+import
org.apache.ignite.internal.table.distributed.replicator.handlers.ReadOnlyReplicaRequestHandler;
+import
org.apache.ignite.internal.table.distributed.replicator.handlers.ReplicaRequestHandler;
+import
org.apache.ignite.internal.table.distributed.replicator.handlers.ReplicaRequestHandlers;
+import
org.apache.ignite.internal.table.distributed.replicator.handlers.ScanCloseRequestHandler;
import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
import org.apache.ignite.internal.tx.DelayedAckException;
import org.apache.ignite.internal.tx.Lock;
@@ -335,8 +338,8 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
private final ReliableCatalogVersions reliableCatalogVersions;
private final ReplicationRaftCommandApplicator raftCommandApplicator;
- // Replica request handlers.
- private final BuildIndexReplicaRequestHandler
buildIndexReplicaRequestHandler;
+ /** Registry of replica request handlers. */
+ private final ReplicaRequestHandlers requestHandlers;
/**
* The constructor.
@@ -428,7 +431,19 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
reliableCatalogVersions = new
ReliableCatalogVersions(schemaSyncService, catalogService);
raftCommandApplicator = new
ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
- buildIndexReplicaRequestHandler = new
BuildIndexReplicaRequestHandler(indexMetaStorage, raftCommandApplicator);
+ ReplicaRequestHandlers.Builder handlersBuilder = new
ReplicaRequestHandlers.Builder();
+
+ handlersBuilder.addHandler(
+ PartitionReplicationMessageGroup.GROUP_TYPE,
+ PartitionReplicationMessageGroup.SCAN_CLOSE_REPLICA_REQUEST,
+ new ScanCloseRequestHandler(remotelyTriggeredResourceRegistry,
replicationGroupId));
+
+ handlersBuilder.addHandler(
+ PartitionReplicationMessageGroup.GROUP_TYPE,
+ PartitionReplicationMessageGroup.BUILD_INDEX_REPLICA_REQUEST,
+ new BuildIndexReplicaRequestHandler(indexMetaStorage,
raftCommandApplicator));
+
+ requestHandlers = handlersBuilder.build();
}
@Override
@@ -511,6 +526,21 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
ReplicaPrimacy replicaPrimacy,
@Nullable HybridTimestamp opStartTsIfDirectRo
) {
+ ReplicaRequestHandler<ReplicaRequest> handler =
requestHandlers.handler(request.groupType(), request.messageType());
+
+ if (handler != null) {
+ return handler.handle(request, replicaPrimacy);
+ }
+
+ ReadOnlyReplicaRequestHandler<ReplicaRequest> roHandler =
+ requestHandlers.roHandler(request.groupType(),
request.messageType());
+
+ if (roHandler != null) {
+ assert opStartTsIfDirectRo != null;
+
+ return roHandler.handle(request, opStartTsIfDirectRo);
+ }
+
if (request instanceof ReadWriteSingleRowReplicaRequest) {
var req = (ReadWriteSingleRowReplicaRequest) request;
@@ -590,10 +620,6 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
releaseTxLocks(req.transactionId());
}
});
- } else if (request instanceof ScanCloseReplicaRequest) {
- processScanCloseAction((ScanCloseReplicaRequest) request);
-
- return nullCompletedFuture();
} else if (request instanceof TableWriteIntentSwitchReplicaRequest) {
return
processTableWriteIntentSwitchAction((TableWriteIntentSwitchReplicaRequest)
request);
} else if (request instanceof ReadOnlySingleRowPkReplicaRequest) {
@@ -602,8 +628,6 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
return
processReadOnlyMultiEntryAction((ReadOnlyMultiRowPkReplicaRequest) request,
replicaPrimacy.isPrimary());
} else if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest)
{
return
processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)
request, replicaPrimacy.isPrimary());
- } else if (request instanceof BuildIndexReplicaRequest) {
- return
buildIndexReplicaRequestHandler.handle((BuildIndexReplicaRequest) request);
} else if (request instanceof ReadOnlyDirectSingleRowReplicaRequest) {
return
processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest)
request, opStartTsIfDirectRo);
} else if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) {
@@ -920,23 +944,6 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
});
}
- /**
- * Processes scan close request.
- *
- * @param request Scan close request operation.
- */
- private void processScanCloseAction(ScanCloseReplicaRequest request) {
- UUID txId = request.transactionId();
-
- FullyQualifiedResourceId cursorId = cursorId(txId, request.scanId());
-
- try {
- remotelyTriggeredResourceRegistry.close(cursorId);
- } catch (IgniteException e) {
- throw wrapCursorCloseException(e);
- }
- }
-
/**
* Closes a cursor if the batch is not fully retrieved.
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
index 7d37a4922f1..3ab29fcea2e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
@@ -21,6 +21,7 @@ import static
org.apache.ignite.internal.partition.replicator.index.MetaIndexSta
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
import
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
import org.apache.ignite.internal.partition.replicator.index.IndexMeta;
import
org.apache.ignite.internal.partition.replicator.index.MetaIndexStatusChange;
@@ -32,7 +33,7 @@ import
org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
/**
* Handler for {@link BuildIndexReplicaRequest}.
*/
-public class BuildIndexReplicaRequestHandler {
+public class BuildIndexReplicaRequestHandler implements
ReplicaRequestHandler<BuildIndexReplicaRequest> {
/** Factory to create RAFT command messages. */
private static final PartitionReplicationMessagesFactory
PARTITION_REPLICATION_MESSAGES_FACTORY =
new PartitionReplicationMessagesFactory();
@@ -53,12 +54,8 @@ public class BuildIndexReplicaRequestHandler {
this.commandApplicator = commandApplicator;
}
- /**
- * Handles {@link BuildIndexReplicaRequest}.
- *
- * @param request Request to handle.
- */
- public CompletableFuture<?> handle(BuildIndexReplicaRequest request) {
+ @Override
+ public CompletableFuture<?> handle(BuildIndexReplicaRequest request,
ReplicaPrimacy replicaPrimacy) {
IndexMeta indexMeta = indexMetaStorage.indexMeta(request.indexId());
if (indexMeta == null || indexMeta.isDropped()) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReadOnlyReplicaRequestHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReadOnlyReplicaRequestHandler.java
new file mode 100644
index 00000000000..d6cb42b6991
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReadOnlyReplicaRequestHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator.handlers;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+
+/**
+ * Handler for read-only replica requests that require an operation start
timestamp.
+ *
+ * @param <T> Type of the request this handler processes.
+ */
+@FunctionalInterface
+public interface ReadOnlyReplicaRequestHandler<T extends ReplicaRequest> {
+ /**
+ * Handles the given read-only request.
+ *
+ * @param request Request to handle.
+ * @param opStartTs Operation start timestamp.
+ * @return Future with the result.
+ */
+ CompletableFuture<?> handle(T request, HybridTimestamp opStartTs);
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandler.java
new file mode 100644
index 00000000000..53ce8104b39
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator.handlers;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+
+/**
+ * Handler for a specific type of {@link ReplicaRequest}.
+ *
+ * @param <T> Type of the request this handler processes.
+ */
+@FunctionalInterface
+public interface ReplicaRequestHandler<T extends ReplicaRequest> {
+ /**
+ * Handles the given request.
+ *
+ * @param request Request to handle.
+ * @param replicaPrimacy Replica primacy information.
+ * @return Future with the result.
+ */
+ CompletableFuture<?> handle(T request, ReplicaPrimacy replicaPrimacy);
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandlers.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandlers.java
new file mode 100644
index 00000000000..6017848b745
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandlers.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator.handlers;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Registry of replica request handlers keyed by message group and message
type.
+ *
+ * <p>Contains two separate registries: one for {@link ReplicaRequestHandler}
and one for {@link ReadOnlyReplicaRequestHandler}.
+ */
+public class ReplicaRequestHandlers {
+ private final Map<MessageId, ReplicaRequestHandler<?>> handlers;
+
+ private final Map<MessageId, ReadOnlyReplicaRequestHandler<?>> roHandlers;
+
+ private ReplicaRequestHandlers(
+ Map<MessageId, ReplicaRequestHandler<?>> handlers,
+ Map<MessageId, ReadOnlyReplicaRequestHandler<?>> roHandlers
+ ) {
+ this.handlers = handlers;
+ this.roHandlers = roHandlers;
+ }
+
+ /**
+ * Returns a handler for the given message group and type, or {@code null}
if no handler is registered.
+ *
+ * @param messageGroup Message group identifier.
+ * @param messageType Message type identifier.
+ * @return Handler, or {@code null} if not found.
+ */
+ public @Nullable ReplicaRequestHandler<ReplicaRequest> handler(short
messageGroup, short messageType) {
+ return (ReplicaRequestHandler<ReplicaRequest>) handlers.get(new
MessageId(messageGroup, messageType));
+ }
+
+ /**
+ * Returns a read-only handler for the given message group and type, or
{@code null} if no handler is registered.
+ *
+ * @param messageGroup Message group identifier.
+ * @param messageType Message type identifier.
+ * @return Handler, or {@code null} if not found.
+ */
+ public @Nullable ReadOnlyReplicaRequestHandler<ReplicaRequest> roHandler(
+ short messageGroup,
+ short messageType
+ ) {
+ return (ReadOnlyReplicaRequestHandler<ReplicaRequest>)
roHandlers.get(new MessageId(messageGroup, messageType));
+ }
+
+ private static final class MessageId {
+ final short messageGroup;
+ final short messageType;
+
+ MessageId(short messageGroup, short messageType) {
+ this.messageGroup = messageGroup;
+ this.messageType = messageType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MessageId messageId = (MessageId) o;
+ return messageGroup == messageId.messageGroup && messageType ==
messageId.messageType;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 1;
+
+ result = 31 * result + messageGroup;
+ result = 31 * result + messageType;
+
+ return result;
+ }
+ }
+
+ /**
+ * Builder for {@link ReplicaRequestHandlers}.
+ */
+ public static class Builder {
+ private final Map<MessageId, ReplicaRequestHandler<?>> handlers = new
HashMap<>();
+
+ private final Map<MessageId, ReadOnlyReplicaRequestHandler<?>>
roHandlers = new HashMap<>();
+
+ /**
+ * Registers a handler for the given message group and message type.
+ *
+ * @param messageGroup Message group identifier.
+ * @param messageType Message type identifier.
+ * @param handler Handler.
+ * @throws IllegalArgumentException If a handler for the given message
id is already registered.
+ */
+ public void addHandler(
+ short messageGroup,
+ short messageType,
+ ReplicaRequestHandler<?> handler
+ ) {
+ MessageId id = new MessageId(messageGroup, messageType);
+
+ ReplicaRequestHandler<?> oldHandler = handlers.put(id, handler);
+
+ assert oldHandler == null : "Handler already exists
[messageGroup=" + messageGroup + ", messageType=" + messageType + "].";
+ }
+
+ /**
+ * Registers a read-only handler for the given message group and
message type.
+ *
+ * @param messageGroup Message group identifier.
+ * @param messageType Message type identifier.
+ * @param handler Handler.
+ * @throws IllegalArgumentException If a handler for the given message
id is already registered.
+ */
+ public void addRoHandler(
+ short messageGroup,
+ short messageType,
+ ReadOnlyReplicaRequestHandler<?> handler
+ ) {
+ MessageId id = new MessageId(messageGroup, messageType);
+
+ ReadOnlyReplicaRequestHandler<?> oldHandler = roHandlers.put(id,
handler);
+
+ assert oldHandler == null : "RO handler already exists
[messageGroup=" + messageGroup + ", messageType=" + messageType + "].";
+ }
+
+ /**
+ * Builds the registry.
+ *
+ * @return Immutable {@link ReplicaRequestHandlers} instance.
+ */
+ public ReplicaRequestHandlers build() {
+ return new ReplicaRequestHandlers(Map.copyOf(handlers),
Map.copyOf(roHandlers));
+ }
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ScanCloseRequestHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ScanCloseRequestHandler.java
new file mode 100644
index 00000000000..392e9b833ef
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ScanCloseRequestHandler.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator.handlers;
+
+import static java.lang.String.format;
+import static
org.apache.ignite.internal.table.distributed.replicator.RemoteResourceIds.cursorId;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.CURSOR_CLOSE_ERR;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
+import
org.apache.ignite.internal.partition.replicator.network.replication.ScanCloseReplicaRequest;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.replicator.exception.ReplicationException;
+import org.apache.ignite.internal.tx.impl.FullyQualifiedResourceId;
+import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
+import org.apache.ignite.lang.IgniteException;
+
+/**
+ * Handler for {@link ScanCloseReplicaRequest}.
+ */
+public class ScanCloseRequestHandler implements
ReplicaRequestHandler<ScanCloseReplicaRequest> {
+ private final RemotelyTriggeredResourceRegistry
remotelyTriggeredResourceRegistry;
+
+ private final ZonePartitionId replicationGroupId;
+
+ /**
+ * Constructor.
+ *
+ * @param remotelyTriggeredResourceRegistry Resource registry.
+ * @param replicationGroupId Replication group id.
+ */
+ public ScanCloseRequestHandler(
+ RemotelyTriggeredResourceRegistry
remotelyTriggeredResourceRegistry,
+ ZonePartitionId replicationGroupId
+ ) {
+ this.remotelyTriggeredResourceRegistry =
remotelyTriggeredResourceRegistry;
+ this.replicationGroupId = replicationGroupId;
+ }
+
+ @Override
+ public CompletableFuture<?> handle(ScanCloseReplicaRequest request,
ReplicaPrimacy replicaPrimacy) {
+ FullyQualifiedResourceId cursorId = cursorId(request.transactionId(),
request.scanId());
+
+ try {
+ remotelyTriggeredResourceRegistry.close(cursorId);
+ } catch (IgniteException e) {
+ String message = format("Close cursor exception [replicaGrpId=%s,
msg=%s]", replicationGroupId, e.getMessage());
+
+ throw new ReplicationException(CURSOR_CLOSE_ERR, message, e);
+ }
+
+ return nullCompletedFuture();
+ }
+}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandlersTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandlersTest.java
new file mode 100644
index 00000000000..0ad0cf52962
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/handlers/ReplicaRequestHandlersTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator.handlers;
+
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link ReplicaRequestHandlers}.
+ */
+class ReplicaRequestHandlersTest {
+ private static final short GROUP_A = 1;
+
+ private static final short GROUP_B = 2;
+
+ private static final short TYPE_1 = 10;
+
+ private static final short TYPE_2 = 20;
+
+ @Test
+ void handlerIsFoundByGroupAndType() {
+ ReplicaRequestHandler<?> handler = (request, replicaPrimacy) ->
nullCompletedFuture();
+
+ ReplicaRequestHandlers.Builder builder = new
ReplicaRequestHandlers.Builder();
+ builder.addHandler(GROUP_A, TYPE_1, handler);
+ ReplicaRequestHandlers handlers = builder.build();
+
+ assertThat(handlers.handler(GROUP_A, TYPE_1), is(notNullValue()));
+ }
+
+ @Test
+ void handlerReturnsNullForUnregisteredType() {
+ ReplicaRequestHandlers.Builder builder = new
ReplicaRequestHandlers.Builder();
+ builder.addHandler(GROUP_A, TYPE_1, (request, replicaPrimacy) ->
nullCompletedFuture());
+ ReplicaRequestHandlers handlers = builder.build();
+
+ assertThat(handlers.handler(GROUP_A, TYPE_2), is(nullValue()));
+ assertThat(handlers.handler(GROUP_B, TYPE_1), is(nullValue()));
+ }
+
+ @Test
+ void roHandlerIsFoundByGroupAndType() {
+ ReadOnlyReplicaRequestHandler<?> handler = (request, opStartTs) ->
nullCompletedFuture();
+
+ ReplicaRequestHandlers.Builder builder = new
ReplicaRequestHandlers.Builder();
+ builder.addRoHandler(GROUP_A, TYPE_1, handler);
+ ReplicaRequestHandlers handlers = builder.build();
+
+ assertThat(handlers.roHandler(GROUP_A, TYPE_1), is(notNullValue()));
+ }
+
+ @Test
+ void roHandlerReturnsNullForUnregisteredType() {
+ ReplicaRequestHandlers.Builder builder = new
ReplicaRequestHandlers.Builder();
+ builder.addRoHandler(GROUP_A, TYPE_1, (request, opStartTs) ->
nullCompletedFuture());
+ ReplicaRequestHandlers handlers = builder.build();
+
+ assertThat(handlers.roHandler(GROUP_A, TYPE_2), is(nullValue()));
+ assertThat(handlers.roHandler(GROUP_B, TYPE_1), is(nullValue()));
+ }
+
+ @Test
+ void handlerAndRoHandlerRegistriesAreIndependent() {
+ ReplicaRequestHandlers.Builder builder = new
ReplicaRequestHandlers.Builder();
+ builder.addHandler(GROUP_A, TYPE_1, (request, replicaPrimacy) ->
nullCompletedFuture());
+ builder.addRoHandler(GROUP_A, TYPE_2, (request, opStartTs) ->
nullCompletedFuture());
+ ReplicaRequestHandlers handlers = builder.build();
+
+ assertThat(handlers.handler(GROUP_A, TYPE_1), is(notNullValue()));
+ assertThat(handlers.roHandler(GROUP_A, TYPE_1), is(nullValue()));
+
+ assertThat(handlers.roHandler(GROUP_A, TYPE_2), is(notNullValue()));
+ assertThat(handlers.handler(GROUP_A, TYPE_2), is(nullValue()));
+ }
+
+ @Test
+ void duplicateHandlerRegistrationThrows() {
+ ReplicaRequestHandlers.Builder builder = new
ReplicaRequestHandlers.Builder();
+ builder.addHandler(GROUP_A, TYPE_1, (request, replicaPrimacy) ->
nullCompletedFuture());
+
+ assertThrows(AssertionError.class,
+ () -> builder.addHandler(GROUP_A, TYPE_1, (request,
replicaPrimacy) -> nullCompletedFuture()));
+ }
+
+ @Test
+ void duplicateRoHandlerRegistrationThrows() {
+ ReplicaRequestHandlers.Builder builder = new
ReplicaRequestHandlers.Builder();
+ builder.addRoHandler(GROUP_A, TYPE_1, (request, opStartTs) ->
nullCompletedFuture());
+
+ assertThrows(AssertionError.class,
+ () -> builder.addRoHandler(GROUP_A, TYPE_1, (request,
opStartTs) -> nullCompletedFuture()));
+ }
+}