rpuch commented on code in PR #3300:
URL: https://github.com/apache/ignite-3/pull/3300#discussion_r1507052440


##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -337,82 +337,167 @@ private List<ClassDescriptorMessage> 
prepareMarshal(NetworkMessage msg) throws E
     /**
      * Sends a message to the current node.
      *
-     * @param msg Message.
+     * @param message Message.
      * @param correlationId Correlation id.
      */
-    private void sendToSelf(NetworkMessage msg, @Nullable Long correlationId) {
-        for (NetworkMessageHandler networkMessageHandler : 
getMessageHandlers(msg.groupType())) {
-            networkMessageHandler.onReceived(msg, 
topologyService.localMember().name(), correlationId);
+    private void sendToSelf(NetworkMessage message, @Nullable Long 
correlationId) {
+        for (HandlerContext context : getHandlerContexts(message.groupType())) 
{
+            // Invoking on the same thread, ignoring the executor chooser 
registered with the handler.
+            context.handler().onReceived(message, 
topologyService.localMember().name(), correlationId);
         }
     }
 
     /**
-     * Handles an incoming message.
+     * Handles a message coming from the network (not from the same node).
      *
-     * @param obj Incoming message wrapper.
+     * @param inNetworkObject Incoming message wrapper.
      */
-    private void onMessage(InNetworkObject obj) {
-        assert isInNetworkThread();
+    private void handleMessageFromNetwork(InNetworkObject inNetworkObject) {
+        assert isInNetworkThread() : Thread.currentThread().getName();
+
+        if (senderIdIsStale(inNetworkObject)) {
+            logMessageSkipDueToSenderLeft(inNetworkObject);
+            return;
+        }
+
+        if (inNetworkObject.message() instanceof InvokeResponse) {
+            Executor executor = chooseExecutorInInboundPool(inNetworkObject);
+            executor.execute(() -> handleInvokeResponse(inNetworkObject));
+            return;
+        }
 
-        inboundExecutors.execute(obj.connectionIndex(), () -> {
+        NetworkMessage payload;
+        Long correlationId = null;
+        if (inNetworkObject.message() instanceof InvokeRequest) {
+            InvokeRequest invokeRequest = (InvokeRequest) 
inNetworkObject.message();
+            payload = invokeRequest.message();
+            correlationId = invokeRequest.correlationId();
+        } else {
+            payload = inNetworkObject.message();
+        }
+
+        Iterator<HandlerContext> handlerContexts = 
getHandlerContexts(payload.groupType()).iterator();
+        if (!handlerContexts.hasNext()) {
+            // No need to handle this.
+            return;
+        }
+
+        HandlerContext firstHandlerContext = handlerContexts.next();
+        Executor firstHandlerExecutor = chooseExecutorFor(payload, 
inNetworkObject, firstHandlerContext.executorChooser());
+
+        Long finalCorrelationId = correlationId;
+        firstHandlerExecutor.execute(() -> {
             long startedNanos = System.nanoTime();
 
             try {
-                handleIncomingMessage(obj);
+                handleStartingWithFirstHandler(payload, finalCorrelationId, 
inNetworkObject, firstHandlerContext, handlerContexts);
             } catch (Throwable e) {
-                logAndRethrowIfError(obj, e);
+                logAndRethrowIfError(inNetworkObject, e);
             } finally {
                 long tookMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedNanos);
                 if (tookMillis > 100) {
-                    LOG.warn("Processing of {} from {} took {} ms", 
obj.message(), obj.consistentId(), tookMillis);
+                    LOG.warn("Processing of {} from {} took {} ms", 
inNetworkObject.message(), inNetworkObject.consistentId(), tookMillis);
                 }
             }
         });
     }
 
-    private void handleIncomingMessage(InNetworkObject obj) {
-        if (senderIdIsStale(obj)) {
-            LOG.info("Sender ID {} ({}) is stale, so skipping message 
handling: {}", obj.launchId(), obj.consistentId(), obj.message());
-            return;
-        }
+    private static void logMessageSkipDueToSenderLeft(InNetworkObject 
inNetworkObject) {
+        LOG.info("Sender ID {} ({}) is stale, so skipping message handling: 
{}",
+                inNetworkObject.launchId(), inNetworkObject.consistentId(), 
inNetworkObject.message()
+        );
+    }
 
-        NetworkMessage msg = obj.message();
-        DescriptorRegistry registry = obj.registry();
+    private boolean senderIdIsStale(InNetworkObject obj) {
+        return staleIdDetector.isIdStale(obj.launchId());
+    }
+
+    private void handleInvokeResponse(InNetworkObject inNetworkObject) {
+        unmarshalMessage(inNetworkObject);
+
+        InvokeResponse response = (InvokeResponse) inNetworkObject.message();
+        onInvokeResponse(response.message(), response.correlationId());
+    }
+
+    private void unmarshalMessage(InNetworkObject obj) {
         try {
-            msg.unmarshal(marshaller, registry);
+            obj.message().unmarshal(marshaller, obj.registry());
         } catch (Exception e) {
             throw new IgniteException("Failed to unmarshal message: " + 
e.getMessage(), e);
         }
-        if (msg instanceof InvokeResponse) {
-            InvokeResponse response = (InvokeResponse) msg;
-            onInvokeResponse(response.message(), response.correlationId());
-            return;
+    }
+
+    private Executor chooseExecutorFor(NetworkMessage payload, InNetworkObject 
obj, ExecutorChooser<NetworkMessage> chooser) {
+        if (wantsInboundPool(chooser)) {
+            return chooseExecutorInInboundPool(obj);
+        } else {
+            return chooser.choose(payload);
         }
+    }
 
-        Long correlationId = null;
-        NetworkMessage message = msg;
+    private Executor chooseExecutorInInboundPool(InNetworkObject obj) {
+        return inboundExecutors.stripeFor(obj.connectionIndex());
+    }
 
-        if (msg instanceof InvokeRequest) {
-            // Unwrap invocation request
-            InvokeRequest messageWithCorrelation = (InvokeRequest) msg;
-            correlationId = messageWithCorrelation.correlationId();
-            message = messageWithCorrelation.message();
+    /**
+     * Finishes unmarshalling the message and handles it on current thread on 
first handler. Also handles it with other
+     * handlers (second and so on) on executors chosen by their choosers.
+     */
+    private void handleStartingWithFirstHandler(
+            NetworkMessage payload,
+            @Nullable Long correlationId,
+            InNetworkObject obj,
+            HandlerContext firstHandlerContext,
+            Iterator<HandlerContext> remainingContexts
+    ) {
+        if (senderIdIsStale(obj)) {

Review Comment:
   These two checks are made in different threads. The first one is to skip handling 
as early as possible. But after that, a task is submitted to an executor, and who 
knows how long it will sit there awaiting execution. During this time, 
the sender might become stale, hence the second check.
   
   Do you think the second check is redundant?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to