This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ea9d9e82a9 IGNITE-20300 Metastorage command reordering wrt Safe Time on Raft Group entry (#2513)
ea9d9e82a9 is described below

commit ea9d9e82a93cd11675acb50513f9efb5f98fbe07
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Wed Aug 30 14:28:41 2023 +0400

    IGNITE-20300 Metastorage command reordering wrt Safe Time on Raft Group entry (#2513)
---
 .../command/MetaStorageWriteCommand.java           |  4 +-
 .../server/raft/MetaStorageListener.java           |  3 +-
 .../server/raft/MetaStorageWriteHandler.java       |  5 +--
 .../impl/StandaloneMetaStorageManager.java         |  5 ++-
 .../internal/raft/service/BeforeApplyHandler.java  | 43 ++++++++++++++++++++++
 .../internal/raft/service/RaftGroupListener.java   | 12 ------
 .../jraft/rpc/impl/ActionRequestProcessor.java     | 38 ++++++++++++++++---
 7 files changed, 85 insertions(+), 25 deletions(-)

diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java
index 752713d331..7f240ffea2 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java
@@ -23,7 +23,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.internal.raft.service.RaftGroupListener;
+import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
 import org.apache.ignite.network.annotations.WithSetter;
 
 /** Base meta storage write command. */
@@ -44,7 +44,7 @@ public interface MetaStorageWriteCommand extends WriteCommand 
{
      * This is a dirty hack. This time is set by the leader node to 
disseminate new safe time across
      * followers and learners. Leader of the ms group reads {@link 
#initiatorTime()}, adjusts its clock
      * and sets safeTime as {@link HybridClock#now()} as safeTime here. This 
must be done before
-     * command is saved into the Raft log (see {@link 
RaftGroupListener#onBeforeApply(Command)}.
+     * command is saved into the Raft log (see {@link 
BeforeApplyHandler#onBeforeApply(Command)}.
      */
     @WithSetter
     long safeTimeLong();
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index ab7db8bfbc..d3b5043d97 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -39,6 +39,7 @@ import 
org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.util.Cursor;
@@ -48,7 +49,7 @@ import org.jetbrains.annotations.Nullable;
  * Meta storage listener.
  * TODO: IGNITE-14693 Implement Meta storage exception handling logic.
  */
-public class MetaStorageListener implements RaftGroupListener {
+public class MetaStorageListener implements RaftGroupListener, 
BeforeApplyHandler {
     private final MetaStorageWriteHandler writeHandler;
 
     /** Storage. */
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index a8f98b58ba..6d06f8aa4e 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -291,10 +291,7 @@ public class MetaStorageWriteHandler {
         }
     }
 
-    // TODO: IGNITE-20290 - This is insufficient, we must do this in single 
thread before saving the command to the RAFT log.
-    // Synchronized to make sure no reodering happens as 
RaftGroupListener#beforeApply() might be invoked in different threads
-    // for different commands.
-    synchronized void beforeApply(Command command) {
+    void beforeApply(Command command) {
         if (command instanceof MetaStorageWriteCommand) {
             // Initiator sends us a timestamp to adjust to.
             // Alter command by setting safe time based on the adjusted clock.
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index 2801ff77cf..a36d3c9467 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.vault.VaultManager;
@@ -164,7 +165,9 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
             Command command = invocation.getArgument(0);
             RaftGroupListener listener = listenerCaptor.getValue();
 
-            listener.onBeforeApply(command);
+            if (listener instanceof BeforeApplyHandler) {
+                ((BeforeApplyHandler) listener).onBeforeApply(command);
+            }
 
             return runCommand(command, listener);
         });
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/BeforeApplyHandler.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/BeforeApplyHandler.java
new file mode 100644
index 0000000000..c8fd7ca1b4
--- /dev/null
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/BeforeApplyHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.service;
+
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.internal.raft.WriteCommand;
+
+/**
+ * Handles 'before apply': that is, before a RAFT command is accepted by a RAFT leader for processing,
+ * executes some customization on the command.
+ *
+ * <p>For {@link WriteCommand}s, {@link #onBeforeApply(Command)} is executed atomically with accepting the command: that is,
+ * before-apply/accept of one command cannot intermingle with before-apply/accept of other write commands of the same RAFT group.
+ *
+ * <p>For {@link ReadCommand}s, no atomicity guarantees are provided.
+ */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface BeforeApplyHandler {
+    /**
+     * Invoked on a leader before submitting a command to a raft group.
+     * If a command must be changed before saving to raft log,
+     * this is a place to do it.
+     *
+     * @param command The command.
+     */
+    void onBeforeApply(Command command);
+}
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
index a024710203..212a34807c 100644
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.raft.service;
 import java.nio.file.Path;
 import java.util.Iterator;
 import java.util.function.Consumer;
-import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
 
@@ -82,15 +81,4 @@ public interface RaftGroupListener {
      * Invoked once after a raft node has been shut down.
      */
     void onShutdown();
-
-    /**
-     * Invoked on a leader before submitting a command to a raft group.
-     * If a command must be changed before saving to raft log,
-     * this is a place to do it.
-     *
-     * @param command The command.
-     */
-    default void onBeforeApply(Command command) {
-        // No-op.
-    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
index ef1307f660..cd320381b1 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
@@ -19,6 +19,8 @@ package org.apache.ignite.raft.jraft.rpc.impl;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -26,6 +28,8 @@ import 
org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
+import 
org.apache.ignite.internal.raft.server.impl.JraftServerImpl.DelegatingStateMachine;
+import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.raft.jraft.Closure;
 import org.apache.ignite.raft.jraft.Node;
@@ -41,7 +45,8 @@ import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
 import org.apache.ignite.raft.jraft.rpc.RpcContext;
 import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests;
-import org.apache.ignite.raft.jraft.util.BytesUtil;import 
org.apache.ignite.raft.jraft.util.Marshaller;
+import org.apache.ignite.raft.jraft.util.BytesUtil;
+import org.apache.ignite.raft.jraft.util.Marshaller;
 
 /**
  * Process action request.
@@ -53,6 +58,12 @@ public class ActionRequestProcessor implements 
RpcProcessor<ActionRequest> {
 
     private final RaftMessagesFactory factory;
 
+    /**
+     * Mapping from group IDs to monitors to synchronize on (only used when the
+     * RaftGroupListener instance implements {@link BeforeApplyHandler} and the command is a write command).
+     */
+    private final Map<String, Object> groupIdsToMonitors = new ConcurrentHashMap<>();
+
     public ActionRequestProcessor(Executor executor, RaftMessagesFactory 
factory) {
         this.executor = executor;
         this.factory = factory;
@@ -71,15 +82,32 @@ public class ActionRequestProcessor implements 
RpcProcessor<ActionRequest> {
 
         JraftServerImpl.DelegatingStateMachine fsm = 
(JraftServerImpl.DelegatingStateMachine) node.getOptions().getFsm();
 
-        // Apply a filter before committing to STM.
-        fsm.getListener().onBeforeApply(request.command());
-
         if (request.command() instanceof WriteCommand) {
-            applyWrite(node, request, rpcCtx);
+            if (fsm.getListener() instanceof BeforeApplyHandler) {
+                synchronized (groupIdSyncMonitor(request.groupId())) {
+                    callOnBeforeApply(request, fsm);
+                    applyWrite(node, request, rpcCtx);
+                }
+            } else {
+                applyWrite(node, request, rpcCtx);
+            }
         } else {
+            if (fsm.getListener() instanceof BeforeApplyHandler) {
+                callOnBeforeApply(request, fsm);
+            }
+
             applyRead(node, request, rpcCtx);
         }
     }
+    private static void callOnBeforeApply(ActionRequest actionRequest, DelegatingStateMachine stateMachine) {
+        BeforeApplyHandler.class.cast(stateMachine.getListener()).onBeforeApply(actionRequest.command());
+    }
+
+    private Object groupIdSyncMonitor(String groupId) {
+        assert groupId != null;
+        // Dedicated lock object: the group ID String may be interned/shared and thus locked on by unrelated code.
+        return groupIdsToMonitors.computeIfAbsent(groupId, k -> new Object());
+    }
 
     /**
      * @param node    The node.

Reply via email to