This is an automated email from the ASF dual-hosted git repository.
petrov-mg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 7a61b2d7f6b IGNITE-28808 Restricted distributed Operation Context
attribute registration after node started (#13275)
7a61b2d7f6b is described below
commit 7a61b2d7f6b31e945f4532403d634d80d4802fdc
Author: Vladimir Steshin <[email protected]>
AuthorDate: Fri Jun 26 09:08:03 2026 +0300
IGNITE-28808 Restricted distributed Operation Context attribute
registration after node started (#13275)
---
.../ignite/internal/CoreMessagesProvider.java | 2 +-
.../apache/ignite/internal/GridKernalContext.java | 10 +-
.../ignite/internal/GridKernalContextImpl.java | 6 +
.../org/apache/ignite/internal/IgniteKernal.java | 13 ++
...xtMessage.java => OperationContextMessage.java} | 10 +-
.../managers/communication/GridIoManager.java | 5 +-
.../managers/communication/GridIoMessage.java | 6 +-
.../wal/reader/StandaloneGridKernalContext.java | 9 +
...anager.java => OperationContextDispatcher.java} | 73 ++++---
.../ignite/spi/discovery/tcp/ClientImpl.java | 6 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 5 +-
.../ignite/spi/discovery/tcp/TcpDiscoveryImpl.java | 6 +
.../tcp/messages/TcpDiscoveryAbstractMessage.java | 4 +-
.../context/OperationContextAttributesTest.java | 213 ++++++++++++---------
14 files changed, 214 insertions(+), 154 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
index 936eef8bf39..f40b14826cc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
@@ -687,7 +687,7 @@ public class CoreMessagesProvider extends
AbstractMarshallableMessageFactoryProv
// [13400 - 13500]: Operation context messages.
msgIdx = 13400;
- withNoSchema(DistributedOperationContextMessage.class);
+ withNoSchema(OperationContextMessage.class);
// [13500 - 13600]: Rolling Upgrade messages.
msgIdx = 13500;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 744a844f88e..cab4d2a8146 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -78,6 +78,7 @@ import
org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.processors.tracing.Tracing;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
+import org.apache.ignite.internal.thread.context.OperationContextDispatcher;
import org.apache.ignite.internal.util.IgniteExceptionRegistry;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.worker.WorkersRegistry;
@@ -211,12 +212,19 @@ public interface GridKernalContext extends
Iterable<GridComponent> {
public MaintenanceRegistry maintenanceRegistry();
/**
- * Gets core message factoy.
+ * Gets core message factory.
*
* @return Core message factory.
*/
public MessageFactory messageFactory();
+ /**
+ * Gets the distributed operation context dispatcher.
+ *
+ * @return The distributed operation context dispatcher.
+ */
+ public OperationContextDispatcher operationContextDispatcher();
+
/**
* Gets transformation processor.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 5d779302d1f..fb3dcdb3412 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -101,6 +101,7 @@ import
org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.processors.tracing.Tracing;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
+import org.apache.ignite.internal.thread.context.OperationContextDispatcher;
import org.apache.ignite.internal.thread.pool.IgniteForkJoinPool;
import org.apache.ignite.internal.util.IgniteExceptionRegistry;
import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
@@ -697,6 +698,11 @@ public class GridKernalContextImpl implements
GridKernalContext, Externalizable
return grid.messageFactory();
}
+ /** {@inheritDoc} */
+ @Override public OperationContextDispatcher operationContextDispatcher() {
+ return grid.operationContextDispatcher();
+ }
+
/** {@inheritDoc} */
@Override public CacheObjectTransformerProcessor transformer() {
return transProc;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8e92f8118fc..063774e7b13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -174,6 +174,7 @@ import
org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.suggestions.JvmConfigurationSuggestions;
import org.apache.ignite.internal.suggestions.OsConfigurationSuggestions;
import org.apache.ignite.internal.systemview.ConfigurationViewWalker;
+import org.apache.ignite.internal.thread.context.OperationContextDispatcher;
import org.apache.ignite.internal.util.TimeBag;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -444,6 +445,9 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
/** Core message factory. */
private MessageFactory msgFactory;
+ /** Distributed operation context dispatcher. */
+ private OperationContextDispatcher operationCtxDispatcher;
+
/**
* No-arg constructor is required by externalization.
*/
@@ -930,6 +934,8 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
longJVMPauseDetector
);
+ operationCtxDispatcher = new OperationContextDispatcher();
+
startProcessor(new DiagnosticProcessor(ctx));
mBeansMgr = new IgniteMBeansManager(this);
@@ -1152,6 +1158,8 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
// All components exept Discovery are started, time to check if
maintenance is still needed.
mntcProc.prepareAndExecuteMaintenance();
+ operationCtxDispatcher.finishRegistration();
+
gw.writeLock();
try {
@@ -3059,6 +3067,11 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
return msgFactory;
}
+ /** @return Distributed operation context dispatcher. */
+ OperationContextDispatcher operationContextDispatcher() {
+ return operationCtxDispatcher;
+ }
+
/**
* Method is responsible for handling the {@link
EventType#EVT_CLIENT_NODE_DISCONNECTED} event. Notify all the
* GridComponents that the such even has been occurred (e.g. if the local
client node disconnected from the cluster
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/OperationContextMessage.java
similarity index 79%
rename from
modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/OperationContextMessage.java
index 42d5c7eda85..9dc9fdb82cc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/OperationContextMessage.java
@@ -17,16 +17,16 @@
package org.apache.ignite.internal;
-import
org.apache.ignite.internal.thread.context.DistributedOperationContextManager;
import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextDispatcher;
import org.apache.ignite.plugin.extensions.communication.Message;
/**
- * Transport for {@link OperationContext} distributed attributes.
+ * Message for {@link OperationContext} distributed attributes.
*
- * @see DistributedOperationContextManager
+ * @see OperationContextDispatcher
*/
-public class DistributedOperationContextMessage implements Message {
+public class OperationContextMessage implements Message {
/** Values of operation context attributes. */
@Order(0)
public Message[] vals;
@@ -36,7 +36,7 @@ public class DistributedOperationContextMessage implements
Message {
public byte idBitmap;
/** Empty constructor for serialization purposes. */
- public DistributedOperationContextMessage() {
+ public OperationContextMessage() {
// No-op.
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index bdbc375fc9f..a83e6ae8e7d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -102,7 +102,6 @@ import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.processors.tracing.SpanTags;
-import
org.apache.ignite.internal.thread.context.DistributedOperationContextManager;
import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -461,7 +460,7 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
try {
GridIoMessage msg0 = (GridIoMessage)msg;
- try (Scope ignored =
DistributedOperationContextManager.instance().restoreDistributedAttributes(msg0.opCtxMsg))
{
+ try (Scope ignored =
ctx.operationContextDispatcher().restoreDistributedAttributes(msg0.opCtxMsg)) {
onMessage0(nodeId, msg0, msgC);
}
}
@@ -2055,7 +2054,7 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
else
res = new GridIoMessage(plc, topic, msg, ordered, timeout,
skipOnTimeout);
- res.opCtxMsg =
DistributedOperationContextManager.instance().collectDistributedAttributes();
+ res.opCtxMsg =
ctx.operationContextDispatcher().collectDistributedAttributes();
return res;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index cb09122415d..1865868a71a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.managers.communication;
-import org.apache.ignite.internal.DistributedOperationContextMessage;
import org.apache.ignite.internal.ExecutorAwareMessage;
import org.apache.ignite.internal.GridTopicMessage;
+import org.apache.ignite.internal.OperationContextMessage;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
@@ -65,10 +65,10 @@ public class GridIoMessage implements Message,
SpanTransport {
@Order(6)
byte[] span;
- /** Effective operation context attributes. */
+ /** Effective operation context attributes to propagate. */
@Order(7)
@GridToStringInclude
- public @Nullable DistributedOperationContextMessage opCtxMsg;
+ public @Nullable OperationContextMessage opCtxMsg;
/**
* Default constructor.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 48525f5d697..dd0d90b4167 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -103,6 +103,7 @@ import
org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.processors.tracing.NoopTracing;
import org.apache.ignite.internal.processors.tracing.Tracing;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
+import org.apache.ignite.internal.thread.context.OperationContextDispatcher;
import org.apache.ignite.internal.util.IgniteExceptionRegistry;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -176,6 +177,9 @@ public class StandaloneGridKernalContext implements
GridKernalContext {
/** Marshaller. */
private final BinaryMarshaller marsh;
+ /** Operation context dispacther. */
+ private final OperationContextDispatcher opCtxDispatcher = new
OperationContextDispatcher();
+
/**
* @param log Logger.
* @param ft Node file tree.
@@ -454,6 +458,11 @@ public class StandaloneGridKernalContext implements
GridKernalContext {
return null;
}
+ /** {@inheritDoc} */
+ @Override public OperationContextDispatcher operationContextDispatcher() {
+ return opCtxDispatcher;
+ }
+
/** {@inheritDoc} */
@Override public CacheObjectTransformerProcessor transformer() {
return transProc;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java
similarity index 57%
rename from
modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java
index 0ef065b7f77..a0669286576 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java
@@ -21,7 +21,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.DistributedOperationContextMessage;
+import org.apache.ignite.internal.OperationContextMessage;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
@@ -37,71 +37,62 @@ import org.jetbrains.annotations.Nullable;
* {@link OperationContextAttribute} instance that is consistent across all
cluster nodes.</p>
*
* <p>To enable propagation of an {@link OperationContextAttribute} value
across cluster nodes, the
- * attribute must be created using the {@link
#createDistributedAttribute(byte, Message)} method.
+ * attribute must be registered with the {@link
#registerDistributedAttribute(int, OperationContextAttribute)} method.
*
- * <p> Note, that the maximum number of distributed attribute instances that
can be created is currently limited to
- * {@link #MAX_DISTRIBUTED_ATTR_CNT} for implementation reasons.</p>
+ * <p> Note, that the maximum number of distributed attributes to register is
currently limited to
+ * {@link #MAX_ATTRS_CNT} for implementation reasons.</p>
*
* @see OperationContext
- * @see DistributedOperationContextMessage
+ * @see OperationContextMessage
*/
-public class DistributedOperationContextManager {
- /** */
- private static final DistributedOperationContextManager INSTANCE = new
DistributedOperationContextManager();
-
+public class OperationContextDispatcher {
/** Maximal number of supported distributed attributes. */
- static final byte MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE;
+ static final byte MAX_ATTRS_CNT = Byte.SIZE;
/** Registered distributed attributes by their cluster-wide id. */
- private final Map<Byte, OperationContextAttribute<Message>> attrs = new
ConcurrentSkipListMap<>();
+ private final Map<Byte, OperationContextAttribute<? extends Message>>
attrs = new ConcurrentSkipListMap<>();
- /** */
- public static DistributedOperationContextManager instance() {
- return INSTANCE;
- }
+ /** Whether the registration of new distributed attributes is allowed. */
+ private volatile boolean regFinished;
/**
- * Creates a new {@link OperationContext} attribute with the specified
distributed ID and initial value.
+ * Registers an attribute of {@link OperationContext} with the specified
distributed ID.
*
* <p>The distributed ID is used to consistently identify the attribute
across all nodes in the cluster.
- * It must be unique, and its value must be in the range from {@code 0}
(inclusive) to {@code Byte.SIZE} (exclusive).</p>
- *
- * <p>The value of the created attribute is automatically captured and
propagated between cluster nodes
- * during message transmission.</p>
+ * It must be unique, and its value must be in the range [{@code 0} :
{@code Byte.SIZE}).</p>
*
- * @see OperationContextAttribute#newInstance(Object)
+ * <p>Registered attribute value is automatically captured and propagated
between cluster nodes
+ * during the messages transmission.</p>
*/
- public <T extends Message> OperationContextAttribute<T>
createDistributedAttribute(byte id, @Nullable T initVal) {
- assert id >= 0 && id < MAX_DISTRIBUTED_ATTR_CNT : "Invalid distributed
attributed id [id=" + id + ']';
+ public <T extends Message> void registerDistributedAttribute(int id,
OperationContextAttribute<T> attr) {
+ if (regFinished)
+ throw new IgniteException("Initialization of distributed operation
context attributes has already finished.");
- return (OperationContextAttribute<T>)attrs.compute(id, (id0, attr0) ->
{
- if (attr0 != null)
- throw new IgniteException("Duplicated distributed attribute id
[id=" + id + ']');
+ assert id >= 0 && id < MAX_ATTRS_CNT : "Invalid distributed attributed
id [id=" + id + ']';
- return OperationContextAttribute.newInstance(initVal);
- });
+ if (attrs.putIfAbsent((byte)id, attr) != null)
+ throw new IgniteException("Duplicated distributed attribute id
[id=" + id + ']');
}
/**
- * Collects the values of all distributed {@link
OperationContextAttribute}s registered by this manager in a format
- * suitable for transmission between cluster nodes.
+ * Collects the values of all distributed {@link
OperationContextAttribute}s registered by this dispatcher.
*
* @see OperationContext#get(OperationContextAttribute)
*/
- public @Nullable DistributedOperationContextMessage
collectDistributedAttributes() {
- DistributedOperationContextMessage res = null;
+ public @Nullable OperationContextMessage collectDistributedAttributes() {
+ OperationContextMessage res = null;
List<Message> vals = null;
- for (Map.Entry<Byte, OperationContextAttribute<Message>> e :
attrs.entrySet()) {
+ for (Map.Entry<Byte, OperationContextAttribute<? extends Message>> e :
attrs.entrySet()) {
OperationContextAttribute<? extends Message> attr = e.getValue();
Message curVal = OperationContext.get(attr);
if (curVal != attr.initialValue()) {
if (res == null) {
- res = new DistributedOperationContextMessage();
+ res = new OperationContextMessage();
- vals = new ArrayList<>(MAX_DISTRIBUTED_ATTR_CNT / 2);
+ vals = new ArrayList<>(MAX_ATTRS_CNT / 2);
}
byte mask = (byte)(1 << e.getKey());
@@ -120,13 +111,13 @@ public class DistributedOperationContextManager {
}
/** Restores distributed {@link OperationContextAttribute} values received
from a remote node. */
- public Scope restoreDistributedAttributes(@Nullable
DistributedOperationContextMessage msg) {
+ public Scope restoreDistributedAttributes(@Nullable
OperationContextMessage msg) {
if (msg == null)
return Scope.NOOP_SCOPE;
assert msg.idBitmap != 0;
assert !F.isEmpty(msg.vals);
- assert msg.vals.length <= MAX_DISTRIBUTED_ATTR_CNT;
+ assert msg.vals.length <= MAX_ATTRS_CNT;
OperationContext.ContextUpdater updater =
OperationContext.ContextUpdater.create();
@@ -136,7 +127,7 @@ public class DistributedOperationContextManager {
while ((msg.idBitmap & (1 << maskIdx)) == 0)
++maskIdx;
- OperationContextAttribute<Message> attr = attrs.get(maskIdx++);
+ OperationContextAttribute<Message> attr =
(OperationContextAttribute<Message>)attrs.get(maskIdx++);
assert attr != null;
@@ -146,8 +137,8 @@ public class DistributedOperationContextManager {
return updater.apply();
}
- /** For testing purposes mostly. */
- void clear() {
- attrs.clear();
+ /** Restricts further registration of distributed attributes. */
+ public void finishRegistration() {
+ regFinished = true;
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index ccf622be6ea..86a4f190026 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -70,7 +70,6 @@ import org.apache.ignite.internal.processors.tracing.SpanTags;
import org.apache.ignite.internal.processors.tracing.messages.SpanContainer;
import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage;
import
org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable;
-import
org.apache.ignite.internal.thread.context.DistributedOperationContextManager;
import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -1312,7 +1311,7 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param msg Message.
*/
private void sendMessage(TcpDiscoveryAbstractMessage msg) {
- msg.opCtxMsg =
DistributedOperationContextManager.instance().collectDistributedAttributes();
+ msg.opCtxMsg =
operationCtxDispatcher.collectDistributedAttributes();
synchronized (mux) {
queue.add(msg);
@@ -1765,8 +1764,7 @@ class ClientImpl extends TcpDiscoveryImpl {
? (TcpDiscoveryAbstractMessage)msg
: null;
- try (Scope ignored =
DistributedOperationContextManager.instance()
- .restoreDistributedAttributes(dm == null ? null :
dm.opCtxMsg)) {
+ try (Scope ignored =
operationCtxDispatcher.restoreDistributedAttributes(dm == null ? null :
dm.opCtxMsg)) {
if (msg instanceof JoinTimeout) {
int joinCnt0 = ((JoinTimeout)msg).joinCnt;
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 75c24745be3..9af1a4200d6 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -95,7 +95,6 @@ import org.apache.ignite.internal.processors.tracing.SpanTags;
import org.apache.ignite.internal.processors.tracing.messages.SpanContainer;
import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage;
import
org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable;
-import
org.apache.ignite.internal.thread.context.DistributedOperationContextManager;
import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor;
import org.apache.ignite.internal.util.GridBoundedLinkedHashSet;
@@ -3019,7 +3018,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (!fromSocket)
- msg.opCtxMsg =
DistributedOperationContextManager.instance().collectDistributedAttributes();
+ msg.opCtxMsg =
operationCtxDispatcher.collectDistributedAttributes();
if (msg instanceof TraceableMessage tMsg) {
@@ -3291,7 +3290,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg == WAKEUP)
return;
- try (Scope ignored =
DistributedOperationContextManager.instance().restoreDistributedAttributes(msg.opCtxMsg))
{
+ try (Scope ignored =
operationCtxDispatcher.restoreDistributedAttributes(msg.opCtxMsg)) {
processMessage0(msg);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 789f3d0adb1..25ad69c147f 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -43,6 +43,7 @@ import
org.apache.ignite.internal.processors.cluster.NodeFullMetricsMessage;
import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
import org.apache.ignite.internal.processors.tracing.NoopTracing;
import org.apache.ignite.internal.processors.tracing.Tracing;
+import org.apache.ignite.internal.thread.context.OperationContextDispatcher;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -139,6 +140,9 @@ abstract class TcpDiscoveryImpl {
/** Tracing. */
protected Tracing tracing;
+ /** Distributed operation context dispatcher. */
+ protected final OperationContextDispatcher operationCtxDispatcher;
+
/**
* @param spi Adapter.
*/
@@ -151,6 +155,8 @@ abstract class TcpDiscoveryImpl {
tracing = ((IgniteEx)spi.ignite()).context().tracing();
else
tracing = new NoopTracing();
+
+ operationCtxDispatcher =
((IgniteEx)spi.ignite()).context().operationContextDispatcher();
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 60c38866034..5f09498060e 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -21,7 +21,7 @@ import java.io.Externalizable;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
-import org.apache.ignite.internal.DistributedOperationContextMessage;
+import org.apache.ignite.internal.OperationContextMessage;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -80,7 +80,7 @@ public abstract class TcpDiscoveryAbstractMessage implements
Message {
/** Operation context attributes message. */
@GridToStringInclude
@Order(5)
- public @Nullable DistributedOperationContextMessage opCtxMsg;
+ public @Nullable OperationContextMessage opCtxMsg;
/**
* Default no-arg constructor for {@link Externalizable} interface.
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
index 3571371f2eb..b4003de3bcf 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
@@ -22,10 +22,12 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
@@ -40,6 +42,8 @@ import java.util.function.Supplier;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -59,6 +63,7 @@ import
org.apache.ignite.internal.thread.pool.IgniteScheduledThreadPoolExecutor;
import org.apache.ignite.internal.thread.pool.IgniteStripedExecutor;
import org.apache.ignite.internal.thread.pool.IgniteStripedThreadPoolExecutor;
import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor;
+import org.apache.ignite.internal.util.GridByteArrayList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -69,6 +74,9 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.thread.IgniteThread;
@@ -77,6 +85,7 @@ import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -101,6 +110,9 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
/** */
private int beforeTestReservedAttrIds;
+ /** */
+ private @Nullable PluginProvider pluginProvider;
+
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
@@ -121,8 +133,16 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
// Releases attribute IDs reserved during the test.
OperationContextAttribute.ID_GEN.set(beforeTestReservedAttrIds);
+ }
- DistributedOperationContextManager.instance().clear();
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ if (pluginProvider != null)
+ cfg.setPluginProviders(pluginProvider);
+
+ return cfg;
}
/** */
@@ -831,36 +851,82 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
/** */
@Test
public void testSendAttributesByDiscovery() throws Exception {
- byte attrId1 = 0;
- byte attrId2 =
DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1;
+ doTestOperationContextAttributesPropagation(true);
+ }
- InetSocketAddressMessage dfltDistAttr1Val = new
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80);
- GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1);
+ /** */
+ @Test
+ public void testSendAttributesByCommunication() throws Exception {
+ doTestOperationContextAttributesPropagation(false);
+ }
- // Local attribute 1.
- OperationContextAttribute.newInstance(1000);
+ /** */
+ private void doTestOperationContextAttributesPropagation(boolean
discovery) throws Exception {
+ OperationContextAttribute<InetSocketAddressMessage> dAttr1 =
+ OperationContextAttribute.newInstance(new
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80));
- // Distributed attribute 1.
- OperationContextAttribute<InetSocketAddressMessage> dAttr1 =
DistributedOperationContextManager.instance()
- .createDistributedAttribute(attrId1, dfltDistAttr1Val);
+ OperationContextAttribute<GridCacheVersion> dAttr2 =
OperationContextAttribute.newInstance(new GridCacheVersion(1, 1, 1));
- // Local attribute 2.
- OperationContextAttribute.newInstance("locaAttr2");
+ OperationContextAttribute<GridByteArrayList> otherTestAttr =
OperationContextAttribute.newInstance(new GridByteArrayList());
+
+ pluginProvider = new AbstractTestPluginProvider() {
+ @Override public String name() {
+ return "TestDistributedOperationContextAttributesRegistrator";
+ }
+
+ @Override public void start(PluginContext ctx) {
+ GridKernalContext kctx = ((IgniteEx)ctx.grid()).context();
+
+
kctx.operationContextDispatcher().registerDistributedAttribute(0, dAttr1);
+
+
kctx.operationContextDispatcher().registerDistributedAttribute(OperationContextDispatcher.MAX_ATTRS_CNT
- 1, dAttr2);
- // Distributed attribute 2.
- OperationContextAttribute<GridCacheVersion> dAttr2 =
DistributedOperationContextManager.instance()
- .createDistributedAttribute(attrId2, dfltDistrAttr2Val);
+ assertThrowsAnyCause(
+ log,
+ () -> {
+
kctx.operationContextDispatcher().registerDistributedAttribute(0,
otherTestAttr);
+ return null;
+
+ }, IgniteException.class,
+ "Duplicated distributed attribute id"
+ );
+ }
+ };
+
+ // Local attribute 1.
+ OperationContextAttribute.newInstance(1000);
startGrids(2);
startClientGrid(2);
- CountDownLatch coordLatch = new CountDownLatch(3);
- CountDownLatch srvrLatch = new CountDownLatch(3);
- CountDownLatch clientLatch = new CountDownLatch(3);
+ assertThrows(
+ null,
+ () ->
grid(0).context().operationContextDispatcher().registerDistributedAttribute(1,
null),
+ IgniteException.class,
+ "Initialization of distributed operation context attributes has
already finished"
+ );
- InetSocketAddressMessage valToSend1 = new
InetSocketAddressMessage(dfltDistAttr1Val.address(), 443);
+ // Local attribute 2.
+ OperationContextAttribute.newInstance("locaAttr2");
+
+ InetSocketAddressMessage valToSend1 = new
InetSocketAddressMessage(dAttr1.initialValue().address(), 443);
GridCacheVersion valToSend2 = new GridCacheVersion(2, 2, 2);
+ if (discovery)
+
doTestOperationContextAttributesPropagationThroughDiscovery(dAttr1, valToSend1,
dAttr2, valToSend2);
+ else
+
doTestOperationContextAttributesPropagationThroughCommunication(dAttr1,
valToSend1, dAttr2, valToSend2);
+ }
+
+ /** */
+ private void doTestOperationContextAttributesPropagationThroughDiscovery(
+ OperationContextAttribute<InetSocketAddressMessage> dAttr1,
+ InetSocketAddressMessage valToSend1,
+ OperationContextAttribute<GridCacheVersion> dAttr2,
+ GridCacheVersion valToSend2
+ ) throws Exception {
+ Set<Integer> checkedNodes = ConcurrentHashMap.newKeySet();
+
for (int i = 0; i < G.allGrids().size(); ++i) {
int i0 = i;
@@ -872,22 +938,12 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
InetSocketAddressMessage receivedVal1 =
OperationContext.get(dAttr1);
GridCacheVersion receivedVal2 =
OperationContext.get(dAttr2);
- assertNotNull(receivedVal1);
- assertNotNull(receivedVal2);
-
- assertFalse(dfltDistAttr1Val.port() ==
receivedVal1.port());
- assertEquals(receivedVal1.port(), valToSend1.port());
- assertEquals(receivedVal1.address(),
valToSend1.address());
+ assertTrue(receivedVal1 != null && valToSend1.port()
== receivedVal1.port());
+ assertTrue(receivedVal1 != null &&
valToSend1.address().equals(receivedVal1.address()));
- assertFalse(dfltDistrAttr2Val.equals(receivedVal2));
- assertTrue(valToSend2.equals(receivedVal2));
+ assertEquals(valToSend2, receivedVal2);
- if (grid(i0).localNode().isClient())
- clientLatch.countDown();
- else if (grid(i0).localNode().order() == 1)
- coordLatch.countDown();
- else
- srvrLatch.countDown();
+ checkedNodes.add(i0);
}
});
}
@@ -897,58 +953,33 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
grid(0).createCache(defaultCacheConfiguration());
}
- assertTrue(waitForCondition(() -> coordLatch.getCount() == 2,
getTestTimeout()));
- assertTrue(waitForCondition(() -> srvrLatch.getCount() == 2,
getTestTimeout()));
- assertTrue(waitForCondition(() -> clientLatch.getCount() == 2,
getTestTimeout()));
+ assertTrue(waitForCondition(() -> checkedNodes.size() == 3,
getTestTimeout(), 50));
+ checkedNodes.clear();
// Send from a server.
try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2,
valToSend2)) {
grid(1).destroyCache(DEFAULT_CACHE_NAME);
}
- assertTrue(waitForCondition(() -> coordLatch.getCount() == 1,
getTestTimeout()));
- assertTrue(waitForCondition(() -> srvrLatch.getCount() == 1,
getTestTimeout()));
- assertTrue(waitForCondition(() -> clientLatch.getCount() == 1,
getTestTimeout()));
+ assertTrue(waitForCondition(() -> checkedNodes.size() == 3,
getTestTimeout(), 50));
+ checkedNodes.clear();
// Send from a client.
try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2,
valToSend2)) {
grid(2).createCache(defaultCacheConfiguration());
}
- assertTrue(coordLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
- assertTrue(srvrLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
- assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
+ assertTrue(waitForCondition(() -> checkedNodes.size() == 3,
getTestTimeout(), 50));
+ checkedNodes.clear();
}
/** */
- @Test
- public void testSendAttributesByCommunication() throws Exception {
- byte attrId1 = 0;
- byte attrId2 =
DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1;
-
- InetSocketAddressMessage dfltDistrAttr1Val = new
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80);
- GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1);
-
- // Local attribute 1.
- OperationContextAttribute.newInstance(1000);
-
- // Distributed attribute 1.
- OperationContextAttribute<InetSocketAddressMessage> dAttr0 =
DistributedOperationContextManager.instance()
- .createDistributedAttribute(attrId1, dfltDistrAttr1Val);
-
- // Local attribute 2.
- OperationContextAttribute.newInstance("locaAttr2");
-
- // Distributed attribute 2.
- OperationContextAttribute<GridCacheVersion> dAttr1 =
DistributedOperationContextManager.instance()
- .createDistributedAttribute(attrId2, dfltDistrAttr2Val);
-
- startGrids(2);
- startClientGrid(2);
-
- InetSocketAddressMessage valToSend0 = new
InetSocketAddressMessage(dfltDistrAttr1Val.address(), 443);
- GridCacheVersion valToSend1 = new GridCacheVersion(2, 2, 2);
-
+ private void
doTestOperationContextAttributesPropagationThroughCommunication(
+ OperationContextAttribute<InetSocketAddressMessage> dAttr1,
+ InetSocketAddressMessage valToSend1,
+ OperationContextAttribute<GridCacheVersion> dAttr2,
+ GridCacheVersion valToSend2
+ ) throws Exception {
// Coordinator -> Server, Coordinator -> Client, Server -> Client,
Client -> Server, etc.
for (int fromIdx = 0; fromIdx < 3; ++fromIdx) {
for (int toIdx = 0; toIdx < 3; ++toIdx) {
@@ -956,13 +987,13 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
continue;
// One value.
- try (Scope ignored = OperationContext.set(dAttr0, valToSend0))
{
- checkOperationContextCommunicationTransmission(fromIdx,
toIdx, dAttr0, null);
+ try (Scope ignored = OperationContext.set(dAttr1, valToSend1))
{
+ checkOperationContextCommunicationTransmission(fromIdx,
toIdx, dAttr1, null);
}
// A couple of values.
- try (Scope ignored = OperationContext.set(dAttr0, valToSend0,
dAttr1, valToSend1)) {
- checkOperationContextCommunicationTransmission(fromIdx,
toIdx, dAttr0, dAttr1);
+ try (Scope ignored = OperationContext.set(dAttr1, valToSend1,
dAttr2, valToSend2)) {
+ checkOperationContextCommunicationTransmission(fromIdx,
toIdx, dAttr1, dAttr2);
}
}
}
@@ -972,44 +1003,44 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
private void checkOperationContextCommunicationTransmission(
int gridFromIdx,
int gridToIdx,
- OperationContextAttribute<InetSocketAddressMessage> attr0,
- @Nullable OperationContextAttribute<GridCacheVersion> attr1
- ) throws InterruptedException {
- Ignite from = grid(gridFromIdx);
- Ignite to = grid(gridToIdx);
+ OperationContextAttribute<InetSocketAddressMessage> attr1,
+ @Nullable OperationContextAttribute<GridCacheVersion> attr2
+ ) throws Exception {
+ IgniteEx from = grid(gridFromIdx);
+ IgniteEx to = grid(gridToIdx);
CountDownLatch rcvLatch = new CountDownLatch(2);
- InetSocketAddressMessage expVal0 = OperationContext.get(attr0);
- GridCacheVersion expVal1 = attr1 == null ? null :
OperationContext.get(attr1);
+ InetSocketAddressMessage expVal1 = OperationContext.get(attr1);
+ GridCacheVersion expVal2 = attr2 == null ? null :
OperationContext.get(attr2);
GridMessageListener lsnr = new GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg, byte plc)
{
if (msg instanceof IgniteIoTestMessage &&
((IgniteIoTestMessage)msg).request()) {
- InetSocketAddressMessage receivedVal0 =
OperationContext.get(attr0);
- GridCacheVersion receivedVal1 = attr1 == null ? null :
OperationContext.get(attr1);
+ InetSocketAddressMessage receivedVal1 =
OperationContext.get(attr1);
+ GridCacheVersion receivedVal2 = attr2 == null ? null :
OperationContext.get(attr2);
- assertTrue(receivedVal0 != null && expVal0.port() ==
receivedVal0.port());
- assertTrue(receivedVal0 != null &&
expVal0.address().equals(receivedVal0.address()));
+ assertTrue(receivedVal1 != null && expVal1.port() ==
receivedVal1.port());
+ assertTrue(receivedVal1 != null &&
expVal1.address().equals(receivedVal1.address()));
- if (attr1 != null)
- assertEquals(expVal1, receivedVal1);
+ if (attr2 != null)
+ assertEquals(expVal2, receivedVal2);
rcvLatch.countDown();
}
}
};
-
((IgniteEx)to).context().io().addMessageListener(GridTopic.TOPIC_IO_TEST, lsnr);
+ to.context().io().addMessageListener(GridTopic.TOPIC_IO_TEST, lsnr);
try {
- ((IgniteEx)from).context().io().sendIoTest(node(from, to), null,
false);
- ((IgniteEx)from).context().io().sendIoTest(node(from, to), null,
true);
+ from.context().io().sendIoTest(node(from, to), null, false);
+ from.context().io().sendIoTest(node(from, to), null, true);
assertTrue(rcvLatch.await(getTestTimeout(), MILLISECONDS));
}
finally {
-
assertTrue(((IgniteEx)to).context().io().removeMessageListener(GridTopic.TOPIC_IO_TEST,
lsnr));
+
assertTrue(to.context().io().removeMessageListener(GridTopic.TOPIC_IO_TEST,
lsnr));
}
}