This is an automated email from the ASF dual-hosted git repository. ascherbakov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new f55901d IGNITE-12935 Improved logging for historical rebalancing. - Fixes #7722. f55901d is described below commit f55901d3de2148227b102e5f5260bc637617f261 Author: Vladislav Pyatkov <vldpyat...@gmail.com> AuthorDate: Tue Jul 7 09:41:32 2020 +0300 IGNITE-12935 Improved logging for historical rebalancing. - Fixes #7722. Signed-off-by: Aleksei Scherbakov <ascherba...@apache.org> --- .../preloader/GridDhtPartitionsExchangeFuture.java | 91 ++++++- .../dht/preloader/SupplyPartitionInfo.java | 99 ++++++++ .../GridCacheDatabaseSharedManager.java | 65 ++++- .../persistence/checkpoint/CheckpointHistory.java | 29 ++- .../persistence/checkpoint/ReservationReason.java | 50 ++++ .../db/wal/IgniteWalRebalanceLoggingTest.java | 275 +++++++++++++++++++++ .../ignite/testsuites/IgnitePdsMvccTestSuite2.java | 2 + .../ignite/testsuites/IgnitePdsTestSuite2.java | 3 + .../processors/client/IgniteDataStreamerTest.java | 2 + 9 files changed, 598 insertions(+), 18 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index a34a098..21b3d26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -3349,8 +3349,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * * @param top Topology to assign. * @param resetOwners True if need to reset partition state considering of counter, false otherwise. + * @return Partitions supply info list. 
*/ - private void assignPartitionStates(GridDhtPartitionTopology top, boolean resetOwners) { + private List<SupplyPartitionInfo> assignPartitionStates(GridDhtPartitionTopology top, boolean resetOwners) { Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>(); Map<Integer, TreeSet<Long>> varCntrs = new HashMap<>(); @@ -3424,10 +3425,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Set<Integer> haveHistory = new HashSet<>(); - assignHistoricalSuppliers(top, maxCntrs, varCntrs, haveHistory); + List<SupplyPartitionInfo> list = assignHistoricalSuppliers(top, maxCntrs, varCntrs, haveHistory); if (resetOwners) resetOwnersByCounter(top, maxCntrs, haveHistory); + + return list; } /** @@ -3470,8 +3473,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param maxCntrs Max counter partiton map. * @param varCntrs Various counters for each partition. * @param haveHistory Set of partitions witch have historical supplier. + * @return List of partitions which does not have historical supplier. */ - private void assignHistoricalSuppliers( + private List<SupplyPartitionInfo> assignHistoricalSuppliers( GridDhtPartitionTopology top, Map<Integer, CounterWithNodes> maxCntrs, Map<Integer, TreeSet<Long>> varCntrs, @@ -3481,6 +3485,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Map<Integer, Long> localReserved = partHistReserved0 != null ? partHistReserved0.get(top.groupId()) : null; + List<SupplyPartitionInfo> list = new ArrayList<>(); + for (Map.Entry<Integer, TreeSet<Long>> e : varCntrs.entrySet()) { int p = e.getKey(); @@ -3529,7 +3535,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte deepestReserved.set(e0.getKey(), histCntr); } } + + // No one reservation matched for this partition. 
+ if (!haveHistory.contains(p)) { + list.add(new SupplyPartitionInfo( + p, + nonMaxCntrs.last(), + deepestReserved.get2(), + deepestReserved.get1() + )); + } } + + return list; } /** @@ -4124,6 +4142,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param resetOwners True if reset partitions state needed, false otherwise. */ private void assignPartitionsStates(boolean resetOwners) { + Map<String, List<SupplyPartitionInfo>> supplyInfoMap = log.isInfoEnabled() ? + new ConcurrentHashMap<>() : null; + try { U.doInParallel( cctx.kernalContext().getSystemExecutorService(), @@ -4135,8 +4156,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte ? grpCtx.topology() : cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache()); - if (CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration())) - assignPartitionStates(top, resetOwners); + if (CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration())) { + List<SupplyPartitionInfo> list = assignPartitionStates(top, resetOwners); + + if (supplyInfoMap != null && !F.isEmpty(list)) + supplyInfoMap.put(grpDesc.cacheOrGroupName(), list); + } else if (resetOwners) assignPartitionSizes(top); @@ -4148,10 +4173,66 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte throw new IgniteException("Failed to assign partition states", e); } + if (!F.isEmpty(supplyInfoMap)) + printPartitionRebalancingFully(supplyInfoMap); + timeBag.finishGlobalStage("Assign partitions states"); } /** + * Prints detail information about partitions which did not have reservation + * history enough for historical rebalance. + * + * @param supplyInfoMap Map contains information about supplying partitions. 
+ */ + private void printPartitionRebalancingFully(Map<String, List<SupplyPartitionInfo>> supplyInfoMap) { + try { + if (hasPartitionToLog(supplyInfoMap, false)) { + log.info("Partitions weren't present in any history reservation: [" + + supplyInfoMap.entrySet().stream().map(entry -> + "[grp=" + entry.getKey() + " part=[" + S.compact(entry.getValue().stream() + .filter(info -> !info.isHistoryReserved()) + .map(info -> info.part()).collect(Collectors.toSet())) + "]]" + ).collect(Collectors.joining(", ")) + ']'); + } + + if (hasPartitionToLog(supplyInfoMap, true)) { + log.info("Partitions were reserved, but maximum available counter is greater than demanded: [" + + supplyInfoMap.entrySet().stream().map(entry -> + "[grp=" + entry.getKey() + ' ' + + entry.getValue().stream().filter(SupplyPartitionInfo::isHistoryReserved).map(info -> + "[part=" + info.part() + + ", minCntr=" + info.minCntr() + + ", maxReserved=" + info.maxReserved() + + ", maxReservedNodeId=" + info.maxReservedNodeId() + ']' + ).collect(Collectors.joining(", ")) + ']' + ).collect(Collectors.joining(", ")) + ']'); + } + } + catch (Exception e) { + log.error("An error happened during printing partitions that have no history.", e); + } + } + + /** + * Does information contain partitions which will print to log. + * + * @param supplyInfoMap Map contains information about supplying partitions. + * @param reserved Reservation flag. + * @return True if map has partitions with same reserved flag, false otherwise. + */ + private boolean hasPartitionToLog(Map<String, List<SupplyPartitionInfo>> supplyInfoMap, boolean reserved) { + for (List<SupplyPartitionInfo> infos : supplyInfoMap.values()) { + for (SupplyPartitionInfo info : infos) { + if (info.isHistoryReserved() == reserved) + return true; + } + } + + return false; + } + + /** * Removes gaps in the local update counters. Gaps in update counters are possible on backup node when primary * failed to send update counter deltas to backup. 
*/ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/SupplyPartitionInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/SupplyPartitionInfo.java new file mode 100644 index 0000000..11eca52 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/SupplyPartitionInfo.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Information about supplier for specific partition. + */ +public class SupplyPartitionInfo { + /** + * Partition number. + */ + private int part; + + /** + * Counter of partition from demander node. + */ + private long minCntr; + + /** + * Reservation. + */ + private long maxReserved; + + /** + * Node with the deepest history by partition. + */ + private UUID maxReservedNodeId; + + /** + * @param part Partition. + * @param minCntr Minimal counter. + * @param maxReserved Max reservation. 
+ * @param maxReservedNodeId Node with maximum reservation. + */ + public SupplyPartitionInfo(int part, long minCntr, long maxReserved, UUID maxReservedNodeId) { + this.part = part; + this.minCntr = minCntr; + this.maxReserved = maxReserved; + this.maxReservedNodeId = maxReservedNodeId; + } + + /** + * @return Partition. + */ + public int part() { + return part; + } + + /** + * @return Minimum counter. + */ + public long minCntr() { + return minCntr; + } + + /** + * @return Max reservation. + */ + public long maxReserved() { + return maxReserved; + } + + /** + * @return Node id. + */ + public UUID maxReservedNodeId() { + return maxReservedNodeId; + } + + /** + * @return True if reserved, false otherwise. + */ + public boolean isHistoryReserved() { + return maxReserved != Long.MAX_VALUE && maxReservedNodeId != null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SupplyPartitionInfo.class, this); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 4b01715..75af035 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -135,6 +135,7 @@ import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkp import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgressImpl; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.PartitionDestroyQueue; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.PartitionDestroyRequest; +import org.apache.ignite.internal.processors.cache.persistence.checkpoint.ReservationReason; import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore; @@ -1737,7 +1738,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan Map</*grpId*/Integer, Set</*partId*/Integer>> applicableGroupsAndPartitions = partitionsApplicableForWalRebalance(); - Map</*grpId*/Integer, Map</*partId*/Integer, CheckpointEntry>> earliestValidCheckpoints; + Map</*grpId*/Integer, T2</*reason*/ReservationReason, Map</*partId*/Integer, CheckpointEntry>>> earliestValidCheckpoints; checkpointReadLock(); @@ -1750,10 +1751,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan Map</*grpId*/Integer, Map</*partId*/Integer, /*updCntr*/Long>> grpPartsWithCnts = new HashMap<>(); - for (Map.Entry<Integer, Map<Integer, CheckpointEntry>> e : earliestValidCheckpoints.entrySet()) { + for (Map.Entry<Integer, T2</*reason*/ReservationReason, Map</*partId*/Integer, CheckpointEntry>>> e : earliestValidCheckpoints.entrySet()) { int grpId = e.getKey(); - for (Map.Entry<Integer, CheckpointEntry> e0 : e.getValue().entrySet()) { + if (e.getValue().get2() == null) + continue; + + for (Map.Entry<Integer, CheckpointEntry> e0 : e.getValue().get2().entrySet()) { CheckpointEntry cpEntry = e0.getValue(); int partId = e0.getKey(); @@ -1772,10 +1776,65 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } } + if (log.isInfoEnabled() && !F.isEmpty(earliestValidCheckpoints)) + printReservationToLog(earliestValidCheckpoints); + return grpPartsWithCnts; } /** + * Prints detail information about caches which were not reserved + * and reservation depth for the caches which have WAL history enough. + * + * @param earliestValidCheckpoints Map contains information about caches' reservation. 
+ */ + private void printReservationToLog( + Map<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> earliestValidCheckpoints) { + try { + Map<ReservationReason, List<Integer>> notReservedCachesToPrint = new HashMap<>(); + Map<ReservationReason, List<T2<Integer, CheckpointEntry>>> reservedCachesToPrint = new HashMap<>(); + + for (Map.Entry<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> entry : earliestValidCheckpoints.entrySet()) { + if (entry.getValue().get2() == null) { + notReservedCachesToPrint.computeIfAbsent(entry.getValue().get1(), reason -> new ArrayList<>()) + .add(entry.getKey()); + } + else { + reservedCachesToPrint.computeIfAbsent(entry.getValue().get1(), reason -> new ArrayList<>()) + .add(new T2(entry.getKey(), entry.getValue().get2().values().stream().min( + Comparator.comparingLong(CheckpointEntry::timestamp)).get())); + } + } + + if (!F.isEmpty(notReservedCachesToPrint)) { + log.info("Cache groups were not reserved [" + + notReservedCachesToPrint.entrySet().stream() + .map(entry -> '[' + + entry.getValue().stream().map(grpId -> "[grpId=" + grpId + + ", grpName=" + cctx.cache().cacheGroup(grpId).cacheOrGroupName() + ']') + .collect(Collectors.joining(", ")) + + ", reason=" + entry.getKey() + ']') + .collect(Collectors.joining(", ")) + ']'); + } + + if (!F.isEmpty(reservedCachesToPrint)) { + log.info("Cache groups with earliest reserved checkpoint and a reason why a previous checkpoint was inapplicable: [" + + reservedCachesToPrint.entrySet().stream() + .map(entry -> '[' + + entry.getValue().stream().map(grpCp -> "[grpId=" + grpCp.get1() + + ", grpName=" + cctx.cache().cacheGroup(grpCp.get1()).cacheOrGroupName() + + ", cp=(" + grpCp.get2().checkpointId() + ", " + U.format(grpCp.get2().timestamp()) + ")]") + .collect(Collectors.joining(", ")) + + ", reason=" + entry.getKey() + ']') + .collect(Collectors.joining(", ")) + ']'); + } + } + catch (Exception e) { + log.error("An error happened during printing partitions that were 
reserved for potential historical rebalance.", e); + } + } + + /** * @return Map of group id -> Set of partitions which can be used as suppliers for WAL rebalance. */ private Map<Integer, Set<Integer>> partitionsApplicableForWalRebalance() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java index 131e379..dfe3798 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java @@ -550,19 +550,22 @@ public class CheckpointHistory { * * @param groupsAndPartitions Groups and partitions to find and reserve earliest valid checkpoint. * - * @return Map (groupId: Map (partitionId, earliest valid checkpoint to history search)). + * @return Map (groupId, Reason (the reason why reservation cannot be made deeper): Map + * (partitionId, earliest valid checkpoint to history search)). 
*/ - public Map<Integer, Map<Integer, CheckpointEntry>> searchAndReserveCheckpoints( + public Map<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> searchAndReserveCheckpoints( final Map<Integer, Set<Integer>> groupsAndPartitions ) { if (F.isEmpty(groupsAndPartitions)) return Collections.emptyMap(); - final Map<Integer, Map<Integer, CheckpointEntry>> res = new HashMap<>(); + final Map<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> res = new HashMap<>(); CheckpointEntry oldestCpForReservation = null; synchronized (earliestCp) { + CheckpointEntry oldestHistoryCpEntry = firstCheckpoint(); + for (Integer grpId : groupsAndPartitions.keySet()) { CheckpointEntry oldestGrpCpEntry = null; @@ -579,17 +582,23 @@ public class CheckpointHistory { oldestGrpCpEntry = cpEntry; res.computeIfAbsent(grpId, partCpMap -> - new HashMap<>()) - .put(part, cpEntry); + new T2<>(ReservationReason.NO_MORE_HISTORY, new HashMap<>())) + .get2().put(part, cpEntry); } + + if (oldestGrpCpEntry == null || oldestGrpCpEntry != oldestHistoryCpEntry) + res.computeIfAbsent(grpId, (partCpMap) -> + new T2<>(ReservationReason.CHECKPOINT_NOT_APPLICABLE, null)) + .set1(ReservationReason.CHECKPOINT_NOT_APPLICABLE); } + } - if (oldestCpForReservation != null) { - if (!cctx.wal().reserve(oldestCpForReservation.checkpointMark())) { - log.warning("Could not reserve cp " + oldestCpForReservation.checkpointMark()); + if (oldestCpForReservation != null) { + if (!cctx.wal().reserve(oldestCpForReservation.checkpointMark())) { + log.warning("Could not reserve cp " + oldestCpForReservation.checkpointMark()); - return Collections.emptyMap(); - } + for (Map.Entry<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> entry : res.entrySet()) + entry.setValue(new T2<>(ReservationReason.WAL_RESERVATION_ERROR, null)); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/ReservationReason.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/ReservationReason.java new file mode 100644 index 0000000..5f12e45 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/ReservationReason.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.checkpoint; + +/** + * Represents the reason why the WAL history was bounded. + */ +public enum ReservationReason { + /** The message is written to the log when an exception happened during + * reading a WAL segment or a segment cannot be reserved. */ + WAL_RESERVATION_ERROR, + + /** Reason means no more history reserved for the cache. */ + NO_MORE_HISTORY, + + /** Reason means a checkpoint in history reserved can not be applied for cache. 
*/ + CHECKPOINT_NOT_APPLICABLE; + + /** {@inheritDoc} */ + @Override public String toString() { + switch (this) { + case WAL_RESERVATION_ERROR: + return "Unexpected error during processing of previous checkpoint"; + + case NO_MORE_HISTORY: + return "Reserved checkpoint is the oldest in history"; + + case CHECKPOINT_NOT_APPLICABLE: + return "Checkpoint was marked as inapplicable for historical rebalancing"; + + default: + throw new IllegalArgumentException(); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceLoggingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceLoggingTest.java new file mode 100644 index 0000000..9b21b49 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceLoggingTest.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; +import org.apache.ignite.testframework.junits.WithSystemProperty; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD; +import static org.apache.ignite.internal.pagemem.wal.record.RolloverType.CURRENT_SEGMENT; + +/** + * Tests for checking historical rebalance log messages. + */ +public class IgniteWalRebalanceLoggingTest extends GridCommonAbstractTest { + /** This timeout should be big enough in order to prohibit checkpoint triggered by timeout. */ + private static final int CHECKPOINT_FREQUENCY = 600_000; + + /** Test logger. 
*/ + private final ListeningTestLogger srvLog = new ListeningTestLogger(false, log); + + /** */ + private static final int KEYS_LOW_BORDER = 100; + + /** */ + private static final int KEYS_UPPER_BORDER = 200; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setGridLogger(srvLog); + + DataStorageConfiguration storageCfg = new DataStorageConfiguration(); + + storageCfg.getDefaultDataRegionConfiguration() + .setMaxSize(200L * 1024 * 1024) + .setPersistenceEnabled(true); + + storageCfg.setWalMode(WALMode.LOG_ONLY) + .setMaxWalArchiveSize(-1) + .setWalCompactionEnabled(true) + .setWalCompactionLevel(1); + + storageCfg.setCheckpointFrequency(CHECKPOINT_FREQUENCY); + + cfg.setDataStorageConfiguration(storageCfg); + + return cfg; + } + + /** {@inheritDoc}*/ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc}*/ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * Check that in case of Historical rebalance we log appropriate messages. + * <p> + * <b>Steps:</b> + * <ol> + * <li>set IGNITE_PDS_WAL_REBALANCE_THRESHOLD to 1</li> + * <li>Start two nodes.</li> + * <li>Create two caches each in it's own cache group and populate them with some data.</li> + * <li>Stop second node and add more data to both caches.</li> + * <li>Wait checkpoint frequency * 2. This is required to guarantee that at least one checkpoint would be + * created.</li> + * <li>Start, previously stopped node and await for PME.</li> + * </ol> + * <p> + * @throws Exception If failed. 
+ */ + @Test + @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "1") + public void testHistoricalRebalanceLogMsg() throws Exception { + LogListener expMsgsLsnr = LogListener.matches(str -> + str.startsWith("Cache groups with earliest reserved checkpoint and a reason why a previous checkpoint was inapplicable:") && + str.contains("cache_group1") && str.contains("cache_group2")).times(3). + andMatches(str -> str.startsWith("Starting rebalance routine") && + (str.contains("cache_group1") || str.contains("cache_group2")) && + str.contains("fullPartitions=[], histPartitions=[0-7]")).times(2).build(); + + LogListener unexpectedMessagesLsnr = LogListener.matches((str) -> + str.startsWith("Partitions weren't present in any history reservation:") || + str.startsWith("Partitions were reserved, but maximum available counter is greater than demanded:") + ).build(); + + checkFollowingPartitionsWereReservedForPotentialHistoryRebalanceMsg(expMsgsLsnr, unexpectedMessagesLsnr); + + assertTrue(expMsgsLsnr.check()); + assertFalse(unexpectedMessagesLsnr.check()); + } + + /** + * Check that in case of Full rebalance we log appropriate messages. + * <p> + * <b>Steps:</b> + * <ol> + * <li>restore IGNITE_PDS_WAL_REBALANCE_THRESHOLD to default 500000</li> + * <li>Start two nodes.</li> + * <li>Create two caches each in it's own cache group and populate them with some data.</li> + * <li>Stop second node and add more data to both caches.</li> + * <li>Wait checkpoint frequency * 2. This is required to guarantee that at least one checkpoint would be + * created.</li> + * <li>Start, previously stopped node and await for PME.</li> + * </ol> + * <p> + * @throws Exception If failed. + */ + @Test + @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "500000") + public void testFullRebalanceLogMsgs() throws Exception { + LogListener expMsgsLsnr = LogListener. 
+ matches("Partitions weren't present in any history reservation: " + + "[[grp=cache_group2 part=[[0-7]]], [grp=cache_group1 part=[[0-7]]]]"). + andMatches(str -> str.startsWith("Starting rebalance routine") && + (str.contains("cache_group1") || str.contains("cache_group2")) && + str.contains("fullPartitions=[0-7], histPartitions=[]")).times(2).build(); + + checkFollowingPartitionsWereReservedForPotentialHistoryRebalanceMsg(expMsgsLsnr); + + assertTrue(expMsgsLsnr.check()); + } + + /** + * Test checks log messages in case of short history of checkpoint. + * + * @throws Exception If failed. + */ + @Test + @WithSystemProperty(key = IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, value = "2") + @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "1") + public void testFullRebalanceWithShortCpHistoryLogMsgs() throws Exception { + LogListener expMsgsLsnr = LogListener. + matches(str -> str.startsWith("Partitions were reserved, but maximum available counter is greater than demanded: ") && + str.contains("grp=cache_group1") && str.contains("grp=cache_group2")). + andMatches(str -> str.startsWith("Starting rebalance routine") && + (str.contains("cache_group1") || str.contains("cache_group2")) && + str.contains("fullPartitions=[0-7], histPartitions=[]")).times(2).build(); + + checkFollowingPartitionsWereReservedForPotentialHistoryRebalanceMsg(expMsgsLsnr); + + assertTrue(expMsgsLsnr.check()); + } + + /** + * Test utility method. + * + * @param lsnrs Listeners to register with server logger. + * @throws Exception If failed. + */ + private void checkFollowingPartitionsWereReservedForPotentialHistoryRebalanceMsg(LogListener... 
lsnrs) + throws Exception { + startGridsMultiThreaded(2).cluster().active(true); + + IgniteCache<Integer, String> cache1 = createCache("cache1", "cache_group1"); + IgniteCache<Integer, String> cache2 = createCache("cache2", "cache_group2"); + + for (int i = 0; i < KEYS_LOW_BORDER; i++) { + cache1.put(i, "abc" + i); + cache2.put(i, "abc" + i); + + if (i % 20 == 0) + forceCheckpointAndRollOwerWal(); + } + + stopGrid(1); + + for (int i = KEYS_LOW_BORDER; i < KEYS_UPPER_BORDER; i++) { + cache1.put(i, "abc" + i); + cache2.put(i, "abc" + i); + + if (i % 20 == 0) + forceCheckpointAndRollOwerWal(); + } + + srvLog.clearListeners(); + + for (LogListener lsnr: lsnrs) + srvLog.registerListener(lsnr); + + startGrid(1); + + awaitPartitionMapExchange(false, true, null); + } + + /** + * Create cache with specific name and group name. + * @param cacheName Cache name. + * @param cacheGrpName Cache group name. + * @return Created cache. + */ + private IgniteCache<Integer, String> createCache(String cacheName, String cacheGrpName) { + return ignite(0).createCache( + new CacheConfiguration<Integer, String>(cacheName). + setAffinity(new RendezvousAffinityFunction().setPartitions(8)) + .setGroupName(cacheGrpName). + setBackups(1)); + } + + /** + * Invokes checkpoint forcibly and rolls over the WAL segment. + * It might be needed to simulate a long checkpoint history in the test. + * + * @throws Exception If failed. + */ + private void forceCheckpointAndRollOwerWal() throws Exception { + forceCheckpoint(); + + for (Ignite ignite : G.allGrids()) { + if (ignite.cluster().localNode().isClient()) + continue; + + IgniteEx ig = (IgniteEx)ignite; + + IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal(); + + ig.context().cache().context().database().checkpointReadLock(); + + try { + WALPointer ptr = walMgr.log(new AdHocWALRecord(), CURRENT_SEGMENT); + } + finally { + ig.context().cache().context().database().checkpointReadUnlock(); + } + } + } + + /** Test WAL record. 
*/ + private static class AdHocWALRecord extends CheckpointRecord { + /** Default constructor. */ + private AdHocWALRecord() { + super(null); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java index 38f93c6..a17e889 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWALT import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFormatFileFailoverTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorExceptionDuringReadTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorSwitchSegmentTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceLoggingTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionSwitchOnTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionTest; @@ -93,6 +94,7 @@ public class IgnitePdsMvccTestSuite2 { ignoredTests.add(StandaloneWalRecordsIteratorTest.class); ignoredTests.add(IgniteWALTailIsReachedDuringIterationOverArchiveTest.class); ignoredTests.add(WalRolloverTypesTest.class); + ignoredTests.add(IgniteWalRebalanceLoggingTest.class); ignoredTests.add(FsyncWalRolloverDoesNotBlockTest.class); return IgnitePdsTestSuite2.suite(ignoredTests); diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index b5a314f..2f065c4 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalF import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalHistoryReservationsTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorExceptionDuringReadTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorSwitchSegmentTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceLoggingTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalReplayingAfterRestartTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionSwitchOnTest; @@ -259,5 +260,7 @@ public class IgnitePdsTestSuite2 { GridTestUtils.addTestIfNeeded(suite, IgnitePdsWithTtlDeactivateOnHighloadTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, WalPreloadingConcurrentTest.class, ignoredTests); + + GridTestUtils.addTestIfNeeded(suite, IgniteWalRebalanceLoggingTest.class, ignoredTests); } } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/client/IgniteDataStreamerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/client/IgniteDataStreamerTest.java index 2e63fdb..57021aa 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/client/IgniteDataStreamerTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/client/IgniteDataStreamerTest.java @@ -46,6 +46,8 @@ public class IgniteDataStreamerTest extends GridCommonAbstractTest { startGrids(2); startClientGrid("client"); + + awaitPartitionMapExchange(); } @Override protected void 
afterTest() throws Exception {