petrov-mg commented on code in PR #13275:
URL: https://github.com/apache/ignite/pull/13275#discussion_r3475787321


##########
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java:
##########
@@ -37,69 +37,60 @@
  * {@link OperationContextAttribute} instance that is consistent across all 
cluster nodes.</p>
  *
  * <p>To enable propagation of an {@link OperationContextAttribute} value 
across cluster nodes, the
- * attribute must be created using the {@link 
#createDistributedAttribute(byte, Message)} method.
+ * attribute must be created using the {@link 
#registerDistributedAttribute(byte, OperationContextAttribute)} method.

Review Comment:
   `must be created using` -> `must be registered via`



##########
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java:
##########
@@ -37,69 +37,60 @@
  * {@link OperationContextAttribute} instance that is consistent across all 
cluster nodes.</p>
  *
  * <p>To enable propagation of an {@link OperationContextAttribute} value 
across cluster nodes, the
- * attribute must be created using the {@link 
#createDistributedAttribute(byte, Message)} method.
+ * attribute must be created using the {@link 
#registerDistributedAttribute(byte, OperationContextAttribute)} method.
  *
  * <p> Note, that the maximum number of distributed attribute instances that 
can be created is currently limited to
  * {@link #MAX_DISTRIBUTED_ATTR_CNT} for implementation reasons.</p>
  *
  * @see OperationContext
- * @see DistributedOperationContextMessage
+ * @see OperationContextMessage
  */
-public class DistributedOperationContextManager {
-    /** */
-    private static final DistributedOperationContextManager INSTANCE = new 
DistributedOperationContextManager();
-
+public class OperationContextDispatcher {
     /** Maximal number of supported distributed attributes. */
     static final byte MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE;
 
     /** Registered distributed attributes by their cluster-wide id. */
-    private final Map<Byte, OperationContextAttribute<Message>> attrs = new 
ConcurrentSkipListMap<>();
+    private final Map<Byte, OperationContextAttribute<? extends Message>> 
attrs = new ConcurrentSkipListMap<>();
 
-    /** */
-    public static DistributedOperationContextManager instance() {
-        return INSTANCE;
-    }
+    /** The initialization flag. */

Review Comment:
   `Whether the registration of new distributed attributes is allowed.`



##########
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java:
##########
@@ -37,69 +37,60 @@
  * {@link OperationContextAttribute} instance that is consistent across all 
cluster nodes.</p>
  *
  * <p>To enable propagation of an {@link OperationContextAttribute} value 
across cluster nodes, the
- * attribute must be created using the {@link 
#createDistributedAttribute(byte, Message)} method.
+ * attribute must be created using the {@link 
#registerDistributedAttribute(byte, OperationContextAttribute)} method.
  *
  * <p> Note, that the maximum number of distributed attribute instances that 
can be created is currently limited to
  * {@link #MAX_DISTRIBUTED_ATTR_CNT} for implementation reasons.</p>
  *
  * @see OperationContext
- * @see DistributedOperationContextMessage
+ * @see OperationContextMessage
  */
-public class DistributedOperationContextManager {
-    /** */
-    private static final DistributedOperationContextManager INSTANCE = new 
DistributedOperationContextManager();
-
+public class OperationContextDispatcher {
     /** Maximal number of supported distributed attributes. */
     static final byte MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE;
 
     /** Registered distributed attributes by their cluster-wide id. */
-    private final Map<Byte, OperationContextAttribute<Message>> attrs = new 
ConcurrentSkipListMap<>();
+    private final Map<Byte, OperationContextAttribute<? extends Message>> 
attrs = new ConcurrentSkipListMap<>();
 
-    /** */
-    public static DistributedOperationContextManager instance() {
-        return INSTANCE;
-    }
+    /** The initialization flag. */
+    private volatile boolean initialized;
 
     /**
-     * Creates a new {@link OperationContext} attribute with the specified 
distributed ID and initial value.
+     * Registers an attribute of {@link OperationContext} with the specified 
distributed ID.
      *
      * <p>The distributed ID is used to consistently identify the attribute 
across all nodes in the cluster.
-     * It must be unique, and its value must be in the range from {@code 0} 
(inclusive) to {@code Byte.SIZE} (exclusive).</p>
+     * It must be unique, and its value must be in the range [{@code 0} : 
{@code Byte.SIZE}).</p>
      *
-     * <p>The value of the created attribute is automatically captured and 
propagated between cluster nodes
+     * <p>A value of the attribute is automatically captured and propagated 
between cluster nodes
      * during message transmission.</p>
-     *
-     * @see OperationContextAttribute#newInstance(Object)
      */
-    public <T extends Message> OperationContextAttribute<T> 
createDistributedAttribute(byte id, @Nullable T initVal) {
-        assert id >= 0 && id < MAX_DISTRIBUTED_ATTR_CNT : "Invalid distributed 
attributed id [id=" + id + ']';
+    public <T extends Message> void registerDistributedAttribute(byte id, 
OperationContextAttribute<T> attr) {
+        if (initialized)
+            throw new IgniteException("Initialization of distributed operation 
context attributes has already finished.");
 
-        return (OperationContextAttribute<T>)attrs.compute(id, (id0, attr0) -> 
{
-            if (attr0 != null)
-                throw new IgniteException("Duplicated distributed attribute id 
[id=" + id + ']');
+        assert id >= 0 && id < MAX_DISTRIBUTED_ATTR_CNT : "Invalid distributed 
attributed id [id=" + id + ']';
 
-            return OperationContextAttribute.newInstance(initVal);
-        });
+        if (attrs.putIfAbsent(id, attr) != null)
+            throw new IgniteException("Duplicated distributed attribute id 
[id=" + id + ']');
     }
 
     /**
-     * Collects the values of all distributed {@link 
OperationContextAttribute}s registered by this manager in a format
-     * suitable for transmission between cluster nodes.
+     * Collects the values of all distributed {@link 
OperationContextAttribute}s registered by this manager.

Review Comment:
   `registered by this manager` -> `registered by this dispatcher`



##########
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java:
##########
@@ -37,69 +37,60 @@
  * {@link OperationContextAttribute} instance that is consistent across all 
cluster nodes.</p>
  *
  * <p>To enable propagation of an {@link OperationContextAttribute} value 
across cluster nodes, the
- * attribute must be created using the {@link 
#createDistributedAttribute(byte, Message)} method.
+ * attribute must be created using the {@link 
#registerDistributedAttribute(byte, OperationContextAttribute)} method.
  *
  * <p> Note, that the maximum number of distributed attribute instances that 
can be created is currently limited to
  * {@link #MAX_DISTRIBUTED_ATTR_CNT} for implementation reasons.</p>
  *
  * @see OperationContext
- * @see DistributedOperationContextMessage
+ * @see OperationContextMessage
  */
-public class DistributedOperationContextManager {
-    /** */
-    private static final DistributedOperationContextManager INSTANCE = new 
DistributedOperationContextManager();
-
+public class OperationContextDispatcher {
     /** Maximal number of supported distributed attributes. */
     static final byte MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE;
 
     /** Registered distributed attributes by their cluster-wide id. */
-    private final Map<Byte, OperationContextAttribute<Message>> attrs = new 
ConcurrentSkipListMap<>();
+    private final Map<Byte, OperationContextAttribute<? extends Message>> 
attrs = new ConcurrentSkipListMap<>();
 
-    /** */
-    public static DistributedOperationContextManager instance() {
-        return INSTANCE;
-    }
+    /** The initialization flag. */
+    private volatile boolean initialized;
 
     /**
-     * Creates a new {@link OperationContext} attribute with the specified 
distributed ID and initial value.
+     * Registers an attribute of {@link OperationContext} with the specified 
distributed ID.
      *
      * <p>The distributed ID is used to consistently identify the attribute 
across all nodes in the cluster.
-     * It must be unique, and its value must be in the range from {@code 0} 
(inclusive) to {@code Byte.SIZE} (exclusive).</p>
+     * It must be unique, and its value must be in the range [{@code 0} : 
{@code Byte.SIZE}).</p>
      *
-     * <p>The value of the created attribute is automatically captured and 
propagated between cluster nodes
+     * <p>A value of the attribute is automatically captured and propagated 
between cluster nodes
      * during message transmission.</p>
-     *
-     * @see OperationContextAttribute#newInstance(Object)
      */
-    public <T extends Message> OperationContextAttribute<T> 
createDistributedAttribute(byte id, @Nullable T initVal) {
-        assert id >= 0 && id < MAX_DISTRIBUTED_ATTR_CNT : "Invalid distributed 
attributed id [id=" + id + ']';
+    public <T extends Message> void registerDistributedAttribute(byte id, 
OperationContextAttribute<T> attr) {
+        if (initialized)
+            throw new IgniteException("Initialization of distributed operation 
context attributes has already finished.");

Review Comment:
   Let's remove the trailing dot from the error message.



##########
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java:
##########
@@ -37,69 +37,60 @@
  * {@link OperationContextAttribute} instance that is consistent across all 
cluster nodes.</p>
  *
  * <p>To enable propagation of an {@link OperationContextAttribute} value 
across cluster nodes, the
- * attribute must be created using the {@link 
#createDistributedAttribute(byte, Message)} method.
+ * attribute must be created using the {@link 
#registerDistributedAttribute(byte, OperationContextAttribute)} method.
  *
  * <p> Note, that the maximum number of distributed attribute instances that 
can be created is currently limited to
  * {@link #MAX_DISTRIBUTED_ATTR_CNT} for implementation reasons.</p>
  *
  * @see OperationContext
- * @see DistributedOperationContextMessage
+ * @see OperationContextMessage
  */
-public class DistributedOperationContextManager {
-    /** */
-    private static final DistributedOperationContextManager INSTANCE = new 
DistributedOperationContextManager();
-
+public class OperationContextDispatcher {
     /** Maximal number of supported distributed attributes. */
     static final byte MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE;
 
     /** Registered distributed attributes by their cluster-wide id. */
-    private final Map<Byte, OperationContextAttribute<Message>> attrs = new 
ConcurrentSkipListMap<>();
+    private final Map<Byte, OperationContextAttribute<? extends Message>> 
attrs = new ConcurrentSkipListMap<>();
 
-    /** */
-    public static DistributedOperationContextManager instance() {
-        return INSTANCE;
-    }
+    /** The initialization flag. */
+    private volatile boolean initialized;
 
     /**
-     * Creates a new {@link OperationContext} attribute with the specified 
distributed ID and initial value.
+     * Registers an attribute of {@link OperationContext} with the specified 
distributed ID.
      *
      * <p>The distributed ID is used to consistently identify the attribute 
across all nodes in the cluster.
-     * It must be unique, and its value must be in the range from {@code 0} 
(inclusive) to {@code Byte.SIZE} (exclusive).</p>
+     * It must be unique, and its value must be in the range [{@code 0} : 
{@code Byte.SIZE}).</p>
      *
-     * <p>The value of the created attribute is automatically captured and 
propagated between cluster nodes
+     * <p>A value of the attribute is automatically captured and propagated 
between cluster nodes

Review Comment:
   `A value of the attribute` -> `A value of the registered attribute`



##########
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java:
##########
@@ -37,69 +37,60 @@
  * {@link OperationContextAttribute} instance that is consistent across all 
cluster nodes.</p>
  *
  * <p>To enable propagation of an {@link OperationContextAttribute} value 
across cluster nodes, the
- * attribute must be created using the {@link 
#createDistributedAttribute(byte, Message)} method.
+ * attribute must be created using the {@link 
#registerDistributedAttribute(byte, OperationContextAttribute)} method.
  *
  * <p> Note, that the maximum number of distributed attribute instances that 
can be created is currently limited to

Review Comment:
   `can be created` -> `can be registered`



##########
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java:
##########
@@ -146,8 +137,8 @@ public Scope restoreDistributedAttributes(@Nullable 
DistributedOperationContextM
         return updater.apply();
     }
 
-    /** For testing purposes mostly. */
-    void clear() {
-        attrs.clear();
+    /** Deprecated fuhrter filling of distributed attributes. */

Review Comment:
   Suggested Javadoc: `Restricts further registration of distributed attributes.`



##########
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java:
##########
@@ -146,8 +137,8 @@ public Scope restoreDistributedAttributes(@Nullable 
DistributedOperationContextM
         return updater.apply();
     }
 
-    /** For testing purposes mostly. */
-    void clear() {
-        attrs.clear();
+    /** Deprecated fuhrter filling of distributed attributes. */
+    public void initialized() {

Review Comment:
   Let's rename it to something like `markInitialized`, `seal`, or 
`finishRegistration`, and rename the associated variable accordingly.
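
A minimal, self-contained sketch of how the renamed method and flag could read, using only the JDK. `SealableRegistry`, `register`, `finishRegistration`, and `registrationFinished` are hypothetical names chosen for illustration, not code from this PR, and the exception type is a plain JDK one rather than `IgniteException`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical stand-in for the dispatcher: accepts registrations until it is sealed. */
class SealableRegistry<V> {
    /** Registered values by id. */
    private final Map<Byte, V> attrs = new ConcurrentSkipListMap<>();

    /** Whether the registration of new values is no longer allowed. */
    private volatile boolean registrationFinished;

    /** Registers a value under the given id, unless registration has been finished. */
    void register(byte id, V attr) {
        if (registrationFinished)
            throw new IllegalStateException("Registration has already finished");

        if (attrs.putIfAbsent(id, attr) != null)
            throw new IllegalStateException("Duplicated id [id=" + id + ']');
    }

    /** Restricts further registration. */
    void finishRegistration() {
        registrationFinished = true;
    }

    /** Returns the number of registered values. */
    int size() {
        return attrs.size();
    }
}
```

The flag check and `putIfAbsent` are not atomic together, so the sketch assumes that all registrations happen during startup, before `finishRegistration()` is called.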



##########
modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java:
##########
@@ -831,36 +847,74 @@ public void testContextAwareDelayQueue() throws Exception 
{
     /** */
     @Test
     public void testSendAttributesByDiscovery() throws Exception {
-        byte attrId1 = 0;
-        byte attrId2 = 
DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1;
+        doTestOperationContextAttributesPropagation(true);
+    }
 
-        InetSocketAddressMessage dfltDistAttr1Val = new 
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80);
-        GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1);
+    /** */
+    @Test
+    public void testSendAttributesByCommunication() throws Exception {
+        doTestOperationContextAttributesPropagation(false);
+    }
+
+    /** */
+    private void doTestOperationContextAttributesPropagation(boolean 
discovery) throws Exception {
+        OperationContextAttribute<InetSocketAddressMessage> dAttr1 =
+            OperationContextAttribute.newInstance(new 
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80));
+
+        OperationContextAttribute<GridCacheVersion> dAttr2 =
+            OperationContextAttribute.newInstance(new GridCacheVersion(1, 1, 
1));
+
+        pluginProvider = new AbstractTestPluginProvider() {

Review Comment:
   I suggest making this class static and declaring the attributes inside it.
   Something like:
   ```
   /** */
   static class TestIgniteComponent extends AbstractTestPluginProvider {
       /** */
       public static OperationContextAttribute<InetSocketAddressMessage> ADDR_ATTR =
           OperationContextAttribute.newInstance(new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80));

       /** */
       public static OperationContextAttribute<GridCacheVersion> CACHE_VER_ATTR =
           OperationContextAttribute.newInstance(new GridCacheVersion(1, 1, 1));

       /** {@inheritDoc} */
       @Override public String name() {
           return "TestDistributedOperationContextAttributesRegistrator";
       }

       /** {@inheritDoc} */
       @Override public void start(PluginContext ctx) {
           GridKernalContext kernalCtx = ((IgniteEx)ctx.grid()).context();

           kernalCtx.operationContextDispatcher().registerDistributedAttribute((byte)0, ADDR_ATTR);

           kernalCtx.operationContextDispatcher().registerDistributedAttribute(
               (byte)(OperationContextDispatcher.MAX_DISTRIBUTED_ATTR_CNT - 1),
               CACHE_VER_ATTR
           );
       }
   }
   ```



##########
modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java:
##########
@@ -872,15 +926,10 @@ public void testSendAttributesByDiscovery() throws 
Exception {
                         InetSocketAddressMessage receivedVal1 = 
OperationContext.get(dAttr1);
                         GridCacheVersion receivedVal2 = 
OperationContext.get(dAttr2);
 
-                        assertNotNull(receivedVal1);
-                        assertNotNull(receivedVal2);
-
-                        assertFalse(dfltDistAttr1Val.port() == 
receivedVal1.port());
-                        assertEquals(receivedVal1.port(), valToSend1.port());
-                        assertEquals(receivedVal1.address(), 
valToSend1.address());
+                        assertTrue(receivedVal1 != null && valToSend1.port() 
== receivedVal1.port());
+                        assertTrue(receivedVal1 != null && 
valToSend1.address().equals(receivedVal1.address()));
 
-                        assertFalse(dfltDistrAttr2Val.equals(receivedVal2));
-                        assertTrue(valToSend2.equals(receivedVal2));
+                        assertEquals(valToSend2, receivedVal2);
 
                         if (grid(i0).localNode().isClient())

Review Comment:
   Here we can just use `checkedNodes.add(grid(i0));`.



##########
modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java:
##########
@@ -831,36 +847,74 @@ public void testContextAwareDelayQueue() throws Exception 
{
     /** */
     @Test
     public void testSendAttributesByDiscovery() throws Exception {
-        byte attrId1 = 0;
-        byte attrId2 = 
DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1;
+        doTestOperationContextAttributesPropagation(true);
+    }
 
-        InetSocketAddressMessage dfltDistAttr1Val = new 
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80);
-        GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1);
+    /** */
+    @Test
+    public void testSendAttributesByCommunication() throws Exception {
+        doTestOperationContextAttributesPropagation(false);
+    }
+
+    /** */
+    private void doTestOperationContextAttributesPropagation(boolean 
discovery) throws Exception {
+        OperationContextAttribute<InetSocketAddressMessage> dAttr1 =
+            OperationContextAttribute.newInstance(new 
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80));
+
+        OperationContextAttribute<GridCacheVersion> dAttr2 =
+            OperationContextAttribute.newInstance(new GridCacheVersion(1, 1, 
1));
+
+        pluginProvider = new AbstractTestPluginProvider() {
+            @Override public String name() {
+                return "TestDistributedOperationContextAttributesRegistrator";
+            }
+
+            @Override public void start(PluginContext ctx) {
+                
((IgniteEx)ctx.grid()).context().operationContextDispatcher().registerDistributedAttribute((byte)0,
 dAttr1);
+

Review Comment:
   Let's add a check to ensure that registering an attribute with the same 
identifier fails.
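
To illustrate the kind of check meant here, a JDK-only sketch follows. `UniqueIdRegistry` and `thrownBy` are hypothetical helpers written for this example; only the `putIfAbsent`-based uniqueness rule and the message prefix are taken from the diff above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical registry that rejects a second registration under the same id. */
class UniqueIdRegistry {
    /** Registered values by id. */
    private final Map<Byte, String> attrs = new ConcurrentSkipListMap<>();

    /** Registers a value; fails if the id is already taken. */
    void register(byte id, String attr) {
        if (attrs.putIfAbsent(id, attr) != null)
            throw new IllegalStateException("Duplicated distributed attribute id [id=" + id + ']');
    }

    /** Returns the value registered under the id, or {@code null} if there is none. */
    String get(byte id) {
        return attrs.get(id);
    }

    /** Runs the action and returns the exception it threw, or {@code null} if it completed normally. */
    static RuntimeException thrownBy(Runnable action) {
        try {
            action.run();

            return null;
        }
        catch (RuntimeException e) {
            return e;
        }
    }
}
```

In the actual test this would presumably go through the same `assertThrows` helper that the diff already uses for the post-initialization check, with `"Duplicated distributed attribute id"` as the expected message. The duplicate registration would need to be attempted inside `start(PluginContext)`, since later calls are rejected by the initialization check first.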



##########
modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java:
##########


Review Comment:
   Here and in the places below, we can use:
   ```
   assertTrue(waitForCondition(() -> checkedNodes.containsAll(G.allGrids()), getTestTimeout()));

   checkedNodes.clear();
   ```



##########
modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java:
##########
@@ -831,36 +847,74 @@ public void testContextAwareDelayQueue() throws Exception 
{
     /** */
     @Test
     public void testSendAttributesByDiscovery() throws Exception {
-        byte attrId1 = 0;
-        byte attrId2 = 
DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1;
+        doTestOperationContextAttributesPropagation(true);
+    }
 
-        InetSocketAddressMessage dfltDistAttr1Val = new 
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80);
-        GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1);
+    /** */
+    @Test
+    public void testSendAttributesByCommunication() throws Exception {
+        doTestOperationContextAttributesPropagation(false);
+    }
+
+    /** */
+    private void doTestOperationContextAttributesPropagation(boolean 
discovery) throws Exception {
+        OperationContextAttribute<InetSocketAddressMessage> dAttr1 =
+            OperationContextAttribute.newInstance(new 
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80));
+
+        OperationContextAttribute<GridCacheVersion> dAttr2 =
+            OperationContextAttribute.newInstance(new GridCacheVersion(1, 1, 
1));
+
+        pluginProvider = new AbstractTestPluginProvider() {
+            @Override public String name() {
+                return "TestDistributedOperationContextAttributesRegistrator";
+            }
+
+            @Override public void start(PluginContext ctx) {
+                
((IgniteEx)ctx.grid()).context().operationContextDispatcher().registerDistributedAttribute((byte)0,
 dAttr1);
+
+                
((IgniteEx)ctx.grid()).context().operationContextDispatcher().registerDistributedAttribute(
+                    (byte)(OperationContextDispatcher.MAX_DISTRIBUTED_ATTR_CNT 
- 1),
+                    dAttr2
+                );
+            }
+        };
 
         // Local attribute 1.
         OperationContextAttribute.newInstance(1000);
 
-        // Distributed attribute 1.
-        OperationContextAttribute<InetSocketAddressMessage> dAttr1 = 
DistributedOperationContextManager.instance()
-            .createDistributedAttribute(attrId1, dfltDistAttr1Val);
+        startGrids(2);
+        startClientGrid(2);
+
+        assertThrows(
+            null,
+            () -> 
grid(0).context().operationContextDispatcher().registerDistributedAttribute((byte)1,
 null),
+            IgniteException.class,
+            "Initialization of distributed operation context attributes has 
already finished"
+        );
 
         // Local attribute 2.
         OperationContextAttribute.newInstance("locaAttr2");
 
-        // Distributed attribute 2.
-        OperationContextAttribute<GridCacheVersion> dAttr2 = 
DistributedOperationContextManager.instance()
-            .createDistributedAttribute(attrId2, dfltDistrAttr2Val);
+        InetSocketAddressMessage valToSend1 = new 
InetSocketAddressMessage(dAttr1.initialValue().address(), 443);
+        GridCacheVersion valToSend2 = new GridCacheVersion(2, 2, 2);
 
-        startGrids(2);
-        startClientGrid(2);
+        if (discovery)
+            
doTestOperationContextAttributesPropagationThroughDiscovery(dAttr1, valToSend1, 
dAttr2, valToSend2);
+        else
+            
doTestOperationContextAttributesPropagationThroughCommunication(dAttr1, 
valToSend1, dAttr2, valToSend2);
+    }
 
+    /** */
+    private void doTestOperationContextAttributesPropagationThroughDiscovery(
+        OperationContextAttribute<InetSocketAddressMessage> dAttr1,
+        InetSocketAddressMessage valToSend1,
+        OperationContextAttribute<GridCacheVersion> dAttr2,
+        GridCacheVersion valToSend2
+    ) throws Exception {
         CountDownLatch coordLatch = new CountDownLatch(3);

Review Comment:
   We can simplify this logic a bit: replace the latches with
   `Set<Ignite> checkedNodes = ConcurrentHashMap.newKeySet();`.
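
A rough, JDK-only sketch of the pattern, for reference. Node names stand in for `Ignite` instances, and `waitFor` is a hypothetical polling helper written for this example in place of the test framework's `waitForCondition`; neither is code from this PR.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical demo: listeners record themselves in a concurrent set instead of counting down latches. */
class CheckedNodesDemo {
    /** Polls the condition until it holds or the timeout elapses. */
    static boolean waitFor(BooleanSupplier cond, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);

        while (!cond.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0)
                return false;

            try {
                Thread.sleep(10);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();

                return false;
            }
        }

        return true;
    }

    /** Simulates one listener thread per node and waits until every node has been recorded. */
    static boolean allChecked(Set<String> allNodes, long timeoutMs) {
        Set<String> checkedNodes = ConcurrentHashMap.newKeySet();

        // Each thread plays the role of a message listener whose assertions have passed.
        for (String node : allNodes)
            new Thread(() -> checkedNodes.add(node)).start();

        boolean res = waitFor(() -> checkedNodes.containsAll(allNodes), timeoutMs);

        // Reset the set so it can be reused for the next round of checks.
        checkedNodes.clear();

        return res;
    }
}
```

Compared with latches, the set does not need a count fixed in advance, and a node that reports twice does not skew the result.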



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
