This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 15a4383367 IGNITE-22083 Sql. Invalidate fragment mapping cache when the mapped node has left the cluster (#3637) 15a4383367 is described below commit 15a4383367bd832729f19e447889f6ce3491a695 Author: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com> AuthorDate: Mon Apr 22 15:57:38 2024 +0300 IGNITE-22083 Sql. Invalidate fragment mapping cache when the mapped node has left the cluster (#3637) --- .../engine/exec/mapping/MappingServiceImpl.java | 53 ++++++++++++++++------ .../exec/mapping/MappingServiceImplTest.java | 28 ++++++++++-- 2 files changed, 63 insertions(+), 18 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java index 13bf05cb35..7c28ac249e 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java @@ -30,7 +30,9 @@ import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMap; import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.stream.Collectors; @@ -152,10 +154,10 @@ public class MappingServiceImpl implements MappingService, LogicalTopologyEventL return val; }); - return cacheValue.mappedFragments.thenApply(mappedFragments -> applyPartitionPruning(mappedFragments, parameters)); + return cacheValue.mappedFragments.thenApply(mappedFragments -> applyPartitionPruning(mappedFragments.fragments, parameters)); } - private CompletableFuture<List<MappedFragment>> mapFragments(MappingContext context, FragmentsTemplate template) { + private CompletableFuture<MappedFragments> mapFragments(MappingContext context, FragmentsTemplate template) { IdGenerator idGenerator = new IdGenerator(template.nextId); List<Fragment> fragments = new ArrayList<>(template.fragments); @@ -242,7 +244,8 @@ public class MappingServiceImpl implements MappingService, LogicalTopologyEventL throw new IgniteInternalException(Sql.MAPPING_ERR, "Unable to map query: " + ex.getMessage(), ex); } - List<MappedFragment> result = new ArrayList<>(fragmentsToMap.size()); + List<MappedFragment> mappedFragmentsList = new ArrayList<>(fragmentsToMap.size()); + Set<String> targetNodes = new HashSet<>(); for (Fragment fragment : fragmentsToMap) { FragmentMapping mapping = mappingByFragmentId.get(fragment.fragmentId()); @@ -264,18 +267,20 @@ public class MappingServiceImpl implements MappingService, LogicalTopologyEventL sourcesByExchangeId.put(exchangeId, allSourcesByExchangeId.get(exchangeId)); } - result.add( - new MappedFragment( - fragment, - mapping.groups(), - sourcesByExchangeId, - targetGroup, - null - ) + MappedFragment mappedFragment = new MappedFragment( + fragment, + mapping.groups(), + sourcesByExchangeId, + targetGroup, + null ); + + mappedFragmentsList.add(mappedFragment); + + targetNodes.addAll(mappedFragment.nodes()); } - return result; + return new MappedFragments(mappedFragmentsList, targetNodes); }); } @@ -287,6 +292,15 @@ public class MappingServiceImpl implements MappingService, LogicalTopologyEventL @Override public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) { topologyHolder.update(newTopology); + + mappingsCache.removeIfValue(value -> { + if (value.mappedFragments.isDone()) { + return value.mappedFragments.join().nodes.contains(leftNode.name()); + } + + // Invalidate non-completed mappings to reduce the chance of getting stale value + return true; + }); } @Override @@ -412,12 +426,23 @@ public class MappingServiceImpl implements MappingService, LogicalTopologyEventL } } + /** Wraps list of mapped fragments with target node names. */ + private static class MappedFragments { + final List<MappedFragment> fragments; + final Set<String> nodes; + + MappedFragments(List<MappedFragment> fragments, Set<String> nodes) { + this.fragments = fragments; + this.nodes = nodes; + } + } + private static class MappingsCacheValue { private final long topVer; private final IntSet tableIds; - private final CompletableFuture<List<MappedFragment>> mappedFragments; + private final CompletableFuture<MappedFragments> mappedFragments; - MappingsCacheValue(long topVer, IntSet tableIds, CompletableFuture<List<MappedFragment>> mappedFragments) { + MappingsCacheValue(long topVer, IntSet tableIds, CompletableFuture<MappedFragments> mappedFragments) { this.topVer = topVer; this.tableIds = tableIds; this.mappedFragments = mappedFragments; diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java index fa89d462f3..4979f14057 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java @@ -194,15 +194,35 @@ public class MappingServiceImplTest extends BaseIgniteAbstractTest { assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS))); assertSame(sysViewMapping, await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS))); - mappingService.onNodeLeft(Mockito.mock(LogicalNode.class), - new LogicalTopologySnapshot(2, logicalNodes("NODE"))); - // Plan with tables only must not be invalidated. + LogicalNode newNode = Mockito.mock(LogicalNode.class); + Mockito.when(newNode.name()).thenReturn("NODE2"); + + mappingService.onNodeJoined(Mockito.mock(LogicalNode.class), + new LogicalTopologySnapshot(3, logicalNodes("NODE", "NODE1", "NODE2"))); + + // Plan with tables only must not be invalidated on node join. assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS))); verifyNoMoreInteractions(targetProvider); // Plan with system views must be invalidated. assertNotSame(sysViewMapping, await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS))); - verify(targetProvider, times(3)).forTable(any(), any()); + + mappingService.onNodeLeft(newNode, + new LogicalTopologySnapshot(3, logicalNodes("NODE", "NODE1"))); + + // Plan with tables that don't include a left node should not be invalidated. + assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS))); + + LogicalNode node1 = Mockito.mock(LogicalNode.class); + Mockito.when(node1.name()).thenReturn("NODE1"); + + mappingService.onNodeLeft(node1, + new LogicalTopologySnapshot(3, logicalNodes("NODE"))); + + // Plan with tables that include left node must be invalidated. + assertNotSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS))); + + verify(targetProvider, times(4)).forTable(any(), any()); verify(targetProvider, times(2)).forSystemView(any(), any()); }