This is an automated email from the ASF dual-hosted git repository.
petrov-mg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new fff85da4b7a IGNITE-28770 Added propagation of OperationContext
attributes to remote nodes via Discovery (#13243)
fff85da4b7a is described below
commit fff85da4b7a1b99d41d3e9ea1aa1ac7969fa3bbf
Author: Vladimir Steshin <[email protected]>
AuthorDate: Tue Jun 23 19:08:09 2026 +0300
IGNITE-28770 Added propagation of OperationContext attributes to remote
nodes via Discovery (#13243)
---
.../internal/thread/context/OperationContext.java | 2 +-
.../ignite/internal/CoreMessagesProvider.java | 4 +
.../DistributedOperationContextMessage.java | 42 +++
.../DistributedOperationContextManager.java | 148 +++++++++
.../ignite/spi/discovery/tcp/ClientImpl.java | 369 +++++++++++----------
.../ignite/spi/discovery/tcp/ServerImpl.java | 25 +-
.../tcp/messages/InetSocketAddressMessage.java | 1 -
.../tcp/messages/TcpDiscoveryAbstractMessage.java | 7 +
.../context/OperationContextAttributesTest.java | 106 +++++-
9 files changed, 513 insertions(+), 191 deletions(-)
diff --git
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java
index 6953d8b8538..4a8f556781c 100644
---
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java
@@ -322,7 +322,7 @@ public class OperationContext {
}
/** Allows to change multiple attribute values in a single update
operation and skip updates that changes nothing. */
- private static class ContextUpdater {
+ static class ContextUpdater {
/** */
private static final int INIT_UPDATES_CAPACITY = 3;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
index 9da592635d2..cc0ab3e112b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
@@ -666,6 +666,10 @@ public class CoreMessagesProvider extends
AbstractMarshallableMessageFactoryProv
withNoSchema(PartitionHashRecord.class);
withNoSchema(TransactionsHashRecord.class);
+ // [13400 - 13600]: Operation context messages.
+ msgIdx = 13400;
+ withNoSchema(DistributedOperationContextMessage.class);
+
assert msgIdx <= MAX_MESSAGE_ID;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java
new file mode 100644
index 00000000000..42d5c7eda85
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import
org.apache.ignite.internal.thread.context.DistributedOperationContextManager;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/**
+ * Transport for {@link OperationContext} distributed attributes.
+ *
+ * @see DistributedOperationContextManager
+ */
+public class DistributedOperationContextMessage implements Message {
+ /** Values of operation context attributes. */
+ @Order(0)
+ public Message[] vals;
+
+ /** Bitmap of effective attributes ids. */
+ @Order(1)
+ public byte idBitmap;
+
+ /** Empty constructor for serialization purposes. */
+ public DistributedOperationContextMessage() {
+ // No-op.
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
new file mode 100644
index 00000000000..77adfae5e64
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.thread.context;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.DistributedOperationContextMessage;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides the ability to manage {@link OperationContext} attributes in a
distributed manner.
+ *
+ * <p>This mechanism is primarily used to propagate {@link OperationContext}
state across the cluster by
+ * capturing it before a message is sent, transferring it together with the
message, and restoring it on
+ * the receiving node before message processing begins.</p>
+ *
+ * <p>The implementation relies on a mapping between a distributed identifier
and an
+ * {@link OperationContextAttribute} instance that is consistent across all
cluster nodes.</p>
+ *
+ * <p>To enable propagation of an {@link OperationContextAttribute} value
across cluster nodes, the
+ * attribute must be created using the {@link
#createDistributedAttribute(byte, Message)} method.</p>
+ *
+ * <p>Note that the maximum number of distributed attribute instances that
can be created is currently limited to
+ * {@link #MAX_DISTRIBUTED_ATTR_CNT} for implementation reasons.</p>
+ *
+ * @see OperationContext
+ * @see DistributedOperationContextMessage
+ */
+public class DistributedOperationContextManager {
+ /** */
+ private static final DistributedOperationContextManager INSTANCE = new
DistributedOperationContextManager();
+
+ /** Maximal number of supported distributed attributes. */
+ static final byte MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE;
+
+ /** Registered distributed attributes by their cluster-wide id. */
+ private final Map<Byte, OperationContextAttribute<Message>> attrs = new
ConcurrentSkipListMap<>();
+
+ /** */
+ public static DistributedOperationContextManager instance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Creates a new {@link OperationContext} attribute with the specified
distributed ID and initial value.
+ *
+ * <p>The distributed ID is used to consistently identify the attribute
across all nodes in the cluster.
+ * It must be unique, and its value must be in the range from {@code 0}
(inclusive) to {@code Byte.SIZE} (exclusive).</p>
+ *
+ * <p>The value of the created attribute is automatically captured and
propagated between cluster nodes
+ * during message transmission.</p>
+ *
+ * @see OperationContextAttribute#newInstance(Object)
+ */
+ public <T extends Message> OperationContextAttribute<T>
createDistributedAttribute(byte id, @Nullable T initVal) {
+ assert id >= 0 && id < MAX_DISTRIBUTED_ATTR_CNT : "Invalid distributed
attributed id [id=" + id + ']';
+
+ return (OperationContextAttribute<T>)attrs.compute(id, (id0, attr0) ->
{
+ if (attr0 != null)
+ throw new IgniteException("Duplicated distributed attribute id
[id=" + id + ']');
+
+ return OperationContextAttribute.newInstance(initVal);
+ });
+ }
+
+ /**
+ * Collects the values of all distributed {@link
OperationContextAttribute}s registered by this manager in a format
+ * suitable for transmission between cluster nodes.
+ *
+ * @see OperationContext#get(OperationContextAttribute)
+ */
+ public @Nullable DistributedOperationContextMessage
collectDistributedAttributes() {
+ DistributedOperationContextMessage res = null;
+ List<Message> vals = null;
+
+ for (Map.Entry<Byte, OperationContextAttribute<Message>> e :
attrs.entrySet()) {
+ OperationContextAttribute<? extends Message> attr = e.getValue();
+
+ Message curVal = OperationContext.get(attr);
+
+ if (curVal != attr.initialValue()) {
+ if (res == null) {
+ res = new DistributedOperationContextMessage();
+
+ vals = new ArrayList<>(MAX_DISTRIBUTED_ATTR_CNT / 2);
+ }
+
+ byte mask = (byte)(1 << e.getKey());
+
+ assert (res.idBitmap & mask) == 0;
+
+ vals.add(curVal);
+ res.idBitmap |= mask;
+ }
+ }
+
+ if (res != null)
+ res.vals = vals.toArray(new Message[vals.size()]);
+
+ return res;
+ }
+
+ /** Restores distributed {@link OperationContextAttribute} values received
from a remote node. */
+ public Scope restoreDistributedAttributes(@Nullable
DistributedOperationContextMessage msg) {
+ if (msg == null)
+ return Scope.NOOP_SCOPE;
+
+ assert msg.idBitmap != 0;
+ assert !F.isEmpty(msg.vals);
+ assert msg.vals.length <= MAX_DISTRIBUTED_ATTR_CNT;
+
+ OperationContext.ContextUpdater updater =
OperationContext.ContextUpdater.create();
+
+ for (byte valIdx = 0, maskIdx = 0; valIdx < msg.vals.length; ++valIdx)
{
+ Message curVal = msg.vals[valIdx];
+
+ while ((msg.idBitmap & (1 << maskIdx)) == 0)
+ ++maskIdx;
+
+ OperationContextAttribute<Message> attr = attrs.get(maskIdx++);
+
+ assert attr != null;
+
+ updater.set(attr, curVal);
+ }
+
+ return updater.apply();
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index a0e1a200487..ccf622be6ea 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -70,6 +70,8 @@ import org.apache.ignite.internal.processors.tracing.SpanTags;
import org.apache.ignite.internal.processors.tracing.messages.SpanContainer;
import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage;
import
org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable;
+import
org.apache.ignite.internal.thread.context.DistributedOperationContextManager;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -1310,6 +1312,8 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param msg Message.
*/
private void sendMessage(TcpDiscoveryAbstractMessage msg) {
+ msg.opCtxMsg =
DistributedOperationContextManager.instance().collectDistributedAttributes();
+
synchronized (mux) {
queue.add(msg);
@@ -1757,251 +1761,256 @@ class ClientImpl extends TcpDiscoveryImpl {
blockingSectionEnd();
}
- if (msg instanceof JoinTimeout) {
- int joinCnt0 = ((JoinTimeout)msg).joinCnt;
+ TcpDiscoveryAbstractMessage dm = msg instanceof
TcpDiscoveryAbstractMessage
+ ? (TcpDiscoveryAbstractMessage)msg
+ : null;
- if (joinCnt == joinCnt0) {
- if (state == STARTING) {
- joinError(new IgniteSpiException("Join process
timed out, did not receive response for " +
- "join request (consider increasing
'joinTimeout' configuration property) " +
- "[joinTimeout=" + spi.joinTimeout + ",
sock=" + currSock + ']'));
+ try (Scope ignored =
DistributedOperationContextManager.instance()
+ .restoreDistributedAttributes(dm == null ? null :
dm.opCtxMsg)) {
+ if (msg instanceof JoinTimeout) {
+ int joinCnt0 = ((JoinTimeout)msg).joinCnt;
- break;
- }
- else if (state == DISCONNECTED) {
- if (log.isDebugEnabled())
- log.debug("Failed to reconnect, local node
segmented " +
- "[joinTimeout=" + spi.joinTimeout +
']');
+ if (joinCnt == joinCnt0) {
+ if (state == STARTING) {
+ joinError(new IgniteSpiException("Join
process timed out, did not receive response for " +
+ "join request (consider increasing
'joinTimeout' configuration property) " +
+ "[joinTimeout=" + spi.joinTimeout + ",
sock=" + currSock + ']'));
+
+ break;
+ }
+ else if (state == DISCONNECTED) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to reconnect, local
node segmented " +
+ "[joinTimeout=" + spi.joinTimeout
+ ']');
- state = SEGMENTED;
+ state = SEGMENTED;
- notifyDiscovery(
- EVT_NODE_SEGMENTED, topVer, locNode,
allVisibleNodes(), null);
+ notifyDiscovery(
+ EVT_NODE_SEGMENTED, topVer, locNode,
allVisibleNodes(), null);
+ }
}
}
- }
- else if (msg == SPI_STOP) {
- boolean connected = state == CONNECTED;
+ else if (msg == SPI_STOP) {
+ boolean connected = state == CONNECTED;
- state = STOPPED;
+ state = STOPPED;
- assert spi.getSpiContext().isStopping();
+ assert spi.getSpiContext().isStopping();
- if (connected && currSock != null) {
- TcpDiscoveryNodeLeftMessage leftMsg = new
TcpDiscoveryNodeLeftMessage(getLocalNodeId());
+ if (connected && currSock != null) {
+ TcpDiscoveryNodeLeftMessage leftMsg = new
TcpDiscoveryNodeLeftMessage(getLocalNodeId());
- leftMsg.client(true);
+ leftMsg.client(true);
- Span rootSpan =
tracing.create(TraceableMessagesTable.traceName(leftMsg.getClass()))
- .addTag(SpanTags.tag(SpanTags.EVENT_NODE,
SpanTags.ID), () -> locNode.id().toString())
- .addTag(SpanTags.tag(SpanTags.EVENT_NODE,
SpanTags.CONSISTENT_ID),
- () -> locNode.consistentId().toString())
- .addLog(() -> "Created");
+ Span rootSpan =
tracing.create(TraceableMessagesTable.traceName(leftMsg.getClass()))
+ .addTag(SpanTags.tag(SpanTags.EVENT_NODE,
SpanTags.ID), () -> locNode.id().toString())
+ .addTag(SpanTags.tag(SpanTags.EVENT_NODE,
SpanTags.CONSISTENT_ID),
+ () ->
locNode.consistentId().toString())
+ .addLog(() -> "Created");
-
leftMsg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
+
leftMsg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
- sockWriter.sendMessage(leftMsg);
+ sockWriter.sendMessage(leftMsg);
- rootSpan.addLog(() -> "Sent").end();
+ rootSpan.addLog(() -> "Sent").end();
+ }
+ else
+ leaveLatch.countDown();
}
- else
- leaveLatch.countDown();
- }
- else if (msg == SPI_RECONNECT) {
- if (state == CONNECTED) {
- if (reconnector != null) {
- reconnector.cancel();
- reconnector.join();
+ else if (msg == SPI_RECONNECT) {
+ if (state == CONNECTED) {
+ if (reconnector != null) {
+ reconnector.cancel();
+ reconnector.join();
- reconnector = null;
- }
+ reconnector = null;
+ }
- sockWriter.forceLeave();
- sockReader.forceStopRead();
+ sockWriter.forceLeave();
+ sockReader.forceStopRead();
- currSock = null;
+ currSock = null;
- queue.clear();
+ queue.clear();
- onDisconnected();
+ onDisconnected();
- UUID newId = UUID.randomUUID();
+ UUID newId = UUID.randomUUID();
- U.quietAndWarn(log, "Local node will try to
reconnect to cluster with new id due " +
- "to network problems [newId=" + newId +
- ", prevId=" + locNode.id() +
- ", locNode=" + locNode + ']');
+ U.quietAndWarn(log, "Local node will try to
reconnect to cluster with new id due " +
+ "to network problems [newId=" + newId +
+ ", prevId=" + locNode.id() +
+ ", locNode=" + locNode + ']');
- locNode.onClientDisconnected(newId);
+ locNode.onClientDisconnected(newId);
- throttleClientReconnect();
+ throttleClientReconnect();
- tryJoin();
+ tryJoin();
+ }
}
- }
- else if (msg instanceof TcpDiscoveryNodeFailedMessage &&
-
((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) {
- TcpDiscoveryNodeFailedMessage msg0 =
(TcpDiscoveryNodeFailedMessage)msg;
+ else if (msg instanceof TcpDiscoveryNodeFailedMessage
&&
+
((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) {
+ TcpDiscoveryNodeFailedMessage msg0 =
(TcpDiscoveryNodeFailedMessage)msg;
- assert msg0.force() : msg0;
+ assert msg0.force() : msg0;
- forceFailMsg = msg0;
- }
- else if (msg instanceof SocketClosedMessage) {
- if (((SocketClosedMessage)msg).sock == currSock) {
- Socket sock = currSock.sock;
+ forceFailMsg = msg0;
+ }
+ else if (msg instanceof SocketClosedMessage) {
+ if (((SocketClosedMessage)msg).sock == currSock) {
+ Socket sock = currSock.sock;
- InetSocketAddress prevAddr = new
InetSocketAddress(sock.getInetAddress(), sock.getPort());
+ InetSocketAddress prevAddr = new
InetSocketAddress(sock.getInetAddress(), sock.getPort());
- currSock = null;
+ currSock = null;
- boolean join = joinLatch.getCount() > 0;
+ boolean join = joinLatch.getCount() > 0;
- if (spi.getSpiContext().isStopping() || state ==
SEGMENTED) {
- leaveLatch.countDown();
+ if (spi.getSpiContext().isStopping() || state
== SEGMENTED) {
+ leaveLatch.countDown();
- if (join) {
- joinError(new IgniteSpiException("Failed
to connect to cluster: socket closed."));
+ if (join) {
+ joinError(new
IgniteSpiException("Failed to connect to cluster: socket closed."));
- break;
- }
- }
- else {
- if (forceFailMsg != null) {
- if (log.isDebugEnabled()) {
- log.debug("Connection closed, local
node received force fail message, " +
- "will not try to restore
connection");
+ break;
}
-
- queue.addFirst(SPI_RECONNECT_FAILED);
}
else {
- if (log.isDebugEnabled())
- log.debug("Connection closed, will try
to restore connection.");
+ if (forceFailMsg != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Connection closed,
local node received force fail message, " +
+ "will not try to restore
connection");
+ }
- assert reconnector == null;
+ queue.addFirst(SPI_RECONNECT_FAILED);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Connection closed, will
try to restore connection.");
+
+ assert reconnector == null;
- reconnector = new Reconnector(join,
prevAddr);
- reconnector.start();
+ reconnector = new Reconnector(join,
prevAddr);
+ reconnector.start();
+ }
}
}
}
- }
- else if (msg == SPI_RECONNECT_FAILED) {
- if (reconnector != null) {
- reconnector.cancel();
- reconnector.join();
-
- reconnector = null;
- }
- else
- assert forceFailMsg != null;
+ else if (msg == SPI_RECONNECT_FAILED) {
+ if (reconnector != null) {
+ reconnector.cancel();
+ reconnector.join();
- if (spi.isClientReconnectDisabled()) {
- if (state != SEGMENTED && state != STOPPED) {
- if (forceFailMsg != null) {
- U.quietAndWarn(log, "Local node was
dropped from cluster due to network problems " +
- "[nodeInitiatedFail=" +
forceFailMsg.creatorNodeId() +
- ", msg=" + forceFailMsg.warning() +
']');
- }
+ reconnector = null;
+ }
+ else
+ assert forceFailMsg != null;
+
+ if (spi.isClientReconnectDisabled()) {
+ if (state != SEGMENTED && state != STOPPED) {
+ if (forceFailMsg != null) {
+ U.quietAndWarn(log, "Local node was
dropped from cluster due to network problems " +
+ "[nodeInitiatedFail=" +
forceFailMsg.creatorNodeId() +
+ ", msg=" + forceFailMsg.warning()
+ ']');
+ }
- if (log.isDebugEnabled()) {
- log.debug("Failed to restore closed
connection, reconnect disabled, " +
- "local node segmented
[networkTimeout=" + spi.netTimeout + ']');
- }
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to restore closed
connection, reconnect disabled, " +
+ "local node segmented
[networkTimeout=" + spi.netTimeout + ']');
+ }
- state = SEGMENTED;
+ state = SEGMENTED;
- notifyDiscovery(
- EVT_NODE_SEGMENTED, topVer, locNode,
allVisibleNodes(), null);
- }
- }
- else {
- if (state == STARTING || state == CONNECTED) {
- if (log.isDebugEnabled()) {
- log.debug("Failed to restore closed
connection, will try to reconnect " +
- "[networkTimeout=" + spi.netTimeout +
- ", joinTimeout=" + spi.joinTimeout +
- ", failMsg=" + forceFailMsg + ']');
+ notifyDiscovery(
+ EVT_NODE_SEGMENTED, topVer, locNode,
allVisibleNodes(), null);
}
-
- onDisconnected();
}
+ else {
+ if (state == STARTING || state == CONNECTED) {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to restore closed
connection, will try to reconnect " +
+ "[networkTimeout=" +
spi.netTimeout +
+ ", joinTimeout=" + spi.joinTimeout
+
+ ", failMsg=" + forceFailMsg + ']');
+ }
- UUID newId = UUID.randomUUID();
+ onDisconnected();
+ }
- if (forceFailMsg != null) {
- long delay =
IgniteSystemProperties.getLong(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY,
- DFLT_DISCO_FAILED_CLIENT_RECONNECT_DELAY);
+ UUID newId = UUID.randomUUID();
- if (delay > 0) {
- U.quietAndWarn(log, "Local node was
dropped from cluster due to network problems, " +
- "will try to reconnect with new id
after " + delay + "ms (reconnect delay " +
- "can be changed using
IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY system " +
- "property) [" +
- "newId=" + newId +
- ", prevId=" + locNode.id() +
- ", locNode=" + locNode +
- ", nodeInitiatedFail=" +
forceFailMsg.creatorNodeId() +
- ", msg=" + forceFailMsg.warning() +
']');
+ if (forceFailMsg != null) {
+ long delay =
IgniteSystemProperties.getLong(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY,
+
DFLT_DISCO_FAILED_CLIENT_RECONNECT_DELAY);
+
+ if (delay > 0) {
+ U.quietAndWarn(log, "Local node was
dropped from cluster due to network problems, " +
+ "will try to reconnect with new id
after " + delay + "ms (reconnect delay " +
+ "can be changed using
IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY system " +
+ "property) [" +
+ "newId=" + newId +
+ ", prevId=" + locNode.id() +
+ ", locNode=" + locNode +
+ ", nodeInitiatedFail=" +
forceFailMsg.creatorNodeId() +
+ ", msg=" + forceFailMsg.warning()
+ ']');
+
+ Thread.sleep(delay);
+ }
+ else {
+ U.quietAndWarn(log, "Local node was
dropped from cluster due to network problems, " +
+ "will try to reconnect with new id
[" +
+ "newId=" + newId +
+ ", prevId=" + locNode.id() +
+ ", locNode=" + locNode +
+ ", nodeInitiatedFail=" +
forceFailMsg.creatorNodeId() +
+ ", msg=" + forceFailMsg.warning()
+ ']');
+ }
- Thread.sleep(delay);
+ forceFailMsg = null;
}
- else {
- U.quietAndWarn(log, "Local node was
dropped from cluster due to network problems, " +
- "will try to reconnect with new id [" +
- "newId=" + newId +
- ", prevId=" + locNode.id() +
- ", locNode=" + locNode +
- ", nodeInitiatedFail=" +
forceFailMsg.creatorNodeId() +
- ", msg=" + forceFailMsg.warning() +
']');
+ else if (log.isInfoEnabled()) {
+ log.info("Client node disconnected from
cluster, will try to reconnect with new id " +
+ "[newId=" + newId + ", prevId=" +
locNode.id() + ", locNode=" + locNode + ']');
}
- forceFailMsg = null;
- }
- else if (log.isInfoEnabled()) {
- log.info("Client node disconnected from
cluster, will try to reconnect with new id " +
- "[newId=" + newId + ", prevId=" +
locNode.id() + ", locNode=" + locNode + ']');
- }
+ locNode.onClientDisconnected(newId);
- locNode.onClientDisconnected(newId);
-
- tryJoin();
+ tryJoin();
+ }
}
- }
- else {
- TcpDiscoveryAbstractMessage discoMsg =
(TcpDiscoveryAbstractMessage)msg;
-
- if (joining()) {
- IgniteSpiException err = null;
+ else {
+ if (joining()) {
+ IgniteSpiException err = null;
- if (discoMsg instanceof
TcpDiscoveryDuplicateIdMessage)
- err =
spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
- else if (discoMsg instanceof
TcpDiscoveryAuthFailedMessage)
- err =
spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
- //TODO:
https://issues.apache.org/jira/browse/IGNITE-9829
- else if (discoMsg instanceof
TcpDiscoveryCheckFailedMessage)
- err =
spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
+ if (dm instanceof
TcpDiscoveryDuplicateIdMessage)
+ err =
spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
+ else if (dm instanceof
TcpDiscoveryAuthFailedMessage)
+ err =
spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
+ //TODO:
https://issues.apache.org/jira/browse/IGNITE-9829
+ else if (dm instanceof
TcpDiscoveryCheckFailedMessage)
+ err =
spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
- if (err != null) {
- if (state == DISCONNECTED) {
- U.error(log, "Failed to reconnect, segment
local node.", err);
+ if (err != null) {
+ if (state == DISCONNECTED) {
+ U.error(log, "Failed to reconnect,
segment local node.", err);
- state = SEGMENTED;
+ state = SEGMENTED;
- notifyDiscovery(
- EVT_NODE_SEGMENTED, topVer, locNode,
allVisibleNodes(), null);
- }
- else
- joinError(err);
+ notifyDiscovery(
+ EVT_NODE_SEGMENTED, topVer,
locNode, allVisibleNodes(), null);
+ }
+ else
+ joinError(err);
- cancel();
+ cancel();
- break;
+ break;
+ }
}
- }
-
processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg);
+ processDiscoveryMessage(dm);
+ }
}
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 82c012c2a1a..a8a4f2f0a47 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -95,6 +95,8 @@ import org.apache.ignite.internal.processors.tracing.SpanTags;
import org.apache.ignite.internal.processors.tracing.messages.SpanContainer;
import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage;
import
org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable;
+import
org.apache.ignite.internal.thread.context.DistributedOperationContextManager;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor;
import org.apache.ignite.internal.util.GridBoundedLinkedHashSet;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
@@ -3046,8 +3048,10 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
- if (msg instanceof TraceableMessage) {
- TraceableMessage tMsg = (TraceableMessage)msg;
+ if (!fromSocket)
+ msg.opCtxMsg =
DistributedOperationContextManager.instance().collectDistributedAttributes();
+
+ if (msg instanceof TraceableMessage tMsg) {
// If we read this message from socket.
if (fromSocket)
@@ -3173,11 +3177,8 @@ class ServerImpl extends TcpDiscoveryImpl {
task.run();
}
- /** {@inheritDoc} */
- @Override protected void processMessage(TcpDiscoveryAbstractMessage
msg) {
- if (msg == WAKEUP)
- return;
-
+ /** */
+ private void processMessage0(TcpDiscoveryAbstractMessage msg) {
notifiedDiscovery.set(false);
if (msg instanceof TraceableMessage) {
@@ -3315,6 +3316,16 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
+ /** {@inheritDoc} */
+ @Override protected void processMessage(TcpDiscoveryAbstractMessage
msg) {
+ if (msg == WAKEUP)
+ return;
+
+ try (Scope ignored =
DistributedOperationContextManager.instance().restoreDistributedAttributes(msg.opCtxMsg))
{
+ processMessage0(msg);
+ }
+ }
+
/**
* Processes authentication failed message.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java
index f23e36f200d..d76279fb280 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java
@@ -52,7 +52,6 @@ public class InetSocketAddressMessage extends
InetAddressMessage {
return port;
}
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(InetSocketAddressMessage.class, this);
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 7a97763c36b..60c38866034 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
+import org.apache.ignite.internal.DistributedOperationContextMessage;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -76,6 +77,11 @@ public abstract class TcpDiscoveryAbstractMessage implements
Message {
@Order(4)
Set<UUID> failedNodes;
+ /** Operation context attributes message. */
+ @GridToStringInclude
+ @Order(5)
+ public @Nullable DistributedOperationContextMessage opCtxMsg;
+
/**
* Default no-arg constructor for {@link Externalizable} interface.
*/
@@ -100,6 +106,7 @@ public abstract class TcpDiscoveryAbstractMessage
implements Message {
verifierNodeId = msg.verifierNodeId;
topVer = msg.topVer;
flags = msg.flags;
+ opCtxMsg = msg.opCtxMsg;
}
/**
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
index 9de906b2729..bdf84b743ad 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.thread.context;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
@@ -36,8 +37,13 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
@@ -48,6 +54,7 @@ import
org.apache.ignite.internal.thread.pool.IgniteStripedExecutor;
import org.apache.ignite.internal.thread.pool.IgniteStripedThreadPoolExecutor;
import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.queue.IgniteAsyncObjectHandler;
import org.apache.ignite.internal.util.worker.queue.IgniteDelayedObjectHandler;
@@ -56,6 +63,7 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.thread.IgniteThread;
import org.junit.Test;
@@ -64,6 +72,7 @@ import org.springframework.lang.NonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/** */
public class OperationContextAttributesTest extends GridCommonAbstractTest {
@@ -98,6 +107,8 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
@Override protected void afterTest() throws Exception {
super.afterTest();
+ stopAllGrids();
+
if (poolToShutdownAfterTest != null)
poolToShutdownAfterTest.shutdownNow();
@@ -808,6 +819,98 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
}
}
+ /** Checks that distributed attribute values set around a cache operation are visible in discovery listeners on every node. */
+ @Test
+ public void testSendAttributesByDiscovery() throws Exception {
+ byte attrId1 = 0;
+ byte attrId2 =
DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1;
+
+ InetSocketAddressMessage dfltDistAttr1Val = new
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80);
+ GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1);
+
+ // Local attribute 1, created before the distributed ones and never read below (presumably only shifts attribute numbering - confirm).
+ OperationContextAttribute.newInstance(1000);
+
+ // Distributed attribute 1: id 0, default value is the loopback address with port 80.
+ OperationContextAttribute<InetSocketAddressMessage> dAttr1 =
DistributedOperationContextManager.instance()
+ .createDistributedAttribute(attrId1, dfltDistAttr1Val);
+
+ // Local attribute 2, created between the two distributed attributes and never read below.
+ OperationContextAttribute.newInstance("locaAttr2");
+
+ // Distributed attribute 2: id MAX_DISTRIBUTED_ATTR_CNT - 1 (presumably the highest allowed id - confirm).
+ OperationContextAttribute<GridCacheVersion> dAttr2 =
DistributedOperationContextManager.instance()
+ .createDistributedAttribute(attrId2, dfltDistrAttr2Val);
+
+ startGrids(2);
+ startClientGrid(2);
+
+ CountDownLatch coordLatch = new CountDownLatch(3);
+ CountDownLatch srvrLatch = new CountDownLatch(3);
+ CountDownLatch clientLatch = new CountDownLatch(3);
+
+ InetSocketAddressMessage valToSend1 = new
InetSocketAddressMessage(dfltDistAttr1Val.address(), 443);
+ GridCacheVersion valToSend2 = new GridCacheVersion(2, 2, 2);
+
+ for (int i = 0; i < G.allGrids().size(); ++i) {
+ int i0 = i;
+
+ grid(i).context().discovery().setCustomEventListener(
+ DynamicCacheChangeBatch.class, new CustomEventListener<>() {
+ @Override public void
onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd,
+ DynamicCacheChangeBatch msg) {
+
+ InetSocketAddressMessage receivedVal1 =
OperationContext.get(dAttr1);
+ GridCacheVersion receivedVal2 =
OperationContext.get(dAttr2);
+
+ assertNotNull(receivedVal1);
+ assertNotNull(receivedVal2);
+
+ assertFalse(dfltDistAttr1Val.port() ==
receivedVal1.port());
+ assertEquals(receivedVal1.port(), valToSend1.port());
+ assertEquals(receivedVal1.address(),
valToSend1.address());
+
+ assertFalse(dfltDistrAttr2Val.equals(receivedVal2));
+ assertTrue(valToSend2.equals(receivedVal2));
+
+ if (grid(i0).localNode().isClient())
+ clientLatch.countDown();
+ else if (grid(i0).localNode().order() == 1)
+ coordLatch.countDown();
+ else
+ srvrLatch.countDown();
+ }
+ });
+ }
+
+ // Send from the coordinator (grid 0): every latch is expected to drop from 3 to 2.
+ try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2,
valToSend2)) {
+ grid(0).createCache(defaultCacheConfiguration());
+ }
+
+ assertTrue(waitForCondition(() -> coordLatch.getCount() == 2,
getTestTimeout()));
+ assertTrue(waitForCondition(() -> srvrLatch.getCount() == 2,
getTestTimeout()));
+ assertTrue(waitForCondition(() -> clientLatch.getCount() == 2,
getTestTimeout()));
+
+ // Send from a server (grid 1): every latch is expected to drop to 1.
+ try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2,
valToSend2)) {
+ grid(1).destroyCache(DEFAULT_CACHE_NAME);
+ }
+
+ assertTrue(waitForCondition(() -> coordLatch.getCount() == 1,
getTestTimeout()));
+ assertTrue(waitForCondition(() -> srvrLatch.getCount() == 1,
getTestTimeout()));
+ assertTrue(waitForCondition(() -> clientLatch.getCount() == 1,
getTestTimeout()));
+
+ // Send from a client (grid 2): every latch is expected to reach 0.
+ try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2,
valToSend2)) {
+ grid(2).createCache(defaultCacheConfiguration());
+ }
+
+ assertTrue(coordLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
+ assertTrue(srvrLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
+ assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
+ }
+
/** */
private void doContextAwareExecutorServiceTest(ExecutorService pool)
throws Exception {
CountDownLatch poolUnblockedLatch = blockPool(pool);
@@ -923,9 +1026,8 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
/** */
static void assertAllCreatedChecksPassed() throws Exception {
- for (AttributeValueChecker check : CHECKS) {
+ for (AttributeValueChecker check : CHECKS)
check.get(5_000, MILLISECONDS);
- }
}
/** */