ascherbakoff commented on code in PR #3811:
URL: https://github.com/apache/ignite-3/pull/3811#discussion_r1615710800


##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java:
##########
@@ -115,6 +118,32 @@ public NodeId nodeId() {
             return nodeId;
         }
 
+        @Override
+        public void nodeId(NodeId nodeId) {

Review Comment:
   Maybe use abstract class to reduce copy&paste ?



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java:
##########
@@ -291,62 +299,75 @@ private class StripeEntryHandler implements 
EventHandler<T> {
             this.stripeId = stripeId;
         }
 
-        /**
-         * Checks the replication group is subscribed to this stripe or not.
-         * @param nodeId Replication group node id.
-         * @return True if the group is subscribed, false otherwise.
-         */
-        public boolean isSubscribed(NodeId nodeId) {
-            return subscribers.containsKey(nodeId);
-        }
+        /** {@inheritDoc} */
+        @Override
+        public void onEvent(T event, long sequence, boolean endOfBatch) throws 
Exception {
+            if (event.type() == SUBSCRIBE) {
+                if (event.handler() == null) {
+                    subscribers.remove(event.nodeId());
+                } else {
+                    subscribers.put(event.nodeId(), (EventHandler<T>) 
event.handler());
+                }
+            } else {
+                internalBatching(event, sequence);
+            }
 
-        /**
-         * Subscribes a group to appropriate events for it.
-         *
-         * @param nodeId Node id.
-         * @param handler Event handler for the group specified.
-         */
-        void subscribe(NodeId nodeId, EventHandler<T> handler) {
-            subscribers.put(nodeId, handler);
+            if (endOfBatch) {
+                for (Map.Entry<NodeId, T> grpEvent : eventCache.entrySet()) {
+                    EventHandler<T> grpHandler = 
subscribers.get(grpEvent.getValue().nodeId());
+
+                    if (grpHandler != null) {
+                        if (metrics != null && metrics.enabled()) {
+                            metrics.hitToStripe(stripeId);
+
+                            
metrics.addBatchSize(currentBatchSizes.getOrDefault(grpEvent.getKey(), 0) + 1);
+                        }
+
+                        grpHandler.onEvent(grpEvent.getValue(), sequence, 
true);
+                    }
+                }
+
+                currentBatchSizes.clear();
+                eventCache.clear();
+            }
         }
 
         /**
-         * Unsubscribes a group for any event.
+         * Processes the event with intermediate cache to batch internally for 
each subscriber for the stripe.
          *
-         * @param nodeId Node id.
+         * @param event Disruptor event to process.
+         * @param sequence Number in the sequence of the element.
+         * @throws Exception Throw when some handler fails.
          */
-        void unsubscribe(NodeId nodeId) {
-            subscribers.remove(nodeId);
-        }
+        private void internalBatching(T event, long sequence) throws Exception 
{
+            NodeId pushNodeId = supportsBatches ? NOT_NODE_ID : event.nodeId();

Review Comment:
   supportsBatches should be renamed to something like skipInternalBatching
   I see no sense in caching these events, better directtly propagate them to a 
hander and get rid of NOT_NODE_ID
   Also see
   // TODO: IGNITE-15568 endOfBatch should be set to true to prevent caching 
tasks until IGNITE-15568 has fixed.



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java:
##########
@@ -228,58 +246,48 @@ public void unsubscribe(NodeId nodeId) {
 
         assert stripeId != -1 : "The replication group has not subscribed yet 
[nodeId=" + nodeId + "].";
 
-        eventHandlers.get(stripeId).unsubscribe(nodeId);
+        stripeMapper.remove(nodeId);
+
+        queues[stripeId].publishEvent((event, sequence) -> {
+            event.type(SUBSCRIBE);
+            event.nodeId(nodeId);
+            event.handler(null);
+        });
+
         exceptionHandlers.get(stripeId).unsubscribe(nodeId);
     }
 
     /**
-     * If the replication group is already subscribed, this method determines 
a stripe by a node id and returns a stripe number.
-     * If the replication group did not subscribed yet, this method returns 
{@code -1};
+     * If the replication group is already subscribed, this method determines 
a stripe by a node id and returns a stripe number. If the
+     * replication group did not subscribed yet, this method returns {@code 
-1};
      *
      * @param nodeId Node id.
      * @return Stripe of the Striped disruptor.
      */
     public int getStripe(NodeId nodeId) {
-        for (StripeEntryHandler handler : eventHandlers) {
-            if (handler.isSubscribed(nodeId)) {
-                return handler.stripeId;
-            }
-        }
-
-        return -1;
+        return stripeMapper.getOrDefault(nodeId, -1);
     }
 
     /**
      * Generates the next stripe number in a round-robin manner.
+     *
      * @return The stripe number.
      */
     private int nextStripeToSubscribe() {
         return Math.abs(incrementalCounter.getAndIncrement() % stripes);
     }
 
     /**
-     * Determines a Disruptor queue by a group id.
-     *
-     * @param nodeId Node id.
-     * @return Disruptor queue appropriate to the group.
-     */
-    public RingBuffer<T> queue(NodeId nodeId) {
-        int stripeId = getStripe(nodeId);
-
-        assert stripeId != -1 : "The replication group has not subscribed yet 
[nodeId=" + nodeId + "].";
-
-        return queues[stripeId];
-    }
-
-    /**
-     * Event handler for stripe of the Striped disruptor.
-     * It routs an event to the event handler for a group.
+     * Event handler for stripe of the Striped disruptor. It routs an event to 
the event handler for a group.

Review Comment:
   -> It routes



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java:
##########
@@ -40,15 +44,24 @@
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Stripe Disruptor is a set of queues which process several independent 
groups in one queue (in the stripe).
- * It makes fewer threads that the groups and gives the same sequential 
guaranties and a close performance.
+ * Stripe Disruptor is a set of queues which process several independent 
groups in one queue (in the stripe). It makes fewer threads that
+ * the groups and gives the same sequential guaranties and a close performance.
  *
  * @param <T> Event type. This event should implement {@link NodeIdAware} 
interface.
  */
 public class StripedDisruptor<T extends NodeIdAware> {
+    /**
+     * It is an id that does not represent any node. It is used to collect 
events in the disruptor implementation,
+     * where the particular node does not mean.
+     */
+    private final NodeId NOT_NODE_ID = new NodeId(null, null);

Review Comment:
   Better rename to FAKE_NODE_ID



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java:
##########
@@ -100,6 +101,8 @@ public String metricName() {
     public static class ApplyTask implements NodeIdAware {
         /** Raft node id. */
         NodeId nodeId;
+        EventHandler<NodeIdAware> handler;

Review Comment:
   This can be of type REGULAR by default if moved to base class



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to