ibessonov commented on code in PR #1490:
URL: https://github.com/apache/ignite-3/pull/1490#discussion_r1080883366


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -741,39 +742,27 @@ private void initDataNodesFromVaultManager() {
         }
 
         try {
-            // TODO: Remove this call as part of 
https://issues.apache.org/jira/browse/IGNITE-18397
-            vaultMgr.get(MetaStorageManagerImpl.APPLIED_REV)
-                    .thenApply(appliedRevision -> appliedRevision == null ? 0L 
: bytesToLong(appliedRevision.value()))
-                    .thenAccept(vaultAppliedRevision -> {
+            long appliedRevision = metaStorageManager.appliedRevision();
+
+            vaultMgr.get(zonesLogicalTopologyKey())

Review Comment:
   Any idea why Vault is asynchronous?



##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java:
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.Conditions;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for Meta Storage Watches.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ConfigurationExtension.class)
+public class ItMetaStorageWatchTest {
+    private static class Node {
+        private final ClusterService clusterService;
+
+        private final RaftManager raftManager;
+
+        private final MetaStorageManager metaStorageManager;
+
+        private final CompletableFuture<Set<String>> metaStorageNodesFuture = 
new CompletableFuture<>();
+
+        Node(ClusterService clusterService, RaftConfiguration 
raftConfiguration, Path dataPath) {
+            this.clusterService = clusterService;
+
+            Path basePath = dataPath.resolve(name());
+
+            this.raftManager = new Loza(
+                    clusterService,
+                    raftConfiguration,
+                    basePath.resolve("raft"),
+                    new HybridClockImpl()
+            );
+
+            var vaultManager = mock(VaultManager.class);

Review Comment:
   Can we simply have a VaultManager with InMemoryVaultService inside? Or is 
it less convenient?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -159,54 +135,164 @@ public MetaStorageManagerImpl(
         this.storage = storage;
     }
 
-    private CompletableFuture<MetaStorageService> 
initializeMetaStorage(Set<String> metaStorageNodes) {
+    private CompletableFuture<MetaStorageServiceImpl> 
initializeMetaStorage(Set<String> metaStorageNodes) {
         ClusterNode thisNode = clusterService.topologyService().localMember();
 
-        PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes);
-
-        Peer localPeer = configuration.peer(thisNode.name());
+        String thisNodeName = thisNode.name();
 
         CompletableFuture<RaftGroupService> raftServiceFuture;
 
         try {
-            if (localPeer == null) {
-                raftServiceFuture = 
raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE, configuration);
-            } else {
-                clusterService.topologyService().addEventHandler(new 
TopologyEventHandler() {
-                    @Override
-                    public void onDisappeared(ClusterNode member) {
-                        metaStorageSvcFut.thenAccept(svc -> 
svc.closeCursors(member.id()));
-                    }
-                });
+            // We need to configure the replication protocol differently 
whether this node is a synchronous or asynchronous replica.
+            if (metaStorageNodes.contains(thisNodeName)) {
+                PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes);
+
+                Peer localPeer = configuration.peer(thisNodeName);
 
-                storage.start();
+                assert localPeer != null;
 
                 raftServiceFuture = raftMgr.startRaftGroupNode(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
                         new MetaStorageListener(storage),
+                        new RaftGroupEventsListener() {
+                            @Override
+                            public void onLeaderElected(long term) {
+                                // TODO: add listener to remove learners when 
they leave Logical Topology
+                                //  see 
https://issues.apache.org/jira/browse/IGNITE-18554
+
+                                registerTopologyEventListener();
+
+                                // Update the configuration immediately in 
case we missed some updates.
+                                
addLearners(clusterService.topologyService().allMembers());
+                            }
+
+                            @Override
+                            public void 
onNewPeersConfigurationApplied(PeersAndLearners configuration) {
+                            }
+
+                            @Override
+                            public void onReconfigurationError(Status status, 
PeersAndLearners configuration, long term) {
+                            }
+                        }
+                );
+            } else {
+                PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
+
+                Peer localPeer = configuration.learner(thisNodeName);
+
+                assert localPeer != null;
+
+                raftServiceFuture = raftMgr.startRaftGroupNode(
+                        new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
+                        configuration,
+                        new MetaStorageLearnerListener(storage),
                         RaftGroupEventsListener.noopLsnr
                 );
             }
         } catch (NodeStoppingException e) {
             return CompletableFuture.failedFuture(e);
         }
 
-        return raftServiceFuture.thenApply(service -> new 
MetaStorageServiceImpl(service, thisNode.id(), thisNode.name()));
+        return raftServiceFuture.thenApply(raftService -> new 
MetaStorageServiceImpl(raftService, thisNode));
+    }
+
+    private void registerTopologyEventListener() {
+        clusterService.topologyService().addEventHandler(new 
TopologyEventHandler() {
+            @Override
+            public void onAppeared(ClusterNode member) {
+                addLearners(List.of(member));
+            }
+
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                metaStorageSvcFut.thenAccept(service -> 
isCurrentNodeLeader(service.raftGroupService())
+                        .thenAccept(isLeader -> {
+                            if (isLeader) {
+                                service.closeCursors(member.id());
+                            }
+                        }));
+            }
+        });
+    }
+
+    private void addLearners(Collection<ClusterNode> nodes) {
+        if (!busyLock.enterBusy()) {
+            LOG.info("Skipping Meta Storage configuration update because the 
node is stopping");
+
+            return;
+        }
+
+        try {
+            metaStorageSvcFut
+                    .thenApply(MetaStorageServiceImpl::raftGroupService)
+                    .thenCompose(raftService -> 
isCurrentNodeLeader(raftService)
+                            .thenCompose(isLeader -> {
+                                if (!isLeader) {
+                                    return 
CompletableFuture.completedFuture(null);
+                                }
+
+                                if (!busyLock.enterBusy()) {
+                                    LOG.info("Skipping Meta Storage 
configuration update because the node is stopping");
+
+                                    return 
CompletableFuture.completedFuture(null);
+                                }
+
+                                try {
+                                    Set<String> peers = 
raftService.peers().stream()
+                                            .map(Peer::consistentId)
+                                            .collect(toUnmodifiableSet());
+
+                                    Set<String> learners = nodes.stream()
+                                            .map(ClusterNode::name)
+                                            .filter(name -> 
!peers.contains(name))
+                                            .collect(toUnmodifiableSet());
+
+                                    LOG.info("New Meta Storage learners 
detected: " + learners);
+
+                                    if (learners.isEmpty()) {
+                                        return 
CompletableFuture.completedFuture(null);
+                                    }
+
+                                    PeersAndLearners newConfiguration = 
PeersAndLearners.fromConsistentIds(peers, learners);
+
+                                    return 
raftService.addLearners(newConfiguration.learners());
+                                } finally {
+                                    busyLock.leaveBusy();
+                                }
+                            })
+                    )
+                    .whenComplete((v, e) -> {
+                        if (e != null) {
+                            LOG.error("Unable to change peers on topology 
update", e);
+                        }
+                    });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private CompletableFuture<Boolean> isCurrentNodeLeader(RaftGroupService 
raftService) {
+        String name = clusterService.topologyService().localMember().name();
+
+        return raftService.refreshLeader()

Review Comment:
   What if this method returned a future with the leader peer? :thinking: 



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -159,54 +135,164 @@ public MetaStorageManagerImpl(
         this.storage = storage;
     }
 
-    private CompletableFuture<MetaStorageService> 
initializeMetaStorage(Set<String> metaStorageNodes) {
+    private CompletableFuture<MetaStorageServiceImpl> 
initializeMetaStorage(Set<String> metaStorageNodes) {
         ClusterNode thisNode = clusterService.topologyService().localMember();
 
-        PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes);
-
-        Peer localPeer = configuration.peer(thisNode.name());
+        String thisNodeName = thisNode.name();
 
         CompletableFuture<RaftGroupService> raftServiceFuture;
 
         try {
-            if (localPeer == null) {
-                raftServiceFuture = 
raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE, configuration);
-            } else {
-                clusterService.topologyService().addEventHandler(new 
TopologyEventHandler() {
-                    @Override
-                    public void onDisappeared(ClusterNode member) {
-                        metaStorageSvcFut.thenAccept(svc -> 
svc.closeCursors(member.id()));
-                    }
-                });
+            // We need to configure the replication protocol differently 
whether this node is a synchronous or asynchronous replica.
+            if (metaStorageNodes.contains(thisNodeName)) {
+                PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes);
+
+                Peer localPeer = configuration.peer(thisNodeName);
 
-                storage.start();
+                assert localPeer != null;
 
                 raftServiceFuture = raftMgr.startRaftGroupNode(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
                         new MetaStorageListener(storage),
+                        new RaftGroupEventsListener() {
+                            @Override
+                            public void onLeaderElected(long term) {
+                                // TODO: add listener to remove learners when 
they leave Logical Topology
+                                //  see 
https://issues.apache.org/jira/browse/IGNITE-18554
+
+                                registerTopologyEventListener();
+
+                                // Update the configuration immediately in 
case we missed some updates.
+                                
addLearners(clusterService.topologyService().allMembers());
+                            }
+
+                            @Override
+                            public void 
onNewPeersConfigurationApplied(PeersAndLearners configuration) {
+                            }
+
+                            @Override
+                            public void onReconfigurationError(Status status, 
PeersAndLearners configuration, long term) {
+                            }
+                        }
+                );
+            } else {
+                PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
+
+                Peer localPeer = configuration.learner(thisNodeName);
+
+                assert localPeer != null;
+
+                raftServiceFuture = raftMgr.startRaftGroupNode(
+                        new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
+                        configuration,
+                        new MetaStorageLearnerListener(storage),
                         RaftGroupEventsListener.noopLsnr
                 );
             }
         } catch (NodeStoppingException e) {
             return CompletableFuture.failedFuture(e);
         }
 
-        return raftServiceFuture.thenApply(service -> new 
MetaStorageServiceImpl(service, thisNode.id(), thisNode.name()));
+        return raftServiceFuture.thenApply(raftService -> new 
MetaStorageServiceImpl(raftService, thisNode));
+    }
+
+    private void registerTopologyEventListener() {
+        clusterService.topologyService().addEventHandler(new 
TopologyEventHandler() {
+            @Override
+            public void onAppeared(ClusterNode member) {
+                addLearners(List.of(member));
+            }
+
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                metaStorageSvcFut.thenAccept(service -> 
isCurrentNodeLeader(service.raftGroupService())

Review Comment:
   I wonder if a race is possible here if the disappeared node reappears 
too quickly.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -842,11 +753,9 @@ public boolean hasNext() {
             }
 
             try {
-                try {
-                    return innerIterFut.thenApply(Iterator::hasNext).get();
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new MetaStorageException(CURSOR_EXECUTION_ERR, e);
-                }
+                return innerCursorFut.thenApply(Iterator::hasNext).get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new MetaStorageException(CURSOR_EXECUTION_ERR, e);

Review Comment:
   Some catch blocks include InterruptedException, others don't. Any idea why?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java:
##########
@@ -220,32 +222,54 @@ public interface KeyValueStorage extends 
ManuallyCloseable {
      * @param keyFrom Start key of range (inclusive).
      * @param keyTo   Last key of range (exclusive).
      * @param rev     Start revision number.
-     * @return Cursor by update events.
      */
-    Cursor<WatchEvent> watch(byte[] keyFrom, byte @Nullable [] keyTo, long 
rev);
+    void watchRange(byte[] keyFrom, byte @Nullable [] keyTo, long rev, 
WatchListener listener);
 
     /**
-     * Creates subscription on updates of entries corresponding to the given 
keys range (where upper bound is unlimited) and starting from
-     * the given revision number.
+     * Registers a watch listener by a key prefix.
      *
-     * @param key Start key of range (inclusive).
-     * @param rev Start revision number.
-     * @return Cursor by update events.
+     * @param prefix Prefix to listen to.
+     * @param rev Starting Meta Storage revision.
+     * @param listener Listener which will be notified for each update.
      */
-    Cursor<WatchEvent> watch(byte[] key, long rev);
+    void watchPrefix(byte[] prefix, long rev, WatchListener listener);
 
     /**
-     * Creates subscription on updates of entries corresponding to the given 
keys collection and starting from the given revision number.
+     * Registers a watch listener for the provided key.
      *
-     * @param keys Collection of keys
-     * @param rev  Start revision number.
-     * @return Cursor by update events.
+     * @param key Meta Storage key.
+     * @param rev Starting Meta Storage revision.
+     * @param listener Listener which will be notified for each update.
      */
-    Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev);
+    void watchExact(byte[] key, long rev, WatchListener listener);
+
+    /**
+     * Registers a watch listener for the provided keys.
+     *
+     * @param keys Meta Storage keys.
+     * @param rev Starting Meta Storage revision.
+     * @param listener Listener which will be notified for each update.
+     */
+    void watchExact(Collection<byte[]> keys, long rev, WatchListener listener);
+
+    /**
+     * Starts all registered watches.
+     *
+     * <p>Before calling this method, watches will not receive any updates.
+     *
+     * @param revisionCallback Callback that will be invoked after all watches 
of a particular revision are processed, with the revision
+     *      as its argument.
+     */
+    void startWatches(LongConsumer revisionCallback);
+
+    /**
+     * Unregisters a watch listener.
+     */
+    void removeWatch(WatchListener listener);
 
     /**
      * Compacts storage (removes tombstones).
-     * TODO: IGNITE-16444 Сorrect compaction for Metastorage.
+     * TODO: IGNITE-16444 Correct compaction for Metastorage.

Review Comment:
   What exactly has changed? Did it have a Cyrillic letter?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageLearnerListener.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.raft;
+
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupListener;
+
+/**
+ * Meta Storage Raft group listener for learner nodes. These nodes ignore read 
and cursor-related commands.
+ */
+public class MetaStorageLearnerListener implements RaftGroupListener {
+    private final KeyValueStorage storage;
+
+    private final MetaStorageWriteHandler writeHandler;
+
+    public MetaStorageLearnerListener(KeyValueStorage storage) {
+        this.storage = storage;
+        this.writeHandler = new MetaStorageWriteHandler(storage);
+    }
+
+    @Override
+    public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+        throw new UnsupportedOperationException("Reads should not be sent to 
learners");
+    }
+
+    @Override
+    public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
+        while (iter.hasNext()) {
+            CommandClosure<WriteCommand> clo = iter.next();
+
+            if (!writeHandler.handleWriteCommand(clo)) {
+                // Ignore all commands that are not handled by the 
writeHandler.
+                clo.result(null);

Review Comment:
   Is this safe? What if we need to handle the next command in the collection, 
but you have already completed the closure?
   Who would complete the closure if you were able to handle every command 
from the collection?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.raft;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.command.GetAndPutAllCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndPutCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndRemoveAllCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndRemoveCommand;
+import org.apache.ignite.internal.metastorage.command.InvokeCommand;
+import 
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.PutAllCommand;
+import org.apache.ignite.internal.metastorage.command.PutCommand;
+import org.apache.ignite.internal.metastorage.command.RemoveAllCommand;
+import org.apache.ignite.internal.metastorage.command.RemoveCommand;
+import org.apache.ignite.internal.metastorage.command.SingleEntryResponse;
+import 
org.apache.ignite.internal.metastorage.command.info.CompoundConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.ConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.IfInfo;
+import org.apache.ignite.internal.metastorage.command.info.OperationInfo;
+import org.apache.ignite.internal.metastorage.command.info.SimpleConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.StatementInfo;
+import org.apache.ignite.internal.metastorage.command.info.UpdateInfo;
+import org.apache.ignite.internal.metastorage.dsl.CompoundConditionType;
+import org.apache.ignite.internal.metastorage.dsl.ConditionType;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.dsl.Update;
+import org.apache.ignite.internal.metastorage.server.AndCondition;
+import org.apache.ignite.internal.metastorage.server.Condition;
+import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
+import org.apache.ignite.internal.metastorage.server.If;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.OrCondition;
+import org.apache.ignite.internal.metastorage.server.RevisionCondition;
+import org.apache.ignite.internal.metastorage.server.Statement;
+import org.apache.ignite.internal.metastorage.server.TombstoneCondition;
+import org.apache.ignite.internal.metastorage.server.ValueCondition;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+
+/**
+ * Class containing some common logic for Meta Storage Raft group listeners.
+ */
+class MetaStorageWriteHandler {
+    private final MetaStorageCommandsFactory commandsFactory = new 
MetaStorageCommandsFactory();
+
+    private final KeyValueStorage storage;
+
+    MetaStorageWriteHandler(KeyValueStorage storage) {
+        this.storage = storage;
+    }
+
+    /**
+     * Tries to process a {@link WriteCommand}, returning {@code true} if the 
command has been successfully processed or {@code false}
+     * if the command requires external processing.
+     */
+    boolean handleWriteCommand(CommandClosure<WriteCommand> clo) {
+        WriteCommand command = clo.command();
+
+        if (command instanceof PutCommand) {

Review Comment:
   Another unrelated thought - why do we have to have spaghetti code in every 
raft listener? There should be an individual handler class for every command. I 
do realize that you simply copied it, so don't take it personally. It feels like 
Ignite developers don't give a damn about SOLID.



##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java:
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.Conditions;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for Meta Storage Watches.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ConfigurationExtension.class)
+public class ItMetaStorageWatchTest {
+    private static class Node {
+        private final ClusterService clusterService;
+
+        private final RaftManager raftManager;
+
+        private final MetaStorageManager metaStorageManager;
+
+        private final CompletableFuture<Set<String>> metaStorageNodesFuture = 
new CompletableFuture<>();
+
+        Node(ClusterService clusterService, RaftConfiguration 
raftConfiguration, Path dataPath) {
+            this.clusterService = clusterService;
+
+            Path basePath = dataPath.resolve(name());
+
+            this.raftManager = new Loza(
+                    clusterService,
+                    raftConfiguration,
+                    basePath.resolve("raft"),
+                    new HybridClockImpl()
+            );
+
+            var vaultManager = mock(VaultManager.class);
+
+            
when(vaultManager.get(any())).thenReturn(CompletableFuture.completedFuture(null));
+            when(vaultManager.put(any(), 
any())).thenReturn(CompletableFuture.completedFuture(null));
+
+            var cmgManager = mock(ClusterManagementGroupManager.class);
+
+            
when(cmgManager.metaStorageNodes()).thenReturn(metaStorageNodesFuture);
+
+            this.metaStorageManager = new MetaStorageManagerImpl(
+                    vaultManager,
+                    clusterService,
+                    cmgManager,
+                    raftManager,
+                    new RocksDbKeyValueStorage(name(), 
basePath.resolve("storage"))
+            );
+        }
+
+        void start(Set<String> metaStorageNodes) {
+            clusterService.start();
+            raftManager.start();
+            metaStorageManager.start();
+
+            metaStorageNodesFuture.complete(metaStorageNodes);
+        }
+
+        String name() {
+            return clusterService.localConfiguration().getName();
+        }
+
+        void stop() throws Exception {
+            Stream<AutoCloseable> beforeNodeStop = 
Stream.of(metaStorageManager, raftManager, clusterService).map(c -> 
c::beforeNodeStop);
+
+            Stream<AutoCloseable> nodeStop = Stream.of(metaStorageManager, 
raftManager, clusterService).map(c -> c::stop);
+
+            IgniteUtils.closeAll(Stream.concat(beforeNodeStop, nodeStop));
+        }
+    }
+
+    private TestInfo testInfo;
+
+    @WorkDirectory
+    private Path workDir;
+
+    @InjectConfiguration
+    private RaftConfiguration raftConfiguration;
+
+    private final List<Node> nodes = new ArrayList<>();
+
+    /**
+     * Run {@code NODES} cluster nodes.

Review Comment:
   Same here, please move it to "startNodes" or simply remove it



##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java:
##########
@@ -213,29 +165,118 @@ public class ItMetaStorageServiceTest {
         EXPECTED_SRV_RESULT_COLL = List.of(entry1, entry2);
     }
 
+    private static class Node {
+        private final ClusterService clusterService;
+
+        private final RaftManager raftManager;
+
+        private final KeyValueStorage mockStorage;
+
+        private RaftGroupService metaStorageRaftService;
+
+        private MetaStorageService metaStorageService;
+
+        Node(ClusterService clusterService, RaftConfiguration 
raftConfiguration, Path dataPath) {
+            this.clusterService = clusterService;
+
+            this.raftManager = new Loza(
+                    clusterService,
+                    raftConfiguration,
+                    dataPath.resolve(name()),
+                    new HybridClockImpl()
+            );
+
+            this.mockStorage = mock(KeyValueStorage.class);
+        }
+
+        void start(PeersAndLearners configuration) {
+            clusterService.start();
+            raftManager.start();
+
+            CompletableFuture<RaftGroupService> raftService = 
startRaftService(configuration);
+
+            assertThat(raftService, willCompleteSuccessfully());
+
+            metaStorageRaftService = raftService.join();
+
+            ClusterNode node = clusterService.topologyService().localMember();
+
+            metaStorageService = new 
MetaStorageServiceImpl(metaStorageRaftService, node);
+        }
+
+        String name() {
+            return clusterService.localConfiguration().getName();
+        }
+
+        private CompletableFuture<RaftGroupService> 
startRaftService(PeersAndLearners configuration) {
+            String name = name();
+
+            boolean isLearner = configuration.peer(name) == null;
+
+            Peer peer = isLearner ? configuration.learner(name) : 
configuration.peer(name);
+
+            assert peer != null;
+
+            RaftGroupListener listener = isLearner ? new 
MetaStorageLearnerListener(mockStorage) : new MetaStorageListener(mockStorage);
+
+            var raftNodeId = new RaftNodeId(MetastorageGroupId.INSTANCE, peer);
+
+            try {
+                return raftManager.startRaftGroupNode(raftNodeId, 
configuration, listener, RaftGroupEventsListener.noopLsnr);
+            } catch (NodeStoppingException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        void stop() throws Exception {
+            Stream<AutoCloseable> raftStop = Stream.of(
+                    metaStorageRaftService == null ? null : (AutoCloseable) 
metaStorageRaftService::shutdown,
+                    () -> 
raftManager.stopRaftNodes(MetastorageGroupId.INSTANCE)
+            );
+
+            Stream<AutoCloseable> beforeNodeStop = Stream.of(raftManager, 
clusterService).map(c -> c::beforeNodeStop);
+
+            Stream<AutoCloseable> nodeStop = Stream.of(raftManager, 
clusterService).map(c -> c::stop);
+
+            IgniteUtils.closeAll(Stream.of(raftStop, beforeNodeStop, 
nodeStop).flatMap(Function.identity()));
+        }
+    }
+
+    private TestInfo testInfo;
+
+    @WorkDirectory
+    private Path workDir;
+
+    @InjectConfiguration
+    private RaftConfiguration raftConfiguration;
+
+    private final List<Node> nodes = new ArrayList<>();
+
     /**

Review Comment:
   Comment is misplaced



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -159,54 +135,164 @@ public MetaStorageManagerImpl(
         this.storage = storage;
     }
 
-    private CompletableFuture<MetaStorageService> 
initializeMetaStorage(Set<String> metaStorageNodes) {
+    private CompletableFuture<MetaStorageServiceImpl> 
initializeMetaStorage(Set<String> metaStorageNodes) {
         ClusterNode thisNode = clusterService.topologyService().localMember();
 
-        PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes);
-
-        Peer localPeer = configuration.peer(thisNode.name());
+        String thisNodeName = thisNode.name();
 
         CompletableFuture<RaftGroupService> raftServiceFuture;
 
         try {
-            if (localPeer == null) {
-                raftServiceFuture = 
raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE, configuration);
-            } else {
-                clusterService.topologyService().addEventHandler(new 
TopologyEventHandler() {
-                    @Override
-                    public void onDisappeared(ClusterNode member) {
-                        metaStorageSvcFut.thenAccept(svc -> 
svc.closeCursors(member.id()));
-                    }
-                });
+            // We need to configure the replication protocol differently 
whether this node is a synchronous or asynchronous replica.
+            if (metaStorageNodes.contains(thisNodeName)) {
+                PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes);
+
+                Peer localPeer = configuration.peer(thisNodeName);
 
-                storage.start();
+                assert localPeer != null;
 
                 raftServiceFuture = raftMgr.startRaftGroupNode(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
                         new MetaStorageListener(storage),
+                        new RaftGroupEventsListener() {
+                            @Override
+                            public void onLeaderElected(long term) {
+                                // TODO: add listener to remove learners when 
they leave Logical Topology
+                                //  see 
https://issues.apache.org/jira/browse/IGNITE-18554
+
+                                registerTopologyEventListener();
+
+                                // Update the configuration immediately in 
case we missed some updates.
+                                
addLearners(clusterService.topologyService().allMembers());
+                            }
+
+                            @Override
+                            public void 
onNewPeersConfigurationApplied(PeersAndLearners configuration) {
+                            }
+
+                            @Override
+                            public void onReconfigurationError(Status status, 
PeersAndLearners configuration, long term) {
+                            }
+                        }
+                );
+            } else {
+                PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
+
+                Peer localPeer = configuration.learner(thisNodeName);
+
+                assert localPeer != null;
+
+                raftServiceFuture = raftMgr.startRaftGroupNode(
+                        new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
+                        configuration,
+                        new MetaStorageLearnerListener(storage),
                         RaftGroupEventsListener.noopLsnr
                 );
             }
         } catch (NodeStoppingException e) {
             return CompletableFuture.failedFuture(e);
         }
 
-        return raftServiceFuture.thenApply(service -> new 
MetaStorageServiceImpl(service, thisNode.id(), thisNode.name()));
+        return raftServiceFuture.thenApply(raftService -> new 
MetaStorageServiceImpl(raftService, thisNode));
+    }
+
+    private void registerTopologyEventListener() {
+        clusterService.topologyService().addEventHandler(new 
TopologyEventHandler() {
+            @Override
+            public void onAppeared(ClusterNode member) {
+                addLearners(List.of(member));
+            }
+
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                metaStorageSvcFut.thenAccept(service -> 
isCurrentNodeLeader(service.raftGroupService())
+                        .thenAccept(isLeader -> {
+                            if (isLeader) {
+                                service.closeCursors(member.id());
+                            }
+                        }));
+            }
+        });
+    }
+
+    private void addLearners(Collection<ClusterNode> nodes) {
+        if (!busyLock.enterBusy()) {
+            LOG.info("Skipping Meta Storage configuration update because the 
node is stopping");
+
+            return;
+        }
+
+        try {
+            metaStorageSvcFut
+                    .thenApply(MetaStorageServiceImpl::raftGroupService)
+                    .thenCompose(raftService -> 
isCurrentNodeLeader(raftService)

Review Comment:
   I think that code would be better if you extracted this lambda into a 
method, but it's up to you



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -159,54 +135,164 @@ public MetaStorageManagerImpl(
         this.storage = storage;
     }
 
-    private CompletableFuture<MetaStorageService> 
initializeMetaStorage(Set<String> metaStorageNodes) {
+    private CompletableFuture<MetaStorageServiceImpl> 
initializeMetaStorage(Set<String> metaStorageNodes) {
         ClusterNode thisNode = clusterService.topologyService().localMember();
 
-        PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes);
-
-        Peer localPeer = configuration.peer(thisNode.name());
+        String thisNodeName = thisNode.name();
 
         CompletableFuture<RaftGroupService> raftServiceFuture;
 
         try {
-            if (localPeer == null) {
-                raftServiceFuture = 
raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE, configuration);
-            } else {
-                clusterService.topologyService().addEventHandler(new 
TopologyEventHandler() {
-                    @Override
-                    public void onDisappeared(ClusterNode member) {
-                        metaStorageSvcFut.thenAccept(svc -> 
svc.closeCursors(member.id()));
-                    }
-                });
+            // We need to configure the replication protocol differently 
whether this node is a synchronous or asynchronous replica.
+            if (metaStorageNodes.contains(thisNodeName)) {
+                PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes);
+
+                Peer localPeer = configuration.peer(thisNodeName);
 
-                storage.start();
+                assert localPeer != null;
 
                 raftServiceFuture = raftMgr.startRaftGroupNode(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
                         new MetaStorageListener(storage),
+                        new RaftGroupEventsListener() {
+                            @Override
+                            public void onLeaderElected(long term) {
+                                // TODO: add listener to remove learners when 
they leave Logical Topology
+                                //  see 
https://issues.apache.org/jira/browse/IGNITE-18554
+
+                                registerTopologyEventListener();
+
+                                // Update the configuration immediately in 
case we missed some updates.
+                                
addLearners(clusterService.topologyService().allMembers());
+                            }
+
+                            @Override
+                            public void 
onNewPeersConfigurationApplied(PeersAndLearners configuration) {

Review Comment:
   Would be nice to have these as default implementations in the interface



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -159,54 +135,164 @@ public MetaStorageManagerImpl(
         this.storage = storage;
     }
 
-    private CompletableFuture<MetaStorageService> 
initializeMetaStorage(Set<String> metaStorageNodes) {
+    private CompletableFuture<MetaStorageServiceImpl> 
initializeMetaStorage(Set<String> metaStorageNodes) {
         ClusterNode thisNode = clusterService.topologyService().localMember();
 
-        PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes);
-
-        Peer localPeer = configuration.peer(thisNode.name());
+        String thisNodeName = thisNode.name();
 
         CompletableFuture<RaftGroupService> raftServiceFuture;
 
         try {
-            if (localPeer == null) {
-                raftServiceFuture = 
raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE, configuration);
-            } else {
-                clusterService.topologyService().addEventHandler(new 
TopologyEventHandler() {
-                    @Override
-                    public void onDisappeared(ClusterNode member) {
-                        metaStorageSvcFut.thenAccept(svc -> 
svc.closeCursors(member.id()));
-                    }
-                });
+            // We need to configure the replication protocol differently 
whether this node is a synchronous or asynchronous replica.
+            if (metaStorageNodes.contains(thisNodeName)) {
+                PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes);
+
+                Peer localPeer = configuration.peer(thisNodeName);
 
-                storage.start();
+                assert localPeer != null;
 
                 raftServiceFuture = raftMgr.startRaftGroupNode(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
                         new MetaStorageListener(storage),
+                        new RaftGroupEventsListener() {
+                            @Override
+                            public void onLeaderElected(long term) {
+                                // TODO: add listener to remove learners when 
they leave Logical Topology
+                                //  see 
https://issues.apache.org/jira/browse/IGNITE-18554
+
+                                registerTopologyEventListener();
+
+                                // Update the configuration immediately in 
case we missed some updates.
+                                
addLearners(clusterService.topologyService().allMembers());
+                            }
+
+                            @Override
+                            public void 
onNewPeersConfigurationApplied(PeersAndLearners configuration) {
+                            }
+
+                            @Override
+                            public void onReconfigurationError(Status status, 
PeersAndLearners configuration, long term) {
+                            }
+                        }
+                );
+            } else {
+                PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
+
+                Peer localPeer = configuration.learner(thisNodeName);
+
+                assert localPeer != null;
+
+                raftServiceFuture = raftMgr.startRaftGroupNode(
+                        new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
+                        configuration,
+                        new MetaStorageLearnerListener(storage),
                         RaftGroupEventsListener.noopLsnr
                 );
             }
         } catch (NodeStoppingException e) {
             return CompletableFuture.failedFuture(e);
         }
 
-        return raftServiceFuture.thenApply(service -> new 
MetaStorageServiceImpl(service, thisNode.id(), thisNode.name()));
+        return raftServiceFuture.thenApply(raftService -> new 
MetaStorageServiceImpl(raftService, thisNode));
+    }
+
+    private void registerTopologyEventListener() {
+        clusterService.topologyService().addEventHandler(new 
TopologyEventHandler() {
+            @Override
+            public void onAppeared(ClusterNode member) {
+                addLearners(List.of(member));
+            }
+
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                metaStorageSvcFut.thenAccept(service -> 
isCurrentNodeLeader(service.raftGroupService())
+                        .thenAccept(isLeader -> {
+                            if (isLeader) {
+                                service.closeCursors(member.id());
+                            }
+                        }));
+            }
+        });
+    }
+
+    private void addLearners(Collection<ClusterNode> nodes) {
+        if (!busyLock.enterBusy()) {
+            LOG.info("Skipping Meta Storage configuration update because the 
node is stopping");
+
+            return;
+        }
+
+        try {
+            metaStorageSvcFut
+                    .thenApply(MetaStorageServiceImpl::raftGroupService)
+                    .thenCompose(raftService -> 
isCurrentNodeLeader(raftService)
+                            .thenCompose(isLeader -> {
+                                if (!isLeader) {
+                                    return 
CompletableFuture.completedFuture(null);
+                                }
+
+                                if (!busyLock.enterBusy()) {
+                                    LOG.info("Skipping Meta Storage 
configuration update because the node is stopping");
+
+                                    return 
CompletableFuture.completedFuture(null);
+                                }
+
+                                try {
+                                    Set<String> peers = 
raftService.peers().stream()
+                                            .map(Peer::consistentId)
+                                            .collect(toUnmodifiableSet());
+
+                                    Set<String> learners = nodes.stream()
+                                            .map(ClusterNode::name)
+                                            .filter(name -> 
!peers.contains(name))
+                                            .collect(toUnmodifiableSet());
+
+                                    LOG.info("New Meta Storage learners 
detected: " + learners);
+
+                                    if (learners.isEmpty()) {
+                                        return 
CompletableFuture.completedFuture(null);
+                                    }

Review Comment:
   Should we swap these two statements?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.raft;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.command.GetAndPutAllCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndPutCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndRemoveAllCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndRemoveCommand;
+import org.apache.ignite.internal.metastorage.command.InvokeCommand;
+import 
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.PutAllCommand;
+import org.apache.ignite.internal.metastorage.command.PutCommand;
+import org.apache.ignite.internal.metastorage.command.RemoveAllCommand;
+import org.apache.ignite.internal.metastorage.command.RemoveCommand;
+import org.apache.ignite.internal.metastorage.command.SingleEntryResponse;
+import 
org.apache.ignite.internal.metastorage.command.info.CompoundConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.ConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.IfInfo;
+import org.apache.ignite.internal.metastorage.command.info.OperationInfo;
+import org.apache.ignite.internal.metastorage.command.info.SimpleConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.StatementInfo;
+import org.apache.ignite.internal.metastorage.command.info.UpdateInfo;
+import org.apache.ignite.internal.metastorage.dsl.CompoundConditionType;
+import org.apache.ignite.internal.metastorage.dsl.ConditionType;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.dsl.Update;
+import org.apache.ignite.internal.metastorage.server.AndCondition;
+import org.apache.ignite.internal.metastorage.server.Condition;
+import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
+import org.apache.ignite.internal.metastorage.server.If;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.OrCondition;
+import org.apache.ignite.internal.metastorage.server.RevisionCondition;
+import org.apache.ignite.internal.metastorage.server.Statement;
+import org.apache.ignite.internal.metastorage.server.TombstoneCondition;
+import org.apache.ignite.internal.metastorage.server.ValueCondition;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+
+/**
+ * Class containing some common logic for Meta Storage Raft group listeners.
+ */
+class MetaStorageWriteHandler {
+    private final MetaStorageCommandsFactory commandsFactory = new 
MetaStorageCommandsFactory();
+
+    private final KeyValueStorage storage;
+
+    MetaStorageWriteHandler(KeyValueStorage storage) {
+        this.storage = storage;
+    }
+
+    /**
+     * Tries to process a {@link WriteCommand}, returning {@code true} if the 
command has been successfully processed or {@code false}
+     * if the command requires external processing.
+     */
+    boolean handleWriteCommand(CommandClosure<WriteCommand> clo) {
+        WriteCommand command = clo.command();
+
+        if (command instanceof PutCommand) {
+            PutCommand putCmd = (PutCommand) command;
+
+            storage.put(putCmd.key(), putCmd.value());
+
+            clo.result(null);
+        } else if (command instanceof GetAndPutCommand) {
+            GetAndPutCommand getAndPutCmd = (GetAndPutCommand) command;
+
+            Entry e = storage.getAndPut(getAndPutCmd.key(), 
getAndPutCmd.value());
+
+            clo.result(new SingleEntryResponse(e.key(), e.value(), 
e.revision(), e.updateCounter()));
+        } else if (command instanceof PutAllCommand) {
+            PutAllCommand putAllCmd = (PutAllCommand) command;
+
+            storage.putAll(putAllCmd.keys(), putAllCmd.values());
+
+            clo.result(null);
+        } else if (command instanceof GetAndPutAllCommand) {
+            GetAndPutAllCommand getAndPutAllCmd = (GetAndPutAllCommand) 
command;
+
+            Collection<Entry> entries = 
storage.getAndPutAll(getAndPutAllCmd.keys(), getAndPutAllCmd.values());
+
+            List<SingleEntryResponse> resp = new ArrayList<>(entries.size());
+
+            for (Entry e : entries) {
+                resp.add(new SingleEntryResponse(e.key(), e.value(), 
e.revision(), e.updateCounter()));
+            }
+
+            clo.result(new MultipleEntryResponse(resp));
+        } else if (command instanceof RemoveCommand) {
+            RemoveCommand rmvCmd = (RemoveCommand) command;
+
+            storage.remove(rmvCmd.key());
+
+            clo.result(null);
+        } else if (command instanceof GetAndRemoveCommand) {
+            GetAndRemoveCommand getAndRmvCmd = (GetAndRemoveCommand) command;
+
+            Entry e = storage.getAndRemove(getAndRmvCmd.key());
+
+            clo.result(new SingleEntryResponse(e.key(), e.value(), 
e.revision(), e.updateCounter()));
+        } else if (command instanceof RemoveAllCommand) {
+            RemoveAllCommand rmvAllCmd = (RemoveAllCommand) command;
+
+            storage.removeAll(rmvAllCmd.keys());
+
+            clo.result(null);
+        } else if (command instanceof GetAndRemoveAllCommand) {
+            GetAndRemoveAllCommand getAndRmvAllCmd = (GetAndRemoveAllCommand) 
command;
+
+            Collection<Entry> entries = 
storage.getAndRemoveAll(getAndRmvAllCmd.keys());
+
+            List<SingleEntryResponse> resp = new ArrayList<>(entries.size());
+
+            for (Entry e : entries) {
+                resp.add(new SingleEntryResponse(e.key(), e.value(), 
e.revision(), e.updateCounter()));
+            }
+
+            clo.result(new MultipleEntryResponse(resp));
+        } else if (command instanceof InvokeCommand) {
+            InvokeCommand cmd = (InvokeCommand) command;
+
+            boolean res = storage.invoke(
+                    toCondition(cmd.condition()),
+                    toOperations(cmd.success()),
+                    toOperations(cmd.failure())
+            );
+
+            clo.result(res);
+        } else if (command instanceof MultiInvokeCommand) {
+            MultiInvokeCommand cmd = (MultiInvokeCommand) command;
+
+            StatementResult res = storage.invoke(toIf(cmd.iif()));
+
+            
clo.result(commandsFactory.statementResultInfo().result(res.bytes()).build());
+        } else {
+            return false;
+        }
+
+        return true;
+    }
+
+    private static If toIf(IfInfo iif) {
+        return new If(toCondition(iif.cond()), 
toConditionBranch(iif.andThen()), toConditionBranch(iif.orElse()));
+    }
+
+    private static Update toUpdate(UpdateInfo updateInfo) {
+        return new Update(toOperations(new 
ArrayList<>(updateInfo.operations())), new 
StatementResult(updateInfo.result().result()));
+    }
+
+    private static Statement toConditionBranch(StatementInfo statementInfo) {
+        if (statementInfo.isTerminal()) {
+            return new Statement(toUpdate(statementInfo.update()));
+        } else {
+            return new Statement(toIf(statementInfo.iif()));
+        }
+    }
+
+    private static Condition toCondition(ConditionInfo info) {
+        if (info instanceof SimpleConditionInfo) {
+            SimpleConditionInfo inf = (SimpleConditionInfo) info;
+            byte[] key = inf.key();
+
+            ConditionType type = inf.type();
+
+            if (type == ConditionType.KEY_EXISTS) {

Review Comment:
   This should have been a switch :(



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.raft;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.command.GetAndPutAllCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndPutCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndRemoveAllCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndRemoveCommand;
+import org.apache.ignite.internal.metastorage.command.InvokeCommand;
+import org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.PutAllCommand;
+import org.apache.ignite.internal.metastorage.command.PutCommand;
+import org.apache.ignite.internal.metastorage.command.RemoveAllCommand;
+import org.apache.ignite.internal.metastorage.command.RemoveCommand;
+import org.apache.ignite.internal.metastorage.command.SingleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.info.CompoundConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.ConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.IfInfo;
+import org.apache.ignite.internal.metastorage.command.info.OperationInfo;
+import org.apache.ignite.internal.metastorage.command.info.SimpleConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.StatementInfo;
+import org.apache.ignite.internal.metastorage.command.info.UpdateInfo;
+import org.apache.ignite.internal.metastorage.dsl.CompoundConditionType;
+import org.apache.ignite.internal.metastorage.dsl.ConditionType;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.dsl.Update;
+import org.apache.ignite.internal.metastorage.server.AndCondition;
+import org.apache.ignite.internal.metastorage.server.Condition;
+import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
+import org.apache.ignite.internal.metastorage.server.If;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.OrCondition;
+import org.apache.ignite.internal.metastorage.server.RevisionCondition;
+import org.apache.ignite.internal.metastorage.server.Statement;
+import org.apache.ignite.internal.metastorage.server.TombstoneCondition;
+import org.apache.ignite.internal.metastorage.server.ValueCondition;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+
+/**
+ * Class containing some common logic for Meta Storage Raft group listeners.
+ */
+class MetaStorageWriteHandler {
+    private final MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory();
+
+    private final KeyValueStorage storage;
+
+    MetaStorageWriteHandler(KeyValueStorage storage) {
+        this.storage = storage;
+    }
+
+    /**
+     * Tries to process a {@link WriteCommand}, returning {@code true} if the command has been successfully processed or {@code false}
+     * if the command requires external processing.
+     */
+    boolean handleWriteCommand(CommandClosure<WriteCommand> clo) {
+        WriteCommand command = clo.command();
+
+        if (command instanceof PutCommand) {
+            PutCommand putCmd = (PutCommand) command;
+
+            storage.put(putCmd.key(), putCmd.value());
+
+            clo.result(null);
+        } else if (command instanceof GetAndPutCommand) {
+            GetAndPutCommand getAndPutCmd = (GetAndPutCommand) command;
+
+            Entry e = storage.getAndPut(getAndPutCmd.key(), getAndPutCmd.value());
+
+            clo.result(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+        } else if (command instanceof PutAllCommand) {
+            PutAllCommand putAllCmd = (PutAllCommand) command;
+
+            storage.putAll(putAllCmd.keys(), putAllCmd.values());
+
+            clo.result(null);
+        } else if (command instanceof GetAndPutAllCommand) {
+            GetAndPutAllCommand getAndPutAllCmd = (GetAndPutAllCommand) command;
+
+            Collection<Entry> entries = storage.getAndPutAll(getAndPutAllCmd.keys(), getAndPutAllCmd.values());
+
+            List<SingleEntryResponse> resp = new ArrayList<>(entries.size());
+
+            for (Entry e : entries) {
+                resp.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+            }
+
+            clo.result(new MultipleEntryResponse(resp));
+        } else if (command instanceof RemoveCommand) {
+            RemoveCommand rmvCmd = (RemoveCommand) command;
+
+            storage.remove(rmvCmd.key());
+
+            clo.result(null);
+        } else if (command instanceof GetAndRemoveCommand) {
+            GetAndRemoveCommand getAndRmvCmd = (GetAndRemoveCommand) command;
+
+            Entry e = storage.getAndRemove(getAndRmvCmd.key());
+
+            clo.result(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+        } else if (command instanceof RemoveAllCommand) {
+            RemoveAllCommand rmvAllCmd = (RemoveAllCommand) command;
+
+            storage.removeAll(rmvAllCmd.keys());
+
+            clo.result(null);
+        } else if (command instanceof GetAndRemoveAllCommand) {
+            GetAndRemoveAllCommand getAndRmvAllCmd = (GetAndRemoveAllCommand) command;
+
+            Collection<Entry> entries = storage.getAndRemoveAll(getAndRmvAllCmd.keys());
+
+            List<SingleEntryResponse> resp = new ArrayList<>(entries.size());
+
+            for (Entry e : entries) {
+                resp.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+            }
+
+            clo.result(new MultipleEntryResponse(resp));

Review Comment:
   I think we should only pass the value here if it's the leader. Any thoughts?
   I'm not asking to fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to