This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 15a4383367 IGNITE-22083 Sql. Invalidate fragment mapping cache when the mapped node has left the cluster (#3637)
15a4383367 is described below

commit 15a4383367bd832729f19e447889f6ce3491a695
Author: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com>
AuthorDate: Mon Apr 22 15:57:38 2024 +0300

    IGNITE-22083 Sql. Invalidate fragment mapping cache when the mapped node has left the cluster (#3637)
---
 .../engine/exec/mapping/MappingServiceImpl.java    | 53 ++++++++++++++++------
 .../exec/mapping/MappingServiceImplTest.java       | 28 ++++++++++--
 2 files changed, 63 insertions(+), 18 deletions(-)

diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index 13bf05cb35..7c28ac249e 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -30,7 +30,9 @@ import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
 import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
 import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
@@ -152,10 +154,10 @@ public class MappingServiceImpl implements MappingService, LogicalTopologyEventL
             return val;
         });
 
-        return cacheValue.mappedFragments.thenApply(mappedFragments -> 
applyPartitionPruning(mappedFragments, parameters));
+        return cacheValue.mappedFragments.thenApply(mappedFragments -> 
applyPartitionPruning(mappedFragments.fragments, parameters));
     }
 
-    private CompletableFuture<List<MappedFragment>> 
mapFragments(MappingContext context, FragmentsTemplate template) {
+    private CompletableFuture<MappedFragments> mapFragments(MappingContext 
context, FragmentsTemplate template) {
         IdGenerator idGenerator = new IdGenerator(template.nextId);
         List<Fragment> fragments = new ArrayList<>(template.fragments);
 
@@ -242,7 +244,8 @@ public class MappingServiceImpl implements MappingService, LogicalTopologyEventL
                         throw new IgniteInternalException(Sql.MAPPING_ERR, 
"Unable to map query: " + ex.getMessage(), ex);
                     }
 
-                    List<MappedFragment> result = new 
ArrayList<>(fragmentsToMap.size());
+                    List<MappedFragment> mappedFragmentsList = new 
ArrayList<>(fragmentsToMap.size());
+                    Set<String> targetNodes = new HashSet<>();
                     for (Fragment fragment : fragmentsToMap) {
                         FragmentMapping mapping = 
mappingByFragmentId.get(fragment.fragmentId());
 
@@ -264,18 +267,20 @@ public class MappingServiceImpl implements MappingService, LogicalTopologyEventL
                             sourcesByExchangeId.put(exchangeId, 
allSourcesByExchangeId.get(exchangeId));
                         }
 
-                        result.add(
-                                new MappedFragment(
-                                        fragment,
-                                        mapping.groups(),
-                                        sourcesByExchangeId,
-                                        targetGroup,
-                                        null
-                                )
+                        MappedFragment mappedFragment = new MappedFragment(
+                                fragment,
+                                mapping.groups(),
+                                sourcesByExchangeId,
+                                targetGroup,
+                                null
                         );
+
+                        mappedFragmentsList.add(mappedFragment);
+
+                        targetNodes.addAll(mappedFragment.nodes());
                     }
 
-                    return result;
+                    return new MappedFragments(mappedFragmentsList, 
targetNodes);
                 });
     }
 
@@ -287,6 +292,15 @@ public class MappingServiceImpl implements MappingService, LogicalTopologyEventL
     @Override
     public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot 
newTopology) {
         topologyHolder.update(newTopology);
+
+        mappingsCache.removeIfValue(value -> {
+            if (value.mappedFragments.isDone()) {
+                return 
value.mappedFragments.join().nodes.contains(leftNode.name());
+            }
+
+            // Invalidate non-completed mappings to reduce the chance of getting stale value
+            return true;
+        });
     }
 
     @Override
@@ -412,12 +426,23 @@ public class MappingServiceImpl implements MappingService, LogicalTopologyEventL
         }
     }
 
+    /** Wraps list of mapped fragments with target node names. */
+    private static class MappedFragments {
+        final List<MappedFragment> fragments;
+        final Set<String> nodes;
+
+        MappedFragments(List<MappedFragment> fragments, Set<String> nodes) {
+            this.fragments = fragments;
+            this.nodes = nodes;
+        }
+    }
+
     private static class MappingsCacheValue {
         private final long topVer;
         private final IntSet tableIds;
-        private final CompletableFuture<List<MappedFragment>> mappedFragments;
+        private final CompletableFuture<MappedFragments> mappedFragments;
 
-        MappingsCacheValue(long topVer, IntSet tableIds, 
CompletableFuture<List<MappedFragment>> mappedFragments) {
+        MappingsCacheValue(long topVer, IntSet tableIds, 
CompletableFuture<MappedFragments> mappedFragments) {
             this.topVer = topVer;
             this.tableIds = tableIds;
             this.mappedFragments = mappedFragments;
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
index fa89d462f3..4979f14057 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
@@ -194,15 +194,35 @@ public class MappingServiceImplTest extends BaseIgniteAbstractTest {
         assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS)));
         assertSame(sysViewMapping, 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
 
-        mappingService.onNodeLeft(Mockito.mock(LogicalNode.class),
-                new LogicalTopologySnapshot(2, logicalNodes("NODE")));
-        // Plan with tables only must not be invalidated.
+        LogicalNode newNode = Mockito.mock(LogicalNode.class);
+        Mockito.when(newNode.name()).thenReturn("NODE2");
+
+        mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
+                new LogicalTopologySnapshot(3, logicalNodes("NODE", "NODE1", 
"NODE2")));
+
+        // Plan with tables only must not be invalidated on node join.
         assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS)));
         verifyNoMoreInteractions(targetProvider);
 
         // Plan with system views must be invalidated.
         assertNotSame(sysViewMapping, 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
-        verify(targetProvider, times(3)).forTable(any(), any());
+
+        mappingService.onNodeLeft(newNode,
+                new LogicalTopologySnapshot(3, logicalNodes("NODE", "NODE1")));
+
+        // Plan with tables that don't include a left node should not be invalidated.
+        assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS)));
+
+        LogicalNode node1 = Mockito.mock(LogicalNode.class);
+        Mockito.when(node1.name()).thenReturn("NODE1");
+
+        mappingService.onNodeLeft(node1,
+                new LogicalTopologySnapshot(3, logicalNodes("NODE")));
+
+        // Plan with tables that include left node must be invalidated.
+        assertNotSame(tableOnlyMapping, await(mappingService.map(PLAN, 
PARAMS)));
+
+        verify(targetProvider, times(4)).forTable(any(), any());
         verify(targetProvider, times(2)).forSystemView(any(), any());
     }
 

Reply via email to