ibessonov commented on a change in pull request #9527:
URL: https://github.com/apache/ignite/pull/9527#discussion_r737190009



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
##########
@@ -669,6 +669,8 @@ private void onDiscovery0(DiscoveryNotification 
notification) {
                     else if (customMsg instanceof 
ChangeGlobalStateFinishMessage) {
                         
ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg);
 
+                        ctx.cluster().onLocalJoin();

Review comment:
       This method should be renamed now

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -329,10 +339,51 @@ public void updateTag(String newTag) throws 
IgniteCheckedException {
      * </ul>
      */
     public void onLocalJoin() {
-        cluster.setId(locClusterId != null ? locClusterId : UUID.randomUUID());
+        Collection<ClusterNode> rmtNodes = cluster.forServers().nodes();
+        List<ClusterNode> rmtNodes0 = new ArrayList<>(rmtNodes);
+
+        rmtNodes0.sort(Comparator.comparing(ClusterNode::id));
+
+        @Nullable Collection<BaselineNode> bltNodes = 
cluster.currentBaselineTopology();
+
+        if (F.isEmpty(bltNodes)) {
+            log.info("Baseline node collection is empty.");
+
+            return;
+        }
+
+        @Nullable UUID first = null;
+
+        Collection<Object> srvIds = F.nodeConsistentIds(bltNodes);
+
+        for (ClusterNode node : rmtNodes0) {
+            if (F.contains(srvIds, node.consistentId())) {
+                first = node.id();
+
+                break;
+            }
+        }
+
+        ClusterNode locNode = ctx.config().getDiscoverySpi().getLocalNode();
+
+        if (first == locNode.id() || locClusterId != null) {
+            cluster.setId(locClusterId != null ? locClusterId : 
UUID.randomUUID());
+
+            cluster.setTag(locClusterTag != null ? locClusterTag :
+                ClusterTagGenerator.generateTag());
 
-        cluster.setTag(locClusterTag != null ? locClusterTag :
-            ClusterTagGenerator.generateTag());
+            ClusterIdAndTag idAndTag = new ClusterIdAndTag(cluster.id(), 
cluster.tag());
+
+            if (log.isInfoEnabled())
+                log.info("Writing cluster ID and tag to metastorage on ready 
for write " + idAndTag);
+
+            try {
+                metastorage.writeAsync(CLUSTER_ID_TAG_KEY, idAndTag);

Review comment:
       So, please correct me if I'm wrong:
    - This will happen on every activation.
    - Only the baseline node with the lowest nodeId can propagate this value.
    - It is implied that "locClusterTag" and "locClusterId" match the settings 
from the cluster if they are present.
   
   Let's hope that none of the external tools will be affected by such a 
change. 

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -329,10 +339,51 @@ public void updateTag(String newTag) throws 
IgniteCheckedException {
      * </ul>
      */
     public void onLocalJoin() {
-        cluster.setId(locClusterId != null ? locClusterId : UUID.randomUUID());
+        Collection<ClusterNode> rmtNodes = cluster.forServers().nodes();
+        List<ClusterNode> rmtNodes0 = new ArrayList<>(rmtNodes);
+
+        rmtNodes0.sort(Comparator.comparing(ClusterNode::id));
+
+        @Nullable Collection<BaselineNode> bltNodes = 
cluster.currentBaselineTopology();
+
+        if (F.isEmpty(bltNodes)) {
+            log.info("Baseline node collection is empty.");
+
+            return;
+        }
+
+        @Nullable UUID first = null;
+
+        Collection<Object> srvIds = F.nodeConsistentIds(bltNodes);
+
+        for (ClusterNode node : rmtNodes0) {
+            if (F.contains(srvIds, node.consistentId())) {
+                first = node.id();
+
+                break;
+            }
+        }
+
+        ClusterNode locNode = ctx.config().getDiscoverySpi().getLocalNode();
+
+        if (first == locNode.id() || locClusterId != null) {
+            cluster.setId(locClusterId != null ? locClusterId : 
UUID.randomUUID());
+
+            cluster.setTag(locClusterTag != null ? locClusterTag :
+                ClusterTagGenerator.generateTag());
 
-        cluster.setTag(locClusterTag != null ? locClusterTag :
-            ClusterTagGenerator.generateTag());
+            ClusterIdAndTag idAndTag = new ClusterIdAndTag(cluster.id(), 
cluster.tag());
+
+            if (log.isInfoEnabled())
+                log.info("Writing cluster ID and tag to metastorage on ready 
for write " + idAndTag);
+
+            try {
+                metastorage.writeAsync(CLUSTER_ID_TAG_KEY, idAndTag);
+            }
+            catch (IgniteCheckedException e) {
+                ctx.failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));

Review comment:
       What if this node actually fails? It feels like we'll end up with a 
cluster without a tag and ID. Can an "unconditional" "compareAndSet" save us 
from such a situation?

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
##########
@@ -406,84 +417,180 @@ public void testDeactivateActivate() throws Exception {
     public void testOptimizedWriteTwice() throws Exception {
         IgniteEx igniteEx = startGrid(0);
 
-        igniteEx.cluster().active(true);
+        igniteEx.cluster().state(ClusterState.ACTIVE);
 
         metastorage(0).write("key1", "value1");
 
-        assertEquals(1, metastorage(0).getUpdatesCount() - 
initialUpdatesCount);
+        int expUpdatesCnt = 1;
+
+        assertTrue("initialUpdatesCount=" + initialUpdatesCount + ", upd=" + 
metastorage(0).getUpdatesCount(),
+            waitForCondition(() ->
+            (expUpdatesCnt == metastorage(0).getUpdatesCount() - 
initialUpdatesCount), 10_000));
 
         metastorage(0).write("key2", "value2");
 
-        assertEquals(2, metastorage(0).getUpdatesCount() - 
initialUpdatesCount);
+        assertEquals(expUpdatesCnt + 1, metastorage(0).getUpdatesCount() - 
initialUpdatesCount);
 
         metastorage(0).write("key1", "value1");
 
-        assertEquals(2, metastorage(0).getUpdatesCount() - 
initialUpdatesCount);
+        assertEquals(expUpdatesCnt + 1, metastorage(0).getUpdatesCount() - 
initialUpdatesCount);
     }
 
     /** */
     @Test
     public void testClient() throws Exception {
         IgniteEx igniteEx = startGrid(0);
 
-        igniteEx.cluster().active(true);
+        igniteEx.cluster().state(ClusterState.ACTIVE);
 
-        metastorage(0).write("key0", "value0");
+        checkStored(metastorage(0), metastorage(0), "key0", "value0");
 
         startClientGrid(1);
 
         AtomicInteger clientLsnrUpdatesCnt = new AtomicInteger();
 
-        assertEquals(1, metastorage(1).getUpdatesCount() - 
initialUpdatesCount);
+        int expUpdatesCnt = 1;
+
+        assertEquals("initialUpdatesCount=" + initialUpdatesCount + ", upd=" + 
metastorage(1).getUpdatesCount(),
+            expUpdatesCnt, metastorage(1).getUpdatesCount() - 
initialUpdatesCount);
 
         assertEquals("value0", metastorage(1).read("key0"));
 
         metastorage(1).listen(key -> true, (key, oldVal, newVal) -> 
clientLsnrUpdatesCnt.incrementAndGet());
 
-        metastorage(1).write("key1", "value1");
+        checkStored(metastorage(1), metastorage(1), "key1", "value1");
+
+        checkStored(metastorage(1), metastorage(0), "key1", "value1");
 
         assertEquals(1, clientLsnrUpdatesCnt.get());
+    }
 
-        assertEquals("value1", metastorage(1).read("key1"));
+    /** */
+    protected void checkStoredWithPers(
+        DistributedMetaStorage msToStore,
+        IgniteEx instanceToCheck,
+        String key,
+        String value
+    ) throws IgniteCheckedException {
+        assertTrue(isPersistent());
 
-        assertEquals("value1", metastorage(0).read("key1"));
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        final DistributedMetaStorageImpl distrMetaStore =
+            
(DistributedMetaStorageImpl)instanceToCheck.context().distributedMetastorage();
+
+        DmsDataWriterWorker worker = 
GridTestUtils.getFieldValue(distrMetaStore, "worker");
+
+        ReadWriteMetastorage metastorage = GridTestUtils.getFieldValue(worker, 
"metastorage");
+
+        assertNotNull(metastorage);
+
+        IgniteInternalFuture f = GridTestUtils.runAsync(() -> {
+            try {
+                latch.await();
+
+                assertTrue(waitForCondition(() -> {
+                    try {
+                        AtomicBoolean contains = new AtomicBoolean(false);
+                        metastorage.iterate("", (k, v) -> {

Review comment:
       Why don't you call "read" (or "get", whatever it is called)?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -283,10 +287,16 @@ public boolean diagnosticEnabled() {
                 try {
                     ClusterIdAndTag idAndTag = new 
ClusterIdAndTag(cluster.id(), cluster.tag());
 
-                    if (log.isInfoEnabled())
-                        log.info("Writing cluster ID and tag to metastorage on 
ready for write " + idAndTag);
+                    if (log.isInfoEnabled()) {
+                        if (idAndTag.id() != null) {
+                            metastorage.writeAsync(CLUSTER_ID_TAG_KEY, 
idAndTag);

Review comment:
       I'm sorry, what is going on here? You only write to the metastore if the 
INFO level is enabled in logs? That's not right.

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
##########
@@ -65,12 +72,16 @@
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
-        startGrid(0);
+        IgniteEx ign = startGrid(0);
 
         // We have to start the second node and wait when it is started
         // to be sure that all async metastorage updates of the node_0 are 
completed.
         startGrid(1);
 
+        ign.cluster().state(ClusterState.ACTIVE);
+
+        waitForCondition(() -> (int)metastorage(0).getUpdatesCount() == 
(isPersistent() ? 4 : 2), 10_000);

Review comment:
       This is fine. I think you should just add a comment that clarifies the 
exact keys that you mean here.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -329,10 +339,51 @@ public void updateTag(String newTag) throws 
IgniteCheckedException {
      * </ul>
      */
     public void onLocalJoin() {
-        cluster.setId(locClusterId != null ? locClusterId : UUID.randomUUID());
+        Collection<ClusterNode> rmtNodes = cluster.forServers().nodes();
+        List<ClusterNode> rmtNodes0 = new ArrayList<>(rmtNodes);
+
+        rmtNodes0.sort(Comparator.comparing(ClusterNode::id));
+
+        @Nullable Collection<BaselineNode> bltNodes = 
cluster.currentBaselineTopology();
+
+        if (F.isEmpty(bltNodes)) {
+            log.info("Baseline node collection is empty.");
+
+            return;
+        }
+
+        @Nullable UUID first = null;
+
+        Collection<Object> srvIds = F.nodeConsistentIds(bltNodes);
+
+        for (ClusterNode node : rmtNodes0) {
+            if (F.contains(srvIds, node.consistentId())) {
+                first = node.id();
+
+                break;
+            }
+        }
+
+        ClusterNode locNode = ctx.config().getDiscoverySpi().getLocalNode();
+
+        if (first == locNode.id() || locClusterId != null) {
+            cluster.setId(locClusterId != null ? locClusterId : 
UUID.randomUUID());
+
+            cluster.setTag(locClusterTag != null ? locClusterTag :
+                ClusterTagGenerator.generateTag());
 
-        cluster.setTag(locClusterTag != null ? locClusterTag :
-            ClusterTagGenerator.generateTag());
+            ClusterIdAndTag idAndTag = new ClusterIdAndTag(cluster.id(), 
cluster.tag());
+
+            if (log.isInfoEnabled())
+                log.info("Writing cluster ID and tag to metastorage on ready 
for write " + idAndTag);

Review comment:
       This should probably be rephrased, or maybe we shouldn't even write it 
in the first place if the tag and ID are already known.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -489,21 +490,32 @@ public FilePageStoreManager(GridKernalContext ctx) {
 
         int grpId = MetaStorage.METASTORAGE_CACHE_ID;
 
-        if (!idxCacheStores.containsKey(grpId)) {
-            DataRegion dataRegion = 
cctx.database().dataRegion(GridCacheDatabaseSharedManager.METASTORE_DATA_REGION_NAME);
-            PageMetrics pageMetrics = 
dataRegion.metrics().cacheGrpPageMetrics(grpId);
+        DataRegion dataRegion = 
cctx.database().dataRegion(GridCacheDatabaseSharedManager.METASTORE_DATA_REGION_NAME);
 
-            CacheStoreHolder holder = initDir(
-                new File(storeWorkDir, MetaStorage.METASTORAGE_DIR_NAME),
-                grpId,
-                MetaStorage.METASTORAGE_PARTITIONS.size(),
-                pageMetrics,
-                false);
+        AtomicReference<IgniteCheckedException> initException = new 
AtomicReference<>();
 
-            CacheStoreHolder old = idxCacheStores.put(grpId, holder);
+        idxCacheStores.computeIfAbsent(grpId, k -> {
+                PageMetrics pageMetrics = 
dataRegion.metrics().cacheGrpPageMetrics(grpId);
 
-            assert old == null : "Non-null old store holder for metastorage";
-        }
+                CacheStoreHolder holder = null;
+                try {
+                    holder = initDir(
+                        new File(storeWorkDir, 
MetaStorage.METASTORAGE_DIR_NAME),
+                        grpId,
+                        MetaStorage.METASTORAGE_PARTITIONS.size(),
+                        pageMetrics,
+                        false);
+                }
+                catch (IgniteCheckedException e) {
+                    initException.set(e);
+                }
+
+                return holder;
+            }
+        );
+
+        if (initException.get() != null)
+            throw new IgniteCheckedException("Metastorage initialization 
error: ", initException.get());

Review comment:
       Can't you just cast "initException.get()" to IgniteCheckedException?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -72,6 +73,9 @@
     /** */
     private volatile CountDownLatch latch = new CountDownLatch(0);
 
+    /** For tests purpose only. */
+    private volatile Predicate<String> writeCondition;

Review comment:
       You can also add `@TestOnly` annotation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to