This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4e2e64b082d IGNITE-28539 Rename PartitionReplicaListener and related
classes (#7986)
4e2e64b082d is described below
commit 4e2e64b082d9c43465bdcba97e5b85cdd78df4df
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Apr 15 21:11:19 2026 +0400
IGNITE-28539 Rename PartitionReplicaListener and related classes (#7986)
---
.../replicator/ItReplicaLifecycleTest.java | 4 +--
.../ItZonePartitionRaftListenerRecoveryTest.java | 6 ++--
.../PartitionReplicaLifecycleManager.java | 9 ++++--
...or.java => TablePartitionReplicaProcessor.java} | 2 +-
.../replicator/ZonePartitionReplicaListener.java | 6 ++--
.../handlers/WriteIntentSwitchRequestHandler.java | 10 +++----
.../replicator/raft/OnSnapshotSaveHandler.java | 7 +++--
...essor.java => TablePartitionRaftProcessor.java} | 2 +-
.../replicator/raft/ZonePartitionRaftListener.java | 20 +++++++-------
.../handlers/WriteIntentSwitchCommandHandler.java | 10 +++----
.../replicator/raft/handlers/package-info.java | 2 +-
.../raft/ZonePartitionRaftListenerTest.java | 32 +++++++++++-----------
...xDistributedTestSingleNodeNoCleanupMessage.java | 6 ++--
.../TablePartitionResourcesFactory.java | 20 +++++++-------
.../table/distributed/TableZoneCoordinator.java | 6 ++--
...ava => DefaultTablePartitionRaftProcessor.java} | 8 +++---
.../distributed/raft/handlers/package-info.java | 2 +-
... => DefaultTablePartitionReplicaProcessor.java} | 14 +++++-----
.../replicator/handlers/package-info.java | 2 +-
...=> DefaultTablePartitionRaftProcessorTest.java} | 6 ++--
...PartitionReplicaProcessorIndexLockingTest.java} | 16 +++++------
...ionReplicaProcessorSortedIndexLockingTest.java} | 18 ++++++------
...DefaultTablePartitionReplicaProcessorTest.java} | 32 +++++++++++-----------
.../ZonePartitionReplicaListenerTest.java | 8 +++---
.../storage/InternalTableEstimatedSizeTest.java | 12 ++++----
.../apache/ignite/distributed/ItTxTestCluster.java | 10 +++----
.../ignite/internal/table/TxAbstractTest.java | 20 ++++++++++----
.../internal/table/TxInfrastructureTest.java | 4 +--
.../table/impl/DummyInternalTableImpl.java | 8 +++---
.../internal/tx/test/ItTransactionTestUtils.java | 6 ++--
30 files changed, 162 insertions(+), 146 deletions(-)
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 2b50d109c96..b09b43fb178 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -66,7 +66,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.partition.replicator.fixtures.Node;
-import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
+import
org.apache.ignite.internal.partition.replicator.raft.TablePartitionRaftProcessor;
import
org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
@@ -795,7 +795,7 @@ public class ItReplicaLifecycleTest extends
ItAbstractColocationTest {
var fsm = (JraftServerImpl.DelegatingStateMachine)
grp.getRaftNode().getOptions().getFsm();
- RaftTableProcessor tableProcessor = ((ZonePartitionRaftListener)
fsm.getListener()).tableProcessor(tableId);
+ TablePartitionRaftProcessor tableProcessor =
((ZonePartitionRaftListener) fsm.getListener()).tableProcessor(tableId);
return tableProcessor != null;
}
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
index b6abedd366e..6319c77a3af 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
@@ -111,8 +111,8 @@ import
org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
import org.apache.ignite.internal.storage.lease.LeaseInfo;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import
org.apache.ignite.internal.table.distributed.raft.DefaultTablePartitionRaftProcessor;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -387,7 +387,7 @@ class ItZonePartitionRaftListenerRecoveryTest extends
IgniteAbstractTest {
raftManager.stopRaftNodes(PARTITION_ID);
}
- private RaftTableProcessor createTableProcessor(int tableId) {
+ private TablePartitionRaftProcessor createTableProcessor(int tableId) {
var storage = new SnapshotAwarePartitionDataStorage(
tableId,
mockStorage(tableId).storage,
@@ -405,7 +405,7 @@ class ItZonePartitionRaftListenerRecoveryTest extends
IgniteAbstractTest {
return clock.update(requestTime);
});
- return new TablePartitionProcessor(
+ return new DefaultTablePartitionRaftProcessor(
txManager,
storage,
storageUpdateHandler,
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index eb82afa5e2c..222cfe179b2 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -142,7 +142,7 @@ import
org.apache.ignite.internal.network.RecipientLeftException;
import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
import org.apache.ignite.internal.partition.replicator.index.IndexMetasAccess;
-import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
+import
org.apache.ignite.internal.partition.replicator.raft.TablePartitionRaftProcessor;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
@@ -1948,7 +1948,7 @@ public class PartitionReplicaLifecycleManager extends
ZonePartitionId zonePartitionId,
int tableId,
TablePartitionReplicaProcessorFactory
tablePartitionReplicaProcessorFactory,
- RaftTableProcessor raftTableProcessor,
+ TablePartitionRaftProcessor raftTableProcessor,
PartitionMvStorageAccess partitionMvStorageAccess,
boolean onNodeRecovery
) {
@@ -2346,6 +2346,9 @@ public class PartitionReplicaLifecycleManager extends
* @param transactionStateResolver Transaction state resolver.
* @return Table partition replica processor.
*/
- ReplicaTableProcessor createProcessor(RaftCommandRunner
raftCommandRunner, TransactionStateResolver transactionStateResolver);
+ TablePartitionReplicaProcessor createProcessor(
+ RaftCommandRunner raftCommandRunner,
+ TransactionStateResolver transactionStateResolver
+ );
}
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TablePartitionReplicaProcessor.java
similarity index 96%
rename from
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
rename to
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TablePartitionReplicaProcessor.java
index 3d9f8044010..8223f0b7187 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TablePartitionReplicaProcessor.java
@@ -25,7 +25,7 @@ import
org.apache.ignite.internal.replicator.message.ReplicaRequest;
/**
* Processor of replica requests targeted at a particular table.
*/
-public interface ReplicaTableProcessor {
+public interface TablePartitionReplicaProcessor {
/**
* Processes replica request.
*
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index a7928cc0623..db10db4dd0a 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -75,7 +75,7 @@ public class ZonePartitionReplicaListener implements
ReplicaListener {
private static final IgniteLogger LOG =
Loggers.forClass(ZonePartitionReplicaListener.class);
// tableId -> tableProcessor.
- private final Map<Integer, ReplicaTableProcessor> replicaProcessors = new
ConcurrentHashMap<>();
+ private final Map<Integer, TablePartitionReplicaProcessor>
replicaProcessors = new ConcurrentHashMap<>();
/** Raft client. */
private final RaftCommandRunner raftClient;
@@ -260,7 +260,7 @@ public class ZonePartitionReplicaListener implements
ReplicaListener {
.thenCompose(ignored -> {
int tableId = ((TableAware) request).tableId();
- ReplicaTableProcessor replicaProcessor =
replicaProcessors.get(tableId);
+ TablePartitionReplicaProcessor replicaProcessor =
replicaProcessors.get(tableId);
if (replicaProcessor == null) {
// Most of the time this condition should be false.
This block handles a case when a request got stuck
@@ -334,7 +334,7 @@ public class ZonePartitionReplicaListener implements
ReplicaListener {
* @return Table replicas listeners.
*/
@VisibleForTesting
- public Map<Integer, ReplicaTableProcessor> tableReplicaProcessors() {
+ public Map<Integer, TablePartitionReplicaProcessor>
tableReplicaProcessors() {
return replicaProcessors;
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java
index 7f766d84e56..663453e59b0 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java
@@ -40,10 +40,10 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.partition.replicator.FuturesCleanupResult;
import org.apache.ignite.internal.partition.replicator.ReliableCatalogVersions;
import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
-import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor;
import org.apache.ignite.internal.partition.replicator.ReplicaTxFinishMarker;
import
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
import
org.apache.ignite.internal.partition.replicator.TableAwareReplicaRequestPreProcessor;
+import
org.apache.ignite.internal.partition.replicator.TablePartitionReplicaProcessor;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
@@ -74,7 +74,7 @@ public class WriteIntentSwitchRequestHandler {
private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
- private final IntFunction<ReplicaTableProcessor> replicaListenerByTableId;
+ private final IntFunction<TablePartitionReplicaProcessor>
replicaListenerByTableId;
private final ClockService clockService;
@@ -89,7 +89,7 @@ public class WriteIntentSwitchRequestHandler {
/** Constructor. */
public WriteIntentSwitchRequestHandler(
- IntFunction<ReplicaTableProcessor> replicaListenerByTableId,
+ IntFunction<TablePartitionReplicaProcessor>
replicaListenerByTableId,
ClockService clockService,
SchemaSyncService schemaSyncService,
CatalogService catalogService,
@@ -174,7 +174,7 @@ public class WriteIntentSwitchRequestHandler {
// Using empty primacy because the request is not a
PrimaryReplicaRequest.
return
tableAwareReplicaRequestPreProcessor.preProcessTableAwareRequest(tableSpecificRequest)
.thenCompose(ignored -> {
- ReplicaTableProcessor replicaProcessor =
replicaTableProcessor(tableId);
+ TablePartitionReplicaProcessor replicaProcessor =
replicaTableProcessor(tableId);
if (replicaProcessor == null) {
// Most of the time this condition should be false.
This block handles a case when a request got stuck
@@ -215,7 +215,7 @@ public class WriteIntentSwitchRequestHandler {
return new WriteIntentSwitchReplicatedInfo(request.txId(),
replicationGroupId);
}
- private ReplicaTableProcessor replicaTableProcessor(int tableId) {
+ private TablePartitionReplicaProcessor replicaTableProcessor(int tableId) {
return replicaListenerByTableId.apply(tableId);
}
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
index 4cb9caf74fa..adb4a2081c0 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
@@ -42,7 +42,10 @@ public class OnSnapshotSaveHandler {
/**
* Called when {@link RaftGroupListener#onSnapshotSave} is triggered.
*/
- public CompletableFuture<Void> onSnapshotSave(PartitionSnapshotInfo
snapshotInfo, Collection<RaftTableProcessor> tableProcessors) {
+ public CompletableFuture<Void> onSnapshotSave(
+ PartitionSnapshotInfo snapshotInfo,
+ Collection<TablePartitionRaftProcessor> tableProcessors
+ ) {
// The max index here is required for local recovery and a possible
scenario
// of false node failure when we actually have all required data. This
might happen because we use the minimal index
// among storages on a node restart.
@@ -63,7 +66,7 @@ public class OnSnapshotSaveHandler {
txStatePartitionStorage.lastApplied(lastAppliedIndex, lastAppliedTerm);
CompletableFuture<?>[] tableStorageFlushFutures =
tableProcessors.stream()
- .map(RaftTableProcessor::flushStorage)
+ .map(TablePartitionRaftProcessor::flushStorage)
.toArray(CompletableFuture<?>[]::new);
// Flush the TX state storage last to guarantee that all data is
flushed before the snapshot is saved.
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/TablePartitionRaftProcessor.java
similarity index 98%
rename from
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java
rename to
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/TablePartitionRaftProcessor.java
index f3ef6b25909..ebcd56d1cb4 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/TablePartitionRaftProcessor.java
@@ -27,7 +27,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Processor of Raft commands targeted at a particular table.
*/
-public interface RaftTableProcessor {
+public interface TablePartitionRaftProcessor {
/**
* Processes a Raft command.
*
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
index d2c480b357f..c9036a5cf9f 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
@@ -81,7 +81,7 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
*
* <p>Concurrent access is guarded by {@link #tableProcessorsStateLock}.
*/
- private final Int2ObjectMap<RaftTableProcessor> tableProcessors = new
Int2ObjectOpenHashMap<>();
+ private final Int2ObjectMap<TablePartitionRaftProcessor> tableProcessors =
new Int2ObjectOpenHashMap<>();
private final TxStatePartitionStorage txStateStorage;
@@ -313,7 +313,7 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
boolean wasApplied = false;
- for (RaftTableProcessor processor : tableProcessors.values()) {
+ for (TablePartitionRaftProcessor processor : tableProcessors.values())
{
CommandResult r = processor.processCommand(command, commandIndex,
commandTerm, safeTimestamp);
wasApplied = wasApplied || r.wasApplied();
@@ -339,7 +339,7 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
long commandTerm,
@Nullable HybridTimestamp safeTimestamp
) {
- RaftTableProcessor tableProcessor = tableProcessors.get(tableId);
+ TablePartitionRaftProcessor tableProcessor =
tableProcessors.get(tableId);
if (tableProcessor == null) {
// Most of the time this condition should be false. This logging
message is added in case a Raft command got stuck somewhere
@@ -444,7 +444,7 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
cleanupSnapshots();
synchronized (tableProcessorsStateLock) {
- tableProcessors.values().forEach(RaftTableProcessor::onShutdown);
+
tableProcessors.values().forEach(TablePartitionRaftProcessor::onShutdown);
}
}
@@ -454,7 +454,7 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
long result = max(0, txStateStorage.lastAppliedIndex());
synchronized (tableProcessorsStateLock) {
- for (RaftTableProcessor processor : tableProcessors.values()) {
+ for (TablePartitionRaftProcessor processor :
tableProcessors.values()) {
result = max(result, processor.lastAppliedIndex());
}
}
@@ -471,7 +471,7 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
* version. Until the Catalog version is updated, commands targeting the
table being added will be rejected by an interceptor that
* requires the Catalog version to be equal to a particular value.
*/
- public void addTableProcessor(int tableId, RaftTableProcessor processor) {
+ public void addTableProcessor(int tableId, TablePartitionRaftProcessor
processor) {
synchronized (tableProcessorsStateLock) {
RaftGroupConfiguration configuration =
raftGroupConfigurationConverter.fromBytes(txStateStorage.committedGroupConfiguration());
@@ -479,7 +479,7 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
processor.initialize(configuration, leaseInfo, lastAppliedIndex,
lastAppliedTerm);
- RaftTableProcessor prev = tableProcessors.put(tableId, processor);
+ TablePartitionRaftProcessor prev = tableProcessors.put(tableId,
processor);
assert prev == null : "Listener for table " + tableId + " already
exists";
}
@@ -488,7 +488,7 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
/**
* Adds a given Table Partition-level Raft processor to the set of managed
processors during node recovery on startup.
*/
- public void addTableProcessorOnRecovery(int tableId, RaftTableProcessor
processor) {
+ public void addTableProcessorOnRecovery(int tableId,
TablePartitionRaftProcessor processor) {
synchronized (tableProcessorsStateLock) {
PartitionSnapshotInfo snapshotInfo = snapshotInfo();
@@ -513,7 +513,7 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
);
}
- RaftTableProcessor prev = tableProcessors.put(tableId, processor);
+ TablePartitionRaftProcessor prev = tableProcessors.put(tableId,
processor);
assert prev == null : "Listener for table " + tableId + " already
exists";
}
@@ -531,7 +531,7 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
/** Returns the table processor associated with the given table ID. */
@TestOnly
- public @Nullable RaftTableProcessor tableProcessor(int tableId) {
+ public @Nullable TablePartitionRaftProcessor tableProcessor(int tableId) {
synchronized (tableProcessorsStateLock) {
return tableProcessors.get(tableId);
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/WriteIntentSwitchCommandHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/WriteIntentSwitchCommandHandler.java
index 0d40a9bc560..fa841d7e4f9 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/WriteIntentSwitchCommandHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/WriteIntentSwitchCommandHandler.java
@@ -24,8 +24,8 @@ import org.apache.ignite.internal.logger.Loggers;
import
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommandV2;
import org.apache.ignite.internal.partition.replicator.raft.CommandResult;
-import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
import org.apache.ignite.internal.partition.replicator.raft.RaftTxFinishMarker;
+import
org.apache.ignite.internal.partition.replicator.raft.TablePartitionRaftProcessor;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import org.jetbrains.annotations.Nullable;
@@ -36,7 +36,7 @@ import org.jetbrains.annotations.Nullable;
public class WriteIntentSwitchCommandHandler extends
AbstractCommandHandler<WriteIntentSwitchCommand> {
private static final IgniteLogger LOG =
Loggers.forClass(WriteIntentSwitchCommandHandler.class);
- private final IntFunction<RaftTableProcessor> tableProcessorByTableId;
+ private final IntFunction<TablePartitionRaftProcessor>
tableProcessorByTableId;
private final RaftTxFinishMarker txFinishMarker;
@@ -44,7 +44,7 @@ public class WriteIntentSwitchCommandHandler extends
AbstractCommandHandler<Writ
/** Constructor. */
public WriteIntentSwitchCommandHandler(
- IntFunction<RaftTableProcessor> tableProcessorByTableId,
+ IntFunction<TablePartitionRaftProcessor> tableProcessorByTableId,
TxManager txManager,
TxStatePartitionStorage txStatePartitionStorage
) {
@@ -68,7 +68,7 @@ public class WriteIntentSwitchCommandHandler extends
AbstractCommandHandler<Writ
boolean applied = false;
boolean handledByAnyTable = false;
for (int tableId : ((WriteIntentSwitchCommandV2)
switchCommand).tableIds()) {
- RaftTableProcessor tableProcessor = raftTableProcessor(tableId);
+ TablePartitionRaftProcessor tableProcessor =
raftTableProcessor(tableId);
if (tableProcessor == null) {
// This can only happen if the table in question has already
been dropped and destroyed. In such case, we simply
@@ -97,7 +97,7 @@ public class WriteIntentSwitchCommandHandler extends
AbstractCommandHandler<Writ
return new CommandResult(null, applied);
}
- private @Nullable RaftTableProcessor raftTableProcessor(int tableId) {
+ private @Nullable TablePartitionRaftProcessor raftTableProcessor(int
tableId) {
return tableProcessorByTableId.apply(tableId);
}
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/package-info.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/package-info.java
index d0c81eaa4ff..260b635ccef 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/package-info.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/package-info.java
@@ -18,7 +18,7 @@
/**
* This package contains RAFT command handlers that are used by
* {@link
org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener}
and
- * {@link
org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor}.
+ * {@link
org.apache.ignite.internal.partition.replicator.raft.TablePartitionRaftProcessor}.
*/
package org.apache.ignite.internal.partition.replicator.raft.handlers;
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
index ebd0631df6b..e671f76b36d 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
@@ -96,8 +96,8 @@ import
org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.lease.LeaseInfo;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import
org.apache.ignite.internal.table.distributed.raft.DefaultTablePartitionRaftProcessor;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
@@ -227,7 +227,7 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
void savesSnapshotInfoToTxStorageOnSnapshotSave(
@Mock CommandClosure<WriteCommand> writeCommandClosure,
@Mock PrimaryReplicaChangeCommand command,
- @Mock RaftTableProcessor tableProcessor
+ @Mock TablePartitionRaftProcessor tableProcessor
) {
when(writeCommandClosure.command()).thenReturn(command);
when(writeCommandClosure.index()).thenReturn(25L);
@@ -309,9 +309,9 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
listener.onConfigurationCommitted(raftGroupConfiguration, 2L, 3L);
- TablePartitionProcessor tablePartitionProcessor =
partitionListener(TABLE_ID);
+ DefaultTablePartitionRaftProcessor tablePartitionRaftProcessor =
partitionListener(TABLE_ID);
- listener.addTableProcessor(TABLE_ID, tablePartitionProcessor);
+ listener.addTableProcessor(TABLE_ID, tablePartitionRaftProcessor);
verify(mvPartitionStorage).lastApplied(2L, 3L);
verify(mvPartitionStorage).committedGroupConfiguration(any());
@@ -325,7 +325,7 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
var raftGroupConfiguration = new RaftGroupConfiguration(index, term,
111L, 110L, List.of("foo"), List.of("bar"), null, null);
- var tableProcessor = new TestRaftTableProcessor();
+ var tableProcessor = new TestTablePartitionRaftProcessor();
CompletableFuture<Void> f1 = runAsync(() ->
listener.onConfigurationCommitted(raftGroupConfiguration, index, term),
executor);
@@ -371,7 +371,7 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
}
});
- var tableProcessor = new TestRaftTableProcessor();
+ var tableProcessor = new TestTablePartitionRaftProcessor();
CompletableFuture<Void> f1 = runAsync(() ->
listener.onWrite(closure.iterator()), executor);
@@ -392,9 +392,9 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
@Test
void usesSnapshotInfoForRecovery(
- @Mock RaftTableProcessor tableProcessor1,
- @Mock RaftTableProcessor tableProcessor2,
- @Mock RaftTableProcessor tableProcessor3,
+ @Mock TablePartitionRaftProcessor tableProcessor1,
+ @Mock TablePartitionRaftProcessor tableProcessor2,
+ @Mock TablePartitionRaftProcessor tableProcessor3,
@Mock CommandClosure<WriteCommand> writeCommandClosure,
@Mock PrimaryReplicaChangeCommand command
) {
@@ -443,7 +443,7 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
}
@Test
- void processorsAreNotInitializedWithoutSnapshot(@Mock RaftTableProcessor
tableProcessor) {
+ void processorsAreNotInitializedWithoutSnapshot(@Mock
TablePartitionRaftProcessor tableProcessor) {
listener.addTableProcessorOnRecovery(42, tableProcessor);
verify(tableProcessor, never()).initialize(any(), any(), anyLong(),
anyLong());
@@ -453,7 +453,7 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
void testSkipWriteCommandByAppliedIndex() {
mvPartitionStorage = spy(new TestMvPartitionStorage(PARTITION_ID));
- TablePartitionProcessor tableProcessor = partitionListener(TABLE_ID);
+ DefaultTablePartitionRaftProcessor tableProcessor =
partitionListener(TABLE_ID);
listener.addTableProcessor(TABLE_ID, tableProcessor);
// Update(All)Command handling requires both information about raft
group topology and the primary replica,
@@ -757,7 +757,7 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
}
@Test
- void locksOnConfigCommit(@Mock RaftTableProcessor tableProcessor) {
+ void locksOnConfigCommit(@Mock TablePartitionRaftProcessor tableProcessor)
{
listener.addTableProcessor(TABLE_ID, tableProcessor);
long index = 10;
@@ -827,7 +827,7 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
@ParameterizedTest
@MethodSource("tableCommands")
- void locksOnApplicationOfTableCommands(WriteCommand command, @Mock
RaftTableProcessor tableProcessor) {
+ void locksOnApplicationOfTableCommands(WriteCommand command, @Mock
TablePartitionRaftProcessor tableProcessor) {
listener.addTableProcessor(TABLE_ID, tableProcessor);
when(tableProcessor.processCommand(any(), anyLong(), anyLong(), any()))
.thenReturn(EMPTY_APPLIED_RESULT);
@@ -951,7 +951,7 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
return writeCommandClosure(index, term, writeCommand, null, null);
}
- private TablePartitionProcessor partitionListener(int tableId) {
+ private DefaultTablePartitionRaftProcessor partitionListener(int tableId) {
LeasePlacementDriver placementDriver =
mock(LeasePlacementDriver.class);
lenient().when(placementDriver.getCurrentPrimaryReplica(any(),
any())).thenReturn(null);
@@ -979,7 +979,7 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
return null;
}).when(storageUpdateHandler).handleUpdate(any(), any(), any(), any(),
anyBoolean(), any(Runnable.class), any(), any(), any());
- return new TablePartitionProcessor(
+ return new DefaultTablePartitionRaftProcessor(
txManager,
new SnapshotAwarePartitionDataStorage(
tableId,
@@ -999,7 +999,7 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
);
}
- private static class TestRaftTableProcessor implements RaftTableProcessor {
+ private static class TestTablePartitionRaftProcessor implements
TablePartitionRaftProcessor {
@Nullable
private RaftGroupConfiguration raftGroupConfiguration;
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index edbe8090dfa..b37ea5bfb5e 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -64,7 +64,7 @@ import
org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
-import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import
org.apache.ignite.internal.table.distributed.replicator.DefaultTablePartitionReplicaProcessor;
import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.LockManager;
@@ -175,7 +175,7 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends TxAbstractTes
}
@Override
- protected PartitionReplicaListener newReplicaListener(
+ protected DefaultTablePartitionReplicaProcessor newReplicaListener(
MvPartitionStorage mvDataStorage,
RaftGroupService raftClient,
TxManager txManager,
@@ -198,7 +198,7 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends TxAbstractTes
RemotelyTriggeredResourceRegistry resourcesRegistry,
SchemaRegistry schemaRegistry
) {
- return new PartitionReplicaListener(
+ return new DefaultTablePartitionReplicaProcessor(
mvDataStorage,
raftClient,
txManager,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
index 0593120259a..a541025d562 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
@@ -45,12 +45,12 @@ import
org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.gc.MvGc;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
+import
org.apache.ignite.internal.table.distributed.raft.DefaultTablePartitionRaftProcessor;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionMvStorageAccessImpl;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
-import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import
org.apache.ignite.internal.table.distributed.replicator.DefaultTablePartitionReplicaProcessor;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
@@ -67,8 +67,8 @@ import
org.apache.ignite.internal.util.PendingComparableValuesTracker;
*
* <p><b>Lifecycle ordering:</b> the caller must invoke {@link
StorageUpdateHandler#start} on the
* {@link PartitionResources#storageUpdateHandler} returned by {@link
#createPartitionResources} before
- * the constructed objects ({@link TablePartitionProcessor}, {@link
PartitionMvStorageAccess},
- * {@link PartitionReplicaListener}) are used at runtime.
+ * the constructed objects ({@link DefaultTablePartitionRaftProcessor}, {@link
PartitionMvStorageAccess},
+ * {@link DefaultTablePartitionReplicaProcessor}) are used at runtime.
*/
class TablePartitionResourcesFactory {
private final TxManager txManager;
@@ -202,7 +202,7 @@ class TablePartitionResourcesFactory {
}
/**
- * Creates a {@link TablePartitionProcessor} for the given partition.
+ * Creates a {@link DefaultTablePartitionRaftProcessor} for the given
partition.
*
* @param zonePartitionId Zone partition ID.
* @param table Table view.
@@ -210,13 +210,13 @@ class TablePartitionResourcesFactory {
* @param partitionResources Partition resources.
* @return Table partition processor.
*/
- TablePartitionProcessor createTablePartitionProcessor(
+ DefaultTablePartitionRaftProcessor createTablePartitionProcessor(
ZonePartitionId zonePartitionId,
TableViewInternal table,
PartitionDataStorage partitionDataStorage,
PartitionResources partitionResources
) {
- return new TablePartitionProcessor(
+ return new DefaultTablePartitionRaftProcessor(
txManager,
partitionDataStorage,
partitionResources.storageUpdateHandler,
@@ -257,7 +257,7 @@ class TablePartitionResourcesFactory {
}
/**
- * Creates a {@link PartitionReplicaListener} for the given partition.
+ * Creates a {@link DefaultTablePartitionReplicaProcessor} for the given
partition.
*
* @param replicationGroupId Zone partition ID used as the replication
group ID.
* @param table Table view.
@@ -268,7 +268,7 @@ class TablePartitionResourcesFactory {
* @param transactionStateResolver Transaction state resolver.
* @return Partition replica listener.
*/
- PartitionReplicaListener createReplicaListener(
+ DefaultTablePartitionReplicaProcessor createReplicaListener(
ZonePartitionId replicationGroupId,
TableViewInternal table,
PendingComparableValuesTracker<HybridTimestamp, Void>
safeTimeTracker,
@@ -279,7 +279,7 @@ class TablePartitionResourcesFactory {
) {
int partitionIndex = replicationGroupId.partitionId();
- return new PartitionReplicaListener(
+ return new DefaultTablePartitionReplicaProcessor(
mvPartitionStorage,
new ExecutorInclinedRaftCommandRunner(raftClient,
partitionOperationsExecutor),
txManager,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java
index 44a4986c38e..05ba3566c96 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java
@@ -75,8 +75,8 @@ import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.gc.MvGc;
+import
org.apache.ignite.internal.table.distributed.raft.DefaultTablePartitionRaftProcessor;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -617,7 +617,7 @@ class TableZoneCoordinator {
transactionStateResolver
);
- TablePartitionProcessor tablePartitionProcessor =
partitionResourcesFactory.createTablePartitionProcessor(
+ DefaultTablePartitionRaftProcessor tablePartitionRaftProcessor =
partitionResourcesFactory.createTablePartitionProcessor(
zonePartitionId, table, partitionDataStorage,
partitionResources);
PartitionMvStorageAccess partitionStorageAccess =
partitionResourcesFactory.createPartitionMvStorageAccess(
@@ -627,7 +627,7 @@ class TableZoneCoordinator {
zonePartitionId,
tableId,
createListener,
- tablePartitionProcessor,
+ tablePartitionRaftProcessor,
partitionStorageAccess,
onNodeRecovery
);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/DefaultTablePartitionRaftProcessor.java
similarity index 98%
rename from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
rename to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/DefaultTablePartitionRaftProcessor.java
index 4f8a0e7985c..8ce35e60748 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/DefaultTablePartitionRaftProcessor.java
@@ -44,7 +44,7 @@ import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCom
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandV2;
import
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
import org.apache.ignite.internal.partition.replicator.raft.CommandResult;
-import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
+import
org.apache.ignite.internal.partition.replicator.raft.TablePartitionRaftProcessor;
import
org.apache.ignite.internal.partition.replicator.raft.handlers.AbstractCommandHandler;
import
org.apache.ignite.internal.partition.replicator.raft.handlers.CommandHandlers;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
@@ -71,8 +71,8 @@ import org.jetbrains.annotations.TestOnly;
/**
* Partition command handler.
*/
-public class TablePartitionProcessor implements RaftTableProcessor {
- private static final IgniteLogger LOG =
Loggers.forClass(TablePartitionProcessor.class);
+public class DefaultTablePartitionRaftProcessor implements
TablePartitionRaftProcessor {
+ private static final IgniteLogger LOG =
Loggers.forClass(DefaultTablePartitionRaftProcessor.class);
/** Transaction manager. */
private final TxManager txManager;
@@ -104,7 +104,7 @@ public class TablePartitionProcessor implements
RaftTableProcessor {
private ReplicaMeta lastKnownLease;
/** Constructor. */
- public TablePartitionProcessor(
+ public DefaultTablePartitionRaftProcessor(
TxManager txManager,
PartitionDataStorage partitionDataStorage,
StorageUpdateHandler storageUpdateHandler,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
index f5336c82260..6b2f5a59871 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
@@ -17,7 +17,7 @@
/**
* This package contains RAFT command handlers that is used by
- * {@link
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor} aka
table raft processor.
+ * {@link
org.apache.ignite.internal.table.distributed.raft.DefaultTablePartitionRaftProcessor}
aka table raft processor.
*/
package org.apache.ignite.internal.table.distributed.raft.handlers;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DefaultTablePartitionReplicaProcessor.java
similarity index 99%
rename from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
rename to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DefaultTablePartitionReplicaProcessor.java
index f173d9ad228..0a2a5590efc 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DefaultTablePartitionReplicaProcessor.java
@@ -111,9 +111,9 @@ import
org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.partition.replicator.FuturesCleanupResult;
import org.apache.ignite.internal.partition.replicator.ReliableCatalogVersions;
import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
-import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor;
import
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
import
org.apache.ignite.internal.partition.replicator.TableAwareReplicaRequestPreProcessor;
+import
org.apache.ignite.internal.partition.replicator.TablePartitionReplicaProcessor;
import
org.apache.ignite.internal.partition.replicator.exception.OperationLockException;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
@@ -228,7 +228,7 @@ import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/** Partition replication listener. */
-public class PartitionReplicaListener implements ReplicaTableProcessor {
+public class DefaultTablePartitionReplicaProcessor implements
TablePartitionReplicaProcessor {
/**
* NB: this listener makes writes to the underlying MV partition storage
without taking the partition snapshots read lock. This causes
* the RAFT snapshots transferred to a follower being slightly
inconsistent for a limited amount of time.
@@ -254,7 +254,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
private static final Object INTERNAL_DOC_PLACEHOLDER = null;
/** Logger. */
- private static final IgniteLogger LOG =
Loggers.forClass(PartitionReplicaListener.class);
+ private static final IgniteLogger LOG =
Loggers.forClass(DefaultTablePartitionReplicaProcessor.class);
/** Factory to create RAFT command messages. */
private static final PartitionReplicationMessagesFactory
PARTITION_REPLICATION_MESSAGES_FACTORY =
@@ -367,7 +367,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
* @param metrics Table metric source.
*/
@SuppressWarnings("PMD.UnusedFormalParameter") // clusterNodeResolver and
failureProcessor kept for API compatibility
- public PartitionReplicaListener(
+ public DefaultTablePartitionReplicaProcessor(
MvPartitionStorage mvDataStorage,
RaftCommandRunner raftCommandRunner,
TxManager txManager,
@@ -532,7 +532,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
UUID senderId
) {
return processRequest(request, replicaPrimacy)
-
.thenApply(PartitionReplicaListener::wrapInReplicaResultIfNeeded);
+
.thenApply(DefaultTablePartitionReplicaProcessor::wrapInReplicaResultIfNeeded);
}
private static ReplicaResult wrapInReplicaResultIfNeeded(Object res) {
@@ -2675,7 +2675,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
return completedFuture(new
CommandApplicationResult(safeTs, null));
}
-
}).handle(PartitionReplicaListener::throwIfFullTxCommitSchemaValidationFailedDuringReplication);
+
}).handle(DefaultTablePartitionReplicaProcessor::throwIfFullTxCommitSchemaValidationFailedDuringReplication);
}
}
@@ -2808,7 +2808,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
return completedFuture(new
CommandApplicationResult(safeTs, null));
}
-
}).handle(PartitionReplicaListener::throwIfFullTxCommitSchemaValidationFailedDuringReplication);
+
}).handle(DefaultTablePartitionReplicaProcessor::throwIfFullTxCommitSchemaValidationFailedDuringReplication);
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/package-info.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/package-info.java
index ad7c7e9bffc..404684938db 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/package-info.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/package-info.java
@@ -17,6 +17,6 @@
/**
* This package contains replica request handlers that is used by
- * {@link
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener}.
+ * {@link
org.apache.ignite.internal.table.distributed.replicator.DefaultTablePartitionReplicaProcessor}.
*/
package org.apache.ignite.internal.table.distributed.replicator.handlers;
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/DefaultTablePartitionRaftProcessorTest.java
similarity index 99%
rename from
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
rename to
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/DefaultTablePartitionRaftProcessorTest.java
index 9848f8e1a3e..c2ba446d5e1 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/DefaultTablePartitionRaftProcessorTest.java
@@ -143,7 +143,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
@ExtendWith(ConfigurationExtension.class)
@ExtendWith(ExecutorServiceExtension.class)
-public class PartitionCommandListenerTest extends BaseIgniteAbstractTest {
+public class DefaultTablePartitionRaftProcessorTest extends
BaseIgniteAbstractTest {
private static final int KEY_COUNT = 100;
private static final int TABLE_ID = 1;
@@ -160,7 +160,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
private static final SchemaRegistry SCHEMA_REGISTRY = new
DummySchemaManagerImpl(SCHEMA);
- private TablePartitionProcessor commandListener;
+ private DefaultTablePartitionRaftProcessor commandListener;
private final AtomicLong raftIndex = new AtomicLong();
@@ -271,7 +271,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
return hybridClock.update(requestTime);
});
- commandListener = new TablePartitionProcessor(
+ commandListener = new DefaultTablePartitionRaftProcessor(
mock(TxManager.class),
partitionDataStorage,
storageUpdateHandler,
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorIndexLockingTest.java
similarity index 97%
rename from
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
rename to
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorIndexLockingTest.java
index 0f935d29b4d..a5760940cb0 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorIndexLockingTest.java
@@ -31,8 +31,8 @@ import static
org.apache.ignite.internal.partition.replicator.network.replicatio
import static
org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_REPLACE_IF_EXIST;
import static
org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_UPSERT;
import static
org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_UPSERT_ALL;
-import static
org.apache.ignite.internal.table.distributed.replication.PartitionReplicaListenerTest.binaryRowsToBuffers;
-import static
org.apache.ignite.internal.table.distributed.replication.PartitionReplicaListenerTest.zonePartitionIdMessage;
+import static
org.apache.ignite.internal.table.distributed.replication.DefaultTablePartitionReplicaProcessorTest.binaryRowsToBuffers;
+import static
org.apache.ignite.internal.table.distributed.replication.DefaultTablePartitionReplicaProcessorTest.zonePartitionIdMessage;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -108,7 +108,7 @@ import
org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
-import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import
org.apache.ignite.internal.table.distributed.replicator.DefaultTablePartitionReplicaProcessor;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource;
@@ -140,7 +140,7 @@ import org.junit.jupiter.params.provider.MethodSource;
/** There are tests for partition replica listener. */
@ExtendWith(ConfigurationExtension.class)
-public class PartitionReplicaListenerIndexLockingTest extends
IgniteAbstractTest {
+public class DefaultTablePartitionReplicaProcessorIndexLockingTest extends
IgniteAbstractTest {
private static final int PART_ID = 0;
private static final int TABLE_ID = 1;
private static final int PK_INDEX_ID = 1;
@@ -166,7 +166,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
private static SchemaDescriptor schemaDescriptor;
private static KvMarshaller<Integer, Integer> kvMarshaller;
private static Lazy<TableSchemaAwareIndexStorage> pkStorage;
- private static PartitionReplicaListener partitionReplicaListener;
+ private static DefaultTablePartitionReplicaProcessor
tablePartitionReplicaProcessor;
private static ColumnsExtractor row2HashKeyConverter;
private static ColumnsExtractor row2SortKeyConverter;
@@ -258,7 +258,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
InternalClusterNode localNode = DummyInternalTableImpl.LOCAL_NODE;
- partitionReplicaListener = new PartitionReplicaListener(
+ tablePartitionReplicaProcessor = new
DefaultTablePartitionReplicaProcessor(
TEST_MV_PARTITION_STORAGE,
mockRaftClient,
newTxManager(),
@@ -403,7 +403,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
throw new AssertionError("Unexpected operation type: " +
arg.type);
}
- CompletableFuture<?> fut = partitionReplicaListener.process(request,
replicaPrimacy(), LOCAL_NODE_ID);
+ CompletableFuture<?> fut =
tablePartitionReplicaProcessor.process(request, replicaPrimacy(),
LOCAL_NODE_ID);
await(fut);
@@ -496,7 +496,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
throw new AssertionError("Unexpected operation type: " +
arg.type);
}
- CompletableFuture<?> fut = partitionReplicaListener.process(request,
replicaPrimacy(), LOCAL_NODE_ID);
+ CompletableFuture<?> fut =
tablePartitionReplicaProcessor.process(request, replicaPrimacy(),
LOCAL_NODE_ID);
await(fut);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorSortedIndexLockingTest.java
similarity index 96%
rename from
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
rename to
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorSortedIndexLockingTest.java
index d4abcae3bb7..4c429dc7d2b 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorSortedIndexLockingTest.java
@@ -31,9 +31,9 @@ import static
org.apache.ignite.internal.partition.replicator.network.replicatio
import static
org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_REPLACE_IF_EXIST;
import static
org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_UPSERT;
import static
org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_UPSERT_ALL;
-import static
org.apache.ignite.internal.table.distributed.replication.PartitionReplicaListenerIndexLockingTest.LOCAL_NODE_ID;
-import static
org.apache.ignite.internal.table.distributed.replication.PartitionReplicaListenerTest.binaryRowsToBuffers;
-import static
org.apache.ignite.internal.table.distributed.replication.PartitionReplicaListenerTest.zonePartitionIdMessage;
+import static
org.apache.ignite.internal.table.distributed.replication.DefaultTablePartitionReplicaProcessorIndexLockingTest.LOCAL_NODE_ID;
+import static
org.apache.ignite.internal.table.distributed.replication.DefaultTablePartitionReplicaProcessorTest.binaryRowsToBuffers;
+import static
org.apache.ignite.internal.table.distributed.replication.DefaultTablePartitionReplicaProcessorTest.zonePartitionIdMessage;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -103,7 +103,7 @@ import
org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
-import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import
org.apache.ignite.internal.table.distributed.replicator.DefaultTablePartitionReplicaProcessor;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource;
@@ -135,7 +135,7 @@ import org.junit.jupiter.params.provider.MethodSource;
/** There are tests for partition replica listener. */
@ExtendWith(ConfigurationExtension.class)
-public class PartitionReplicaListenerSortedIndexLockingTest extends
IgniteAbstractTest {
+public class DefaultTablePartitionReplicaProcessorSortedIndexLockingTest
extends IgniteAbstractTest {
private static final int PART_ID = 0;
private static final int TABLE_ID = 1;
private static final int PK_INDEX_ID = 1;
@@ -153,7 +153,7 @@ public class PartitionReplicaListenerSortedIndexLockingTest
extends IgniteAbstra
private static SchemaDescriptor schemaDescriptor;
private static KvMarshaller<Integer, Integer> kvMarshaller;
private static Lazy<TableSchemaAwareIndexStorage> pkStorage;
- private static PartitionReplicaListener partitionReplicaListener;
+ private static DefaultTablePartitionReplicaProcessor
tablePartitionReplicaProcessor;
private static ColumnsExtractor row2HashKeyConverter;
@InjectConfiguration
@@ -228,7 +228,7 @@ public class PartitionReplicaListenerSortedIndexLockingTest
extends IgniteAbstra
InternalClusterNode localNode = DummyInternalTableImpl.LOCAL_NODE;
- partitionReplicaListener = new PartitionReplicaListener(
+ tablePartitionReplicaProcessor = new
DefaultTablePartitionReplicaProcessor(
TEST_MV_PARTITION_STORAGE,
mockRaftClient,
newTxManager(),
@@ -370,7 +370,7 @@ public class PartitionReplicaListenerSortedIndexLockingTest
extends IgniteAbstra
throw new AssertionError("Unexpected operation type: " +
arg.type);
}
- CompletableFuture<?> fut = partitionReplicaListener.process(request,
validRwPrimacy(), LOCAL_NODE_ID);
+ CompletableFuture<?> fut =
tablePartitionReplicaProcessor.process(request, validRwPrimacy(),
LOCAL_NODE_ID);
await(fut);
@@ -449,7 +449,7 @@ public class PartitionReplicaListenerSortedIndexLockingTest
extends IgniteAbstra
throw new AssertionError("Unexpected operation type: " +
arg.type);
}
- CompletableFuture<?> fut = partitionReplicaListener.process(request,
validRwPrimacy(), LOCAL_NODE_ID);
+ CompletableFuture<?> fut =
tablePartitionReplicaProcessor.process(request, validRwPrimacy(),
LOCAL_NODE_ID);
await(fut);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorTest.java
similarity index 98%
rename from
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
rename to
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorTest.java
index a8e1047cd85..db7cdb543aa 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorTest.java
@@ -220,7 +220,7 @@ import
org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
-import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import
org.apache.ignite.internal.table.distributed.replicator.DefaultTablePartitionReplicaProcessor;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.table.metrics.TableMetricSource;
@@ -289,7 +289,7 @@ import org.mockito.quality.Strictness;
@ExtendWith(MockitoExtension.class)
@ExtendWith(ConfigurationExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
-public class PartitionReplicaListenerTest extends IgniteAbstractTest {
+public class DefaultTablePartitionReplicaProcessorTest extends
IgniteAbstractTest {
private static final int PART_ID = 0;
private static final int CURRENT_SCHEMA_VERSION = 1;
@@ -461,7 +461,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private TestPlacementDriver placementDriver;
/** Partition replication listener to test. */
- private PartitionReplicaListener partitionReplicaListener;
+ private DefaultTablePartitionReplicaProcessor
tablePartitionReplicaProcessor;
private HashIndexStorage pkIndexStorage;
@@ -657,7 +657,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
placementDriver = spy(new TestPlacementDriver(localNode));
- partitionReplicaListener = new PartitionReplicaListener(
+ tablePartitionReplicaProcessor = new
DefaultTablePartitionReplicaProcessor(
testMvPartitionStorage,
mockRaftClient,
txManager,
@@ -1049,7 +1049,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
UUID scanTxId = newTxId();
// Request first batch
- CompletableFuture<ReplicaResult> fut =
partitionReplicaListener.process(
+ CompletableFuture<ReplicaResult> fut =
tablePartitionReplicaProcessor.process(
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
@@ -1067,7 +1067,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(4, rows.size());
// Request second batch
- fut =
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+ fut =
tablePartitionReplicaProcessor.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(scanTxId)
@@ -1084,7 +1084,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(2, rows.size());
// Request bounded.
- fut =
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+ fut =
tablePartitionReplicaProcessor.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(newTxId())
@@ -1104,7 +1104,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(2, rows.size());
// Empty result.
- fut =
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+ fut =
tablePartitionReplicaProcessor.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(newTxId())
@@ -1122,7 +1122,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(0, rows.size());
// Lookup.
- fut =
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+ fut =
tablePartitionReplicaProcessor.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(newTxId())
@@ -1162,7 +1162,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
UUID scanTxId = newTxId();
// Request first batch
- CompletableFuture<ReplicaResult> fut =
partitionReplicaListener.process(
+ CompletableFuture<ReplicaResult> fut =
tablePartitionReplicaProcessor.process(
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
@@ -1181,7 +1181,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(3, rows.size());
// Request second batch
- fut =
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+ fut =
tablePartitionReplicaProcessor.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(scanTxId)
@@ -1199,7 +1199,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(1, rows.size());
// Empty result.
- fut =
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+ fut =
tablePartitionReplicaProcessor.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(newTxId())
@@ -1217,7 +1217,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(0, rows.size());
// Lookup.
- fut =
partitionReplicaListener.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+ fut =
tablePartitionReplicaProcessor.process(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(zonePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.transactionId(newTxId())
@@ -2426,7 +2426,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private CompletableFuture<BinaryRow> roGetAsync(BinaryRow row,
HybridTimestamp readTimestamp) {
ReadOnlySingleRowPkReplicaRequest message =
readOnlySingleRowPkReplicaRequest(row, readTimestamp);
- return partitionReplicaListener.process(message, validRoPrimacy(),
localNode.id())
+ return tablePartitionReplicaProcessor.process(message,
validRoPrimacy(), localNode.id())
.thenApply(replicaResult -> (BinaryRow)
replicaResult.result());
}
@@ -2490,7 +2490,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
txManager.updateTxMeta(txId, old -> new TxStateMeta(newTxState,
UUID.randomUUID(), commitPartitionId, commitTsOrNull, null, null));
lockManager.releaseAll(txId);
- partitionReplicaListener.cleanupLocally(txId, commit, commitTs);
+ tablePartitionReplicaProcessor.cleanupLocally(txId, commit, commitTs);
}
private BinaryTupleMessage toIndexBound(int val) {
@@ -3078,7 +3078,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private CompletableFuture<ReplicaResult> processWithPrimacy(ReplicaRequest
request) {
return primacyEngine.validatePrimacy(request)
- .thenCompose(primacy ->
partitionReplicaListener.process(request, primacy, localNode.id()));
+ .thenCompose(primacy ->
tablePartitionReplicaProcessor.process(request, primacy, localNode.id()));
}
private static class RequestContext {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index 3a7b1b7f4dc..202bf8cdf8e 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -33,7 +33,7 @@ import static
org.apache.ignite.internal.partition.replicator.network.replicatio
import static
org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_UPSERT_ALL;
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow;
-import static
org.apache.ignite.internal.table.distributed.replication.PartitionReplicaListenerTest.zonePartitionIdMessage;
+import static
org.apache.ignite.internal.table.distributed.replication.DefaultTablePartitionReplicaProcessorTest.zonePartitionIdMessage;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrow;
import static
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast;
@@ -187,7 +187,7 @@ import
org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
-import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import
org.apache.ignite.internal.table.distributed.replicator.DefaultTablePartitionReplicaProcessor;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.table.metrics.TableMetricSource;
@@ -431,7 +431,7 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
private ZonePartitionReplicaListener zonePartitionReplicaListener;
/** Partition replication listener to test. */
- private PartitionReplicaListener tableReplicaProcessor;
+ private DefaultTablePartitionReplicaProcessor tableReplicaProcessor;
private HashIndexStorage pkIndexStorage;
@@ -662,7 +662,7 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
txRecoveryEngine
);
- tableReplicaProcessor = new PartitionReplicaListener(
+ tableReplicaProcessor = new DefaultTablePartitionReplicaProcessor(
testMvPartitionStorage,
mockRaftClient,
txManager,
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
index 1ca19e38f24..cb0754e9567 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
@@ -95,7 +95,7 @@ import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
-import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import
org.apache.ignite.internal.table.distributed.replicator.DefaultTablePartitionReplicaProcessor;
import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
@@ -227,7 +227,7 @@ public class InternalTableEstimatedSizeTest extends
BaseIgniteAbstractTest {
new TableMetricSource(QualifiedName.fromSimple(TABLE_NAME))
);
- List<PartitionReplicaListener> partitionReplicaListeners =
IntStream.range(0, PARTITIONS_NUM)
+ List<DefaultTablePartitionReplicaProcessor>
tablePartitionReplicaProcessors = IntStream.range(0, PARTITIONS_NUM)
.mapToObj(partId -> createPartitionReplicaListener(
partId,
txManager,
@@ -269,7 +269,9 @@ public class InternalTableEstimatedSizeTest extends
BaseIgniteAbstractTest {
return primacyEngines.get(zonePartitionId)
.validatePrimacy(request)
.thenCompose(
- primacy ->
partitionReplicaListeners.get(zonePartitionId.partitionId()).process(request,
primacy, node.id())
+ primacy -> tablePartitionReplicaProcessors.get(
+
zonePartitionId.partitionId()).process(request, primacy, node.id()
+ )
)
.thenApply(replicaResult -> new ReplicaMessagesFactory()
.replicaResponse()
@@ -288,7 +290,7 @@ public class InternalTableEstimatedSizeTest extends
BaseIgniteAbstractTest {
assertThat(stopAsync(componentContext, components),
willCompleteSuccessfully());
}
- private PartitionReplicaListener createPartitionReplicaListener(
+ private DefaultTablePartitionReplicaProcessor
createPartitionReplicaListener(
int partId,
TxManager txManager,
LockManager lockManager,
@@ -309,7 +311,7 @@ public class InternalTableEstimatedSizeTest extends
BaseIgniteAbstractTest {
partitionStorages.add(partitionStorage);
- return new PartitionReplicaListener(
+ return new DefaultTablePartitionReplicaProcessor(
partitionStorage,
new RaftCommandRunner() {
@Override
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index c20ec69e845..a19ca2b08eb 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -162,10 +162,10 @@ import
org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
+import
org.apache.ignite.internal.table.distributed.raft.DefaultTablePartitionRaftProcessor;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
import
org.apache.ignite.internal.table.distributed.raft.PartitionSafeTimeValidator;
-import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
-import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import
org.apache.ignite.internal.table.distributed.replicator.DefaultTablePartitionReplicaProcessor;
import
org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions;
import
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
@@ -938,7 +938,7 @@ public class ItTxTestCluster {
)
);
- TablePartitionProcessor tablePartitionRaftListener = new
TablePartitionProcessor(
+ DefaultTablePartitionRaftProcessor tablePartitionRaftListener = new
DefaultTablePartitionRaftProcessor(
txManagers.get(assignment),
partitionDataStorage,
storageUpdateHandler,
@@ -1079,7 +1079,7 @@ public class ItTxTestCluster {
}
}
- protected PartitionReplicaListener newReplicaListener(
+ protected DefaultTablePartitionReplicaProcessor newReplicaListener(
MvPartitionStorage mvDataStorage,
RaftGroupService raftClient,
TxManager txManager,
@@ -1102,7 +1102,7 @@ public class ItTxTestCluster {
RemotelyTriggeredResourceRegistry resourcesRegistry,
SchemaRegistry schemaRegistry
) {
- return new PartitionReplicaListener(
+ return new DefaultTablePartitionReplicaProcessor(
mvDataStorage,
raftClient,
txManager,
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index dee1239e313..a041517e9ab 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -85,7 +85,7 @@ import
org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
-import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import
org.apache.ignite.internal.table.distributed.replicator.DefaultTablePartitionReplicaProcessor;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.InternalTxOptions;
@@ -364,12 +364,16 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
throw new RuntimeException(e);
}
- PartitionReplicaListener listener =
IgniteTestUtils.getFieldValue(replica, ReplicaImpl.class, "listener");
- TestMvPartitionStorage storage =
IgniteTestUtils.getFieldValue(listener, PartitionReplicaListener.class,
"mvDataStorage");
+ DefaultTablePartitionReplicaProcessor listener =
IgniteTestUtils.getFieldValue(replica, ReplicaImpl.class, "listener");
+ TestMvPartitionStorage storage = IgniteTestUtils.getFieldValue(
+ listener,
+ DefaultTablePartitionReplicaProcessor.class,
+ "mvDataStorage"
+ );
Map<RowId, ?> map = IgniteTestUtils.getFieldValue(storage,
TestMvPartitionStorage.class, "map");
PendingComparableValuesTracker<HybridTimestamp, Void> safeTime =
- IgniteTestUtils.getFieldValue(listener,
PartitionReplicaListener.class, "safeTime");
+ IgniteTestUtils.getFieldValue(listener,
DefaultTablePartitionReplicaProcessor.class, "safeTime");
logger().info("Partition data "
+ "[node={}, groupId={}, data={}, lastAppliedIndex={},
lastAppliedTerm={}, leaseInfo={}, safeTime = {}]",
@@ -377,13 +381,17 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
safeTime.current());
Lazy<TableSchemaAwareIndexStorage> indexStorageLazy =
- IgniteTestUtils.getFieldValue(listener,
PartitionReplicaListener.class, "pkIndexStorage");
+ IgniteTestUtils.getFieldValue(listener,
DefaultTablePartitionReplicaProcessor.class, "pkIndexStorage");
IndexStorage indexStorage = indexStorageLazy.get().storage();
Map<RowId, ?> indexMap = IgniteTestUtils.getFieldValue(indexStorage,
TestHashIndexStorage.class, "index");
logger().info("Index data [node={}, groupId={}, data={}]", name,
replicationGroupId, indexMap);
- TxStatePartitionStorage stateStorage =
IgniteTestUtils.getFieldValue(listener, PartitionReplicaListener.class,
"txStateStorage");
+ TxStatePartitionStorage stateStorage = IgniteTestUtils.getFieldValue(
+ listener,
+ DefaultTablePartitionReplicaProcessor.class,
+ "txStateStorage"
+ );
logger().info("Tx state data [node={}, groupId={}, data={}]", name,
replicationGroupId, stateStorage);
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
index 24f957f0d36..67ef9283e68 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
@@ -54,7 +54,7 @@ import
org.apache.ignite.internal.replicator.configuration.ReplicationConfigurat
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.storage.MvPartitionStorage;
-import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
+import
org.apache.ignite.internal.table.distributed.raft.DefaultTablePartitionRaftProcessor;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
@@ -301,7 +301,7 @@ public abstract class TxInfrastructureTest extends
IgniteAbstractTest {
var fsm = (JraftServerImpl.DelegatingStateMachine)
grp.getRaftNode().getOptions().getFsm();
- TablePartitionProcessor listener = (TablePartitionProcessor)
((ZonePartitionRaftListener) fsm.getListener())
+ var listener = (DefaultTablePartitionRaftProcessor)
((ZonePartitionRaftListener) fsm.getListener())
.tableProcessor(table.tableId());
MvPartitionStorage storage = listener.getMvStorage();
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 6bda271fa7f..56852c04886 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -125,9 +125,9 @@ import
org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
+import
org.apache.ignite.internal.table.distributed.raft.DefaultTablePartitionRaftProcessor;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
-import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import
org.apache.ignite.internal.table.distributed.replicator.DefaultTablePartitionReplicaProcessor;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
@@ -479,7 +479,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
var validationSchemasSource = new
DummyValidationSchemasSource(schemaManager);
var schemaSyncService = new AlwaysSyncedSchemaSyncService();
- var tableReplicaListener = new PartitionReplicaListener(
+ var tableReplicaListener = new DefaultTablePartitionReplicaProcessor(
mvPartStorage,
svc,
this.txManager,
@@ -551,7 +551,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
});
PendingComparableValuesTracker<Long, Void> storageIndexTracker = new
PendingComparableValuesTracker<>(0L);
- var tablePartitionListener = new TablePartitionProcessor(
+ var tablePartitionListener = new DefaultTablePartitionRaftProcessor(
this.txManager,
new TestPartitionDataStorage(tableId, PART_ID, mvPartStorage),
storageUpdateHandler,
diff --git
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
index 468168f7cdc..96a75115f8c 100644
---
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
+++
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
@@ -49,7 +49,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.table.RecordBinaryViewImpl;
import org.apache.ignite.internal.table.TableImpl;
-import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import
org.apache.ignite.internal.table.distributed.replicator.DefaultTablePartitionReplicaProcessor;
import org.apache.ignite.internal.tx.impl.IgniteAbstractTransactionImpl;
import org.apache.ignite.internal.wrapper.Wrappers;
import org.apache.ignite.table.RecordView;
@@ -277,13 +277,13 @@ public class ItTransactionTestUtils {
* @param tableId Table id.
* @return Partition replica listener.
*/
- public static PartitionReplicaListener partitionReplicaListener(IgniteImpl
node, ZonePartitionId groupId, int tableId) {
+ public static DefaultTablePartitionReplicaProcessor
partitionReplicaListener(IgniteImpl node, ZonePartitionId groupId, int tableId)
{
CompletableFuture<Replica> replicaFut =
node.replicaManager().replica(groupId);
assertThat(replicaFut, willSucceedFast());
Replica replica = replicaFut.join();
ZonePartitionReplicaListener listener = (ZonePartitionReplicaListener)
replica.listener();
- return (PartitionReplicaListener)
listener.tableReplicaProcessors().get(tableId);
+ return (DefaultTablePartitionReplicaProcessor)
listener.tableReplicaProcessors().get(tableId);
}
/**