This is an automated email from the ASF dual-hosted git repository.

petrov-mg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new fff85da4b7a IGNITE-28770 Added propagation of OperationContext 
attributes to remote nodes via Discovery (#13243)
fff85da4b7a is described below

commit fff85da4b7a1b99d41d3e9ea1aa1ac7969fa3bbf
Author: Vladimir Steshin <[email protected]>
AuthorDate: Tue Jun 23 19:08:09 2026 +0300

    IGNITE-28770 Added propagation of OperationContext attributes to remote 
nodes via Discovery (#13243)
---
 .../internal/thread/context/OperationContext.java  |   2 +-
 .../ignite/internal/CoreMessagesProvider.java      |   4 +
 .../DistributedOperationContextMessage.java        |  42 +++
 .../DistributedOperationContextManager.java        | 148 +++++++++
 .../ignite/spi/discovery/tcp/ClientImpl.java       | 369 +++++++++++----------
 .../ignite/spi/discovery/tcp/ServerImpl.java       |  25 +-
 .../tcp/messages/InetSocketAddressMessage.java     |   1 -
 .../tcp/messages/TcpDiscoveryAbstractMessage.java  |   7 +
 .../context/OperationContextAttributesTest.java    | 106 +++++-
 9 files changed, 513 insertions(+), 191 deletions(-)

diff --git 
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java
 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java
index 6953d8b8538..4a8f556781c 100644
--- 
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java
+++ 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java
@@ -322,7 +322,7 @@ public class OperationContext {
     }
 
     /** Allows to change multiple attribute values in a single update 
operation and skip updates that changes nothing. */
-    private static class ContextUpdater {
+    static class ContextUpdater {
         /** */
         private static final int INIT_UPDATES_CAPACITY = 3;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
index 9da592635d2..cc0ab3e112b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
@@ -666,6 +666,10 @@ public class CoreMessagesProvider extends 
AbstractMarshallableMessageFactoryProv
         withNoSchema(PartitionHashRecord.class);
         withNoSchema(TransactionsHashRecord.class);
 
+        // [13400 - 13600]: Operation context messages.
+        msgIdx = 13400;
+        withNoSchema(DistributedOperationContextMessage.class);
+
         assert msgIdx <= MAX_MESSAGE_ID;
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java
new file mode 100644
index 00000000000..42d5c7eda85
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import 
org.apache.ignite.internal.thread.context.DistributedOperationContextManager;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/**
+ * Transport for {@link OperationContext} distributed attributes.
+ *
+ * @see DistributedOperationContextManager
+ */
+public class DistributedOperationContextMessage implements Message {
+    /** Values of operation context attributes. */
+    @Order(0)
+    public Message[] vals;
+
+    /** Bitmap of effective attribute ids. */
+    @Order(1)
+    public byte idBitmap;
+
+    /** Empty constructor for serialization purposes. */
+    public DistributedOperationContextMessage() {
+        // No-op.
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
new file mode 100644
index 00000000000..77adfae5e64
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.thread.context;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.DistributedOperationContextMessage;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides the ability to manage {@link OperationContext} attributes in a 
distributed manner.
+ *
+ * <p>This mechanism is primarily used to propagate {@link OperationContext} 
state across the cluster by
+ * capturing it before a message is sent, transferring it together with the 
message, and restoring it on
+ * the receiving node before message processing begins.</p>
+ *
+ * <p>The implementation relies on a mapping between a distributed identifier 
and an
+ * {@link OperationContextAttribute} instance that is consistent across all 
cluster nodes.</p>
+ *
+ * <p>To enable propagation of an {@link OperationContextAttribute} value 
across cluster nodes, the
+ * attribute must be created using the {@link 
#createDistributedAttribute(byte, Message)} method.</p>
+ *
+ * <p>Note that the maximum number of distributed attribute instances that 
can be created is currently limited to
+ * {@link #MAX_DISTRIBUTED_ATTR_CNT} for implementation reasons.</p>
+ *
+ * @see OperationContext
+ * @see DistributedOperationContextMessage
+ */
+public class DistributedOperationContextManager {
+    /** */
+    private static final DistributedOperationContextManager INSTANCE = new 
DistributedOperationContextManager();
+
+    /** Maximal number of supported distributed attributes. */
+    static final byte MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE;
+
+    /** Registered distributed attributes by their cluster-wide id. */
+    private final Map<Byte, OperationContextAttribute<Message>> attrs = new 
ConcurrentSkipListMap<>();
+
+    /** */
+    public static DistributedOperationContextManager instance() {
+        return INSTANCE;
+    }
+
+    /**
+     * Creates a new {@link OperationContext} attribute with the specified 
distributed ID and initial value.
+     *
+     * <p>The distributed ID is used to consistently identify the attribute 
across all nodes in the cluster.
+     * It must be unique, and its value must be in the range from {@code 0} 
(inclusive) to {@code Byte.SIZE} (exclusive).</p>
+     *
+     * <p>The value of the created attribute is automatically captured and 
propagated between cluster nodes
+     * during message transmission.</p>
+     *
+     * @see OperationContextAttribute#newInstance(Object)
+     */
+    public <T extends Message> OperationContextAttribute<T> 
createDistributedAttribute(byte id, @Nullable T initVal) {
+        assert id >= 0 && id < MAX_DISTRIBUTED_ATTR_CNT : "Invalid distributed 
attributed id [id=" + id + ']';
+
+        return (OperationContextAttribute<T>)attrs.compute(id, (id0, attr0) -> 
{
+            if (attr0 != null)
+                throw new IgniteException("Duplicated distributed attribute id 
[id=" + id + ']');
+
+            return OperationContextAttribute.newInstance(initVal);
+        });
+    }
+
+    /**
+     * Collects the values of all distributed {@link 
OperationContextAttribute}s registered by this manager in a format
+     * suitable for transmission between cluster nodes.
+     *
+     * @see OperationContext#get(OperationContextAttribute)
+     */
+    public @Nullable DistributedOperationContextMessage 
collectDistributedAttributes() {
+        DistributedOperationContextMessage res = null;
+        List<Message> vals = null;
+
+        for (Map.Entry<Byte, OperationContextAttribute<Message>> e : 
attrs.entrySet()) {
+            OperationContextAttribute<? extends Message> attr = e.getValue();
+
+            Message curVal = OperationContext.get(attr);
+
+            if (curVal != attr.initialValue()) {
+                if (res == null) {
+                    res = new DistributedOperationContextMessage();
+
+                    vals = new ArrayList<>(MAX_DISTRIBUTED_ATTR_CNT / 2);
+                }
+
+                byte mask = (byte)(1 << e.getKey());
+
+                assert (res.idBitmap & mask) == 0;
+
+                vals.add(curVal);
+                res.idBitmap |= mask;
+            }
+        }
+
+        if (res != null)
+            res.vals = vals.toArray(new Message[vals.size()]);
+
+        return res;
+    }
+
+    /** Restores distributed {@link OperationContextAttribute} values received 
from a remote node. */
+    public Scope restoreDistributedAttributes(@Nullable 
DistributedOperationContextMessage msg) {
+        if (msg == null)
+            return Scope.NOOP_SCOPE;
+
+        assert msg.idBitmap != 0;
+        assert !F.isEmpty(msg.vals);
+        assert msg.vals.length <= MAX_DISTRIBUTED_ATTR_CNT;
+
+        OperationContext.ContextUpdater updater = 
OperationContext.ContextUpdater.create();
+
+        for (byte valIdx = 0, maskIdx = 0; valIdx < msg.vals.length; ++valIdx) 
{
+            Message curVal = msg.vals[valIdx];
+
+            while ((msg.idBitmap & (1 << maskIdx)) == 0)
+                ++maskIdx;
+
+            OperationContextAttribute<Message> attr = attrs.get(maskIdx++);
+
+            assert attr != null;
+
+            updater.set(attr, curVal);
+        }
+
+        return updater.apply();
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index a0e1a200487..ccf622be6ea 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -70,6 +70,8 @@ import org.apache.ignite.internal.processors.tracing.SpanTags;
 import org.apache.ignite.internal.processors.tracing.messages.SpanContainer;
 import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage;
 import 
org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable;
+import 
org.apache.ignite.internal.thread.context.DistributedOperationContextManager;
+import org.apache.ignite.internal.thread.context.Scope;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -1310,6 +1312,8 @@ class ClientImpl extends TcpDiscoveryImpl {
          * @param msg Message.
          */
         private void sendMessage(TcpDiscoveryAbstractMessage msg) {
+            msg.opCtxMsg = 
DistributedOperationContextManager.instance().collectDistributedAttributes();
+
             synchronized (mux) {
                 queue.add(msg);
 
@@ -1757,251 +1761,256 @@ class ClientImpl extends TcpDiscoveryImpl {
                         blockingSectionEnd();
                     }
 
-                    if (msg instanceof JoinTimeout) {
-                        int joinCnt0 = ((JoinTimeout)msg).joinCnt;
+                    TcpDiscoveryAbstractMessage dm = msg instanceof 
TcpDiscoveryAbstractMessage
+                        ? (TcpDiscoveryAbstractMessage)msg
+                        : null;
 
-                        if (joinCnt == joinCnt0) {
-                            if (state == STARTING) {
-                                joinError(new IgniteSpiException("Join process 
timed out, did not receive response for " +
-                                    "join request (consider increasing 
'joinTimeout' configuration property) " +
-                                    "[joinTimeout=" + spi.joinTimeout + ", 
sock=" + currSock + ']'));
+                    try (Scope ignored = 
DistributedOperationContextManager.instance()
+                        .restoreDistributedAttributes(dm == null ? null : 
dm.opCtxMsg)) {
+                        if (msg instanceof JoinTimeout) {
+                            int joinCnt0 = ((JoinTimeout)msg).joinCnt;
 
-                                break;
-                            }
-                            else if (state == DISCONNECTED) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to reconnect, local node 
segmented " +
-                                        "[joinTimeout=" + spi.joinTimeout + 
']');
+                            if (joinCnt == joinCnt0) {
+                                if (state == STARTING) {
+                                    joinError(new IgniteSpiException("Join 
process timed out, did not receive response for " +
+                                        "join request (consider increasing 
'joinTimeout' configuration property) " +
+                                        "[joinTimeout=" + spi.joinTimeout + ", 
sock=" + currSock + ']'));
+
+                                    break;
+                                }
+                                else if (state == DISCONNECTED) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to reconnect, local 
node segmented " +
+                                            "[joinTimeout=" + spi.joinTimeout 
+ ']');
 
-                                state = SEGMENTED;
+                                    state = SEGMENTED;
 
-                                notifyDiscovery(
-                                    EVT_NODE_SEGMENTED, topVer, locNode, 
allVisibleNodes(), null);
+                                    notifyDiscovery(
+                                        EVT_NODE_SEGMENTED, topVer, locNode, 
allVisibleNodes(), null);
+                                }
                             }
                         }
-                    }
-                    else if (msg == SPI_STOP) {
-                        boolean connected = state == CONNECTED;
+                        else if (msg == SPI_STOP) {
+                            boolean connected = state == CONNECTED;
 
-                        state = STOPPED;
+                            state = STOPPED;
 
-                        assert spi.getSpiContext().isStopping();
+                            assert spi.getSpiContext().isStopping();
 
-                        if (connected && currSock != null) {
-                            TcpDiscoveryNodeLeftMessage leftMsg = new 
TcpDiscoveryNodeLeftMessage(getLocalNodeId());
+                            if (connected && currSock != null) {
+                                TcpDiscoveryNodeLeftMessage leftMsg = new 
TcpDiscoveryNodeLeftMessage(getLocalNodeId());
 
-                            leftMsg.client(true);
+                                leftMsg.client(true);
 
-                            Span rootSpan = 
tracing.create(TraceableMessagesTable.traceName(leftMsg.getClass()))
-                                .addTag(SpanTags.tag(SpanTags.EVENT_NODE, 
SpanTags.ID), () -> locNode.id().toString())
-                                .addTag(SpanTags.tag(SpanTags.EVENT_NODE, 
SpanTags.CONSISTENT_ID),
-                                    () -> locNode.consistentId().toString())
-                                .addLog(() -> "Created");
+                                Span rootSpan = 
tracing.create(TraceableMessagesTable.traceName(leftMsg.getClass()))
+                                    .addTag(SpanTags.tag(SpanTags.EVENT_NODE, 
SpanTags.ID), () -> locNode.id().toString())
+                                    .addTag(SpanTags.tag(SpanTags.EVENT_NODE, 
SpanTags.CONSISTENT_ID),
+                                        () -> 
locNode.consistentId().toString())
+                                    .addLog(() -> "Created");
 
-                            
leftMsg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
+                                
leftMsg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
 
-                            sockWriter.sendMessage(leftMsg);
+                                sockWriter.sendMessage(leftMsg);
 
-                            rootSpan.addLog(() -> "Sent").end();
+                                rootSpan.addLog(() -> "Sent").end();
+                            }
+                            else
+                                leaveLatch.countDown();
                         }
-                        else
-                            leaveLatch.countDown();
-                    }
-                    else if (msg == SPI_RECONNECT) {
-                        if (state == CONNECTED) {
-                            if (reconnector != null) {
-                                reconnector.cancel();
-                                reconnector.join();
+                        else if (msg == SPI_RECONNECT) {
+                            if (state == CONNECTED) {
+                                if (reconnector != null) {
+                                    reconnector.cancel();
+                                    reconnector.join();
 
-                                reconnector = null;
-                            }
+                                    reconnector = null;
+                                }
 
-                            sockWriter.forceLeave();
-                            sockReader.forceStopRead();
+                                sockWriter.forceLeave();
+                                sockReader.forceStopRead();
 
-                            currSock = null;
+                                currSock = null;
 
-                            queue.clear();
+                                queue.clear();
 
-                            onDisconnected();
+                                onDisconnected();
 
-                            UUID newId = UUID.randomUUID();
+                                UUID newId = UUID.randomUUID();
 
-                            U.quietAndWarn(log, "Local node will try to 
reconnect to cluster with new id due " +
-                                "to network problems [newId=" + newId +
-                                ", prevId=" + locNode.id() +
-                                ", locNode=" + locNode + ']');
+                                U.quietAndWarn(log, "Local node will try to 
reconnect to cluster with new id due " +
+                                    "to network problems [newId=" + newId +
+                                    ", prevId=" + locNode.id() +
+                                    ", locNode=" + locNode + ']');
 
-                            locNode.onClientDisconnected(newId);
+                                locNode.onClientDisconnected(newId);
 
-                            throttleClientReconnect();
+                                throttleClientReconnect();
 
-                            tryJoin();
+                                tryJoin();
+                            }
                         }
-                    }
-                    else if (msg instanceof TcpDiscoveryNodeFailedMessage &&
-                        
((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) {
-                        TcpDiscoveryNodeFailedMessage msg0 = 
(TcpDiscoveryNodeFailedMessage)msg;
+                        else if (msg instanceof TcpDiscoveryNodeFailedMessage 
&&
+                            
((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) {
+                            TcpDiscoveryNodeFailedMessage msg0 = 
(TcpDiscoveryNodeFailedMessage)msg;
 
-                        assert msg0.force() : msg0;
+                            assert msg0.force() : msg0;
 
-                        forceFailMsg = msg0;
-                    }
-                    else if (msg instanceof SocketClosedMessage) {
-                        if (((SocketClosedMessage)msg).sock == currSock) {
-                            Socket sock = currSock.sock;
+                            forceFailMsg = msg0;
+                        }
+                        else if (msg instanceof SocketClosedMessage) {
+                            if (((SocketClosedMessage)msg).sock == currSock) {
+                                Socket sock = currSock.sock;
 
-                            InetSocketAddress prevAddr = new 
InetSocketAddress(sock.getInetAddress(), sock.getPort());
+                                InetSocketAddress prevAddr = new 
InetSocketAddress(sock.getInetAddress(), sock.getPort());
 
-                            currSock = null;
+                                currSock = null;
 
-                            boolean join = joinLatch.getCount() > 0;
+                                boolean join = joinLatch.getCount() > 0;
 
-                            if (spi.getSpiContext().isStopping() || state == 
SEGMENTED) {
-                                leaveLatch.countDown();
+                                if (spi.getSpiContext().isStopping() || state 
== SEGMENTED) {
+                                    leaveLatch.countDown();
 
-                                if (join) {
-                                    joinError(new IgniteSpiException("Failed 
to connect to cluster: socket closed."));
+                                    if (join) {
+                                        joinError(new 
IgniteSpiException("Failed to connect to cluster: socket closed."));
 
-                                    break;
-                                }
-                            }
-                            else {
-                                if (forceFailMsg != null) {
-                                    if (log.isDebugEnabled()) {
-                                        log.debug("Connection closed, local 
node received force fail message, " +
-                                            "will not try to restore 
connection");
+                                        break;
                                     }
-
-                                    queue.addFirst(SPI_RECONNECT_FAILED);
                                 }
                                 else {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Connection closed, will try 
to restore connection.");
+                                    if (forceFailMsg != null) {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug("Connection closed, 
local node received force fail message, " +
+                                                "will not try to restore 
connection");
+                                        }
 
-                                    assert reconnector == null;
+                                        queue.addFirst(SPI_RECONNECT_FAILED);
+                                    }
+                                    else {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Connection closed, will 
try to restore connection.");
+
+                                        assert reconnector == null;
 
-                                    reconnector = new Reconnector(join, 
prevAddr);
-                                    reconnector.start();
+                                        reconnector = new Reconnector(join, 
prevAddr);
+                                        reconnector.start();
+                                    }
                                 }
                             }
                         }
-                    }
-                    else if (msg == SPI_RECONNECT_FAILED) {
-                        if (reconnector != null) {
-                            reconnector.cancel();
-                            reconnector.join();
-
-                            reconnector = null;
-                        }
-                        else
-                            assert forceFailMsg != null;
+                        else if (msg == SPI_RECONNECT_FAILED) {
+                            if (reconnector != null) {
+                                reconnector.cancel();
+                                reconnector.join();
 
-                        if (spi.isClientReconnectDisabled()) {
-                            if (state != SEGMENTED && state != STOPPED) {
-                                if (forceFailMsg != null) {
-                                    U.quietAndWarn(log, "Local node was 
dropped from cluster due to network problems " +
-                                        "[nodeInitiatedFail=" + 
forceFailMsg.creatorNodeId() +
-                                        ", msg=" + forceFailMsg.warning() + 
']');
-                                }
+                                reconnector = null;
+                            }
+                            else
+                                assert forceFailMsg != null;
+
+                            if (spi.isClientReconnectDisabled()) {
+                                if (state != SEGMENTED && state != STOPPED) {
+                                    if (forceFailMsg != null) {
+                                        U.quietAndWarn(log, "Local node was 
dropped from cluster due to network problems " +
+                                            "[nodeInitiatedFail=" + 
forceFailMsg.creatorNodeId() +
+                                            ", msg=" + forceFailMsg.warning() 
+ ']');
+                                    }
 
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Failed to restore closed 
connection, reconnect disabled, " +
-                                        "local node segmented 
[networkTimeout=" + spi.netTimeout + ']');
-                                }
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("Failed to restore closed 
connection, reconnect disabled, " +
+                                            "local node segmented 
[networkTimeout=" + spi.netTimeout + ']');
+                                    }
 
-                                state = SEGMENTED;
+                                    state = SEGMENTED;
 
-                                notifyDiscovery(
-                                    EVT_NODE_SEGMENTED, topVer, locNode, 
allVisibleNodes(), null);
-                            }
-                        }
-                        else {
-                            if (state == STARTING || state == CONNECTED) {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Failed to restore closed 
connection, will try to reconnect " +
-                                        "[networkTimeout=" + spi.netTimeout +
-                                        ", joinTimeout=" + spi.joinTimeout +
-                                        ", failMsg=" + forceFailMsg + ']');
+                                    notifyDiscovery(
+                                        EVT_NODE_SEGMENTED, topVer, locNode, 
allVisibleNodes(), null);
                                 }
-
-                                onDisconnected();
                             }
+                            else {
+                                if (state == STARTING || state == CONNECTED) {
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("Failed to restore closed 
connection, will try to reconnect " +
+                                            "[networkTimeout=" + 
spi.netTimeout +
+                                            ", joinTimeout=" + spi.joinTimeout 
+
+                                            ", failMsg=" + forceFailMsg + ']');
+                                    }
 
-                            UUID newId = UUID.randomUUID();
+                                    onDisconnected();
+                                }
 
-                            if (forceFailMsg != null) {
-                                long delay = 
IgniteSystemProperties.getLong(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY,
-                                    DFLT_DISCO_FAILED_CLIENT_RECONNECT_DELAY);
+                                UUID newId = UUID.randomUUID();
 
-                                if (delay > 0) {
-                                    U.quietAndWarn(log, "Local node was 
dropped from cluster due to network problems, " +
-                                        "will try to reconnect with new id 
after " + delay + "ms (reconnect delay " +
-                                        "can be changed using 
IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY system " +
-                                        "property) [" +
-                                        "newId=" + newId +
-                                        ", prevId=" + locNode.id() +
-                                        ", locNode=" + locNode +
-                                        ", nodeInitiatedFail=" + 
forceFailMsg.creatorNodeId() +
-                                        ", msg=" + forceFailMsg.warning() + 
']');
+                                if (forceFailMsg != null) {
+                                    long delay = 
IgniteSystemProperties.getLong(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY,
+                                        
DFLT_DISCO_FAILED_CLIENT_RECONNECT_DELAY);
+
+                                    if (delay > 0) {
+                                        U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " +
+                                            "will try to reconnect with new id after " + delay + "ms (reconnect delay " +
+                                            "can be changed using IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY system " +
+                                            "property) [" +
+                                            "newId=" + newId +
+                                            ", prevId=" + locNode.id() +
+                                            ", locNode=" + locNode +
+                                            ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() +
+                                            ", msg=" + forceFailMsg.warning() + ']');
+
+                                        Thread.sleep(delay);
+                                    }
+                                    else {
+                                        U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " +
+                                            "will try to reconnect with new id [" +
+                                            "newId=" + newId +
+                                            ", prevId=" + locNode.id() +
+                                            ", locNode=" + locNode +
+                                            ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() +
+                                            ", msg=" + forceFailMsg.warning() + ']');
+                                    }
 
-                                    Thread.sleep(delay);
+                                    forceFailMsg = null;
                                 }
-                                else {
-                                    U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " +
-                                        "will try to reconnect with new id [" +
-                                        "newId=" + newId +
-                                        ", prevId=" + locNode.id() +
-                                        ", locNode=" + locNode +
-                                        ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() +
-                                        ", msg=" + forceFailMsg.warning() + ']');
+                                else if (log.isInfoEnabled()) {
+                                    log.info("Client node disconnected from cluster, will try to reconnect with new id " +
+                                        "[newId=" + newId + ", prevId=" + locNode.id() + ", locNode=" + locNode + ']');
                                 }
 
-                                forceFailMsg = null;
-                            }
-                            else if (log.isInfoEnabled()) {
-                                log.info("Client node disconnected from cluster, will try to reconnect with new id " +
-                                    "[newId=" + newId + ", prevId=" + locNode.id() + ", locNode=" + locNode + ']');
-                            }
+                                locNode.onClientDisconnected(newId);
 
-                            locNode.onClientDisconnected(newId);
-
-                            tryJoin();
+                                tryJoin();
+                            }
                         }
-                    }
-                    else {
-                        TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg;
-
-                        if (joining()) {
-                            IgniteSpiException err = null;
+                        else {
+                            if (joining()) {
+                                IgniteSpiException err = null;
 
-                            if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage)
-                                err = spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
-                            else if (discoMsg instanceof TcpDiscoveryAuthFailedMessage)
-                                err = spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
-                            //TODO: https://issues.apache.org/jira/browse/IGNITE-9829
-                            else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage)
-                                err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
+                                if (dm instanceof TcpDiscoveryDuplicateIdMessage)
+                                    err = spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
+                                else if (dm instanceof TcpDiscoveryAuthFailedMessage)
+                                    err = spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
+                                    //TODO: https://issues.apache.org/jira/browse/IGNITE-9829
+                                else if (dm instanceof TcpDiscoveryCheckFailedMessage)
+                                    err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
 
-                            if (err != null) {
-                                if (state == DISCONNECTED) {
-                                    U.error(log, "Failed to reconnect, segment local node.", err);
+                                if (err != null) {
+                                    if (state == DISCONNECTED) {
+                                        U.error(log, "Failed to reconnect, segment local node.", err);
 
-                                    state = SEGMENTED;
+                                        state = SEGMENTED;
 
-                                    notifyDiscovery(
-                                        EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null);
-                                }
-                                else
-                                    joinError(err);
+                                        notifyDiscovery(
+                                            EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null);
+                                    }
+                                    else
+                                        joinError(err);
 
-                                cancel();
+                                    cancel();
 
-                                break;
+                                    break;
+                                }
                             }
-                        }
 
-                        processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg);
+                            processDiscoveryMessage(dm);
+                        }
                     }
                 }
             }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 82c012c2a1a..a8a4f2f0a47 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -95,6 +95,8 @@ import org.apache.ignite.internal.processors.tracing.SpanTags;
 import org.apache.ignite.internal.processors.tracing.messages.SpanContainer;
 import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage;
 import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable;
+import org.apache.ignite.internal.thread.context.DistributedOperationContextManager;
+import org.apache.ignite.internal.thread.context.Scope;
 import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor;
 import org.apache.ignite.internal.util.GridBoundedLinkedHashSet;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
@@ -3046,8 +3048,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                 return;
             }
 
-            if (msg instanceof TraceableMessage) {
-                TraceableMessage tMsg = (TraceableMessage)msg;
+            if (!fromSocket)
+                msg.opCtxMsg = DistributedOperationContextManager.instance().collectDistributedAttributes();
+
+            if (msg instanceof TraceableMessage tMsg) {
 
                 // If we read this message from socket.
                 if (fromSocket)
@@ -3173,11 +3177,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 task.run();
         }
 
-        /** {@inheritDoc} */
-        @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
-            if (msg == WAKEUP)
-                return;
-
+        /** */
+        private void processMessage0(TcpDiscoveryAbstractMessage msg) {
             notifiedDiscovery.set(false);
 
             if (msg instanceof TraceableMessage) {
@@ -3315,6 +3316,16 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
         }
 
+        /** {@inheritDoc} */
+        @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
+            if (msg == WAKEUP)
+                return;
+
+            try (Scope ignored = DistributedOperationContextManager.instance().restoreDistributedAttributes(msg.opCtxMsg)) {
+                processMessage0(msg);
+            }
+        }
+
         /**
          * Processes authentication failed message.
          *
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java
index f23e36f200d..d76279fb280 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java
@@ -52,7 +52,6 @@ public class InetSocketAddressMessage extends InetAddressMessage {
         return port;
     }
 
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(InetSocketAddressMessage.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 7a97763c36b..60c38866034 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.ignite.internal.DistributedOperationContextMessage;
 import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -76,6 +77,11 @@ public abstract class TcpDiscoveryAbstractMessage implements Message {
     @Order(4)
     Set<UUID> failedNodes;
 
+    /** Operation context attributes message. */
+    @GridToStringInclude
+    @Order(5)
+    public @Nullable DistributedOperationContextMessage opCtxMsg;
+
     /**
      * Default no-arg constructor for {@link Externalizable} interface.
      */
@@ -100,6 +106,7 @@ public abstract class TcpDiscoveryAbstractMessage implements Message {
         verifierNodeId = msg.verifierNodeId;
         topVer = msg.topVer;
         flags = msg.flags;
+        opCtxMsg = msg.opCtxMsg;
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
index 9de906b2729..bdf84b743ad 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.thread.context;
 
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -36,8 +37,13 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
@@ -48,6 +54,7 @@ import org.apache.ignite.internal.thread.pool.IgniteStripedExecutor;
 import org.apache.ignite.internal.thread.pool.IgniteStripedThreadPoolExecutor;
 import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.queue.IgniteAsyncObjectHandler;
 import org.apache.ignite.internal.util.worker.queue.IgniteDelayedObjectHandler;
@@ -56,6 +63,7 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.thread.IgniteThread;
 import org.junit.Test;
@@ -64,6 +72,7 @@ import org.springframework.lang.NonNull;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /** */
 public class OperationContextAttributesTest extends GridCommonAbstractTest {
@@ -98,6 +107,8 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest {
     @Override protected void afterTest() throws Exception {
         super.afterTest();
 
+        stopAllGrids();
+
         if (poolToShutdownAfterTest != null)
             poolToShutdownAfterTest.shutdownNow();
 
@@ -808,6 +819,98 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest {
         }
     }
 
+    /** */
+    @Test
+    public void testSendAttributesByDiscovery() throws Exception {
+        byte attrId1 = 0;
+        byte attrId2 = DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1;
+
+        InetSocketAddressMessage dfltDistAttr1Val = new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80);
+        GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1);
+
+        // Local attribute 1.
+        OperationContextAttribute.newInstance(1000);
+
+        // Distributed attribute 1.
+        OperationContextAttribute<InetSocketAddressMessage> dAttr1 = DistributedOperationContextManager.instance()
+            .createDistributedAttribute(attrId1, dfltDistAttr1Val);
+
+        // Local attribute 2.
+        OperationContextAttribute.newInstance("locaAttr2");
+
+        // Distributed attribute 2.
+        OperationContextAttribute<GridCacheVersion> dAttr2 = DistributedOperationContextManager.instance()
+            .createDistributedAttribute(attrId2, dfltDistrAttr2Val);
+
+        startGrids(2);
+        startClientGrid(2);
+
+        CountDownLatch coordLatch = new CountDownLatch(3);
+        CountDownLatch srvrLatch = new CountDownLatch(3);
+        CountDownLatch clientLatch = new CountDownLatch(3);
+
+        InetSocketAddressMessage valToSend1 = new InetSocketAddressMessage(dfltDistAttr1Val.address(), 443);
+        GridCacheVersion valToSend2 = new GridCacheVersion(2, 2, 2);
+
+        for (int i = 0; i < G.allGrids().size(); ++i) {
+            int i0 = i;
+
+            grid(i).context().discovery().setCustomEventListener(
+                DynamicCacheChangeBatch.class, new CustomEventListener<>() {
+                    @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd,
+                        DynamicCacheChangeBatch msg) {
+
+                        InetSocketAddressMessage receivedVal1 = OperationContext.get(dAttr1);
+                        GridCacheVersion receivedVal2 = OperationContext.get(dAttr2);
+
+                        assertNotNull(receivedVal1);
+                        assertNotNull(receivedVal2);
+
+                        assertFalse(dfltDistAttr1Val.port() == receivedVal1.port());
+                        assertEquals(receivedVal1.port(), valToSend1.port());
+                        assertEquals(receivedVal1.address(), valToSend1.address());
+
+                        assertFalse(dfltDistrAttr2Val.equals(receivedVal2));
+                        assertTrue(valToSend2.equals(receivedVal2));
+
+                        if (grid(i0).localNode().isClient())
+                            clientLatch.countDown();
+                        else if (grid(i0).localNode().order() == 1)
+                            coordLatch.countDown();
+                        else
+                            srvrLatch.countDown();
+                    }
+                });
+        }
+
+        // Send from the coordinator.
+        try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) {
+            grid(0).createCache(defaultCacheConfiguration());
+        }
+
+        assertTrue(waitForCondition(() -> coordLatch.getCount() == 2, getTestTimeout()));
+        assertTrue(waitForCondition(() -> srvrLatch.getCount() == 2, getTestTimeout()));
+        assertTrue(waitForCondition(() -> clientLatch.getCount() == 2, getTestTimeout()));
+
+        // Send from a server.
+        try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) {
+            grid(1).destroyCache(DEFAULT_CACHE_NAME);
+        }
+
+        assertTrue(waitForCondition(() -> coordLatch.getCount() == 1, getTestTimeout()));
+        assertTrue(waitForCondition(() -> srvrLatch.getCount() == 1, getTestTimeout()));
+        assertTrue(waitForCondition(() -> clientLatch.getCount() == 1, getTestTimeout()));
+
+        // Send from a client.
+        try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) {
+            grid(2).createCache(defaultCacheConfiguration());
+        }
+
+        assertTrue(coordLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
+        assertTrue(srvrLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
+        assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
+    }
+
     /** */
     private void doContextAwareExecutorServiceTest(ExecutorService pool) throws Exception {
         CountDownLatch poolUnblockedLatch = blockPool(pool);
@@ -923,9 +1026,8 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest {
 
         /** */
         static void assertAllCreatedChecksPassed() throws Exception {
-            for (AttributeValueChecker check : CHECKS) {
+            for (AttributeValueChecker check : CHECKS)
                 check.get(5_000, MILLISECONDS);
-            }
         }
 
         /** */

Reply via email to