This is an automated email from the ASF dual-hosted git repository. sdanilov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 4b58545534a IGNITE-16582 Improve behavior of speed-based throttling when dirty pages ratio is low (#9924) 4b58545534a is described below commit 4b58545534a49a39b8fd60bb560d1bec4ae235fc Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Sat Jun 4 12:45:24 2022 +0400 IGNITE-16582 Improve behavior of speed-based throttling when dirty pages ratio is low (#9924) --- .../checkpoint/CheckpointProgressImpl.java | 6 +- .../pagemem/PagesWriteSpeedBasedThrottle.java | 8 +- ...edBasedMemoryConsumptionThrottlingStrategy.java | 170 ++++++++++----------- ...va => AbstractSlowCheckpointFileIOFactory.java} | 17 ++- .../db/CheckpointBufferDeadlockTest.java | 2 +- .../db/SlowCheckpointMetadataFileIOFactory.java | 41 +++++ .../db/SlowCheckpointPagesFileIOFactory.java | 41 +++++ .../pagemem/IgniteThrottlingUnitTest.java | 52 ++++--- .../pagemem/PagesWriteThrottleSandboxTest.java | 69 +++++++-- .../pagemem/PagesWriteThrottleSmokeTest.java | 4 +- .../pagemem/SpeedBasedThrottleIntegrationTest.java | 114 ++++++++++++++ .../performancestatistics/CheckpointTest.java | 4 +- .../ignite/testsuites/IgnitePdsTestSuite5.java | 2 + 13 files changed, 390 insertions(+), 140 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java index 5c78bccc94e..1f0e7cebd47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java @@ -246,11 +246,11 @@ public class CheckpointProgressImpl implements CheckpointProgress { } /** {@inheritDoc} */ - @Override public void updateEvictedPages(int deltha) { - 
A.ensure(deltha > 0, "param must be positive"); + @Override public void updateEvictedPages(int delta) { + A.ensure(delta > 0, "param must be positive"); if (evictedPagesCounter() != null) - evictedPagesCounter().addAndGet(deltha); + evictedPagesCounter().addAndGet(delta); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java index 276b448e454..f9dae4a5854 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkp import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteOutClosure; +import org.jetbrains.annotations.TestOnly; /** * Throttles threads that generate dirty pages during ongoing checkpoint. 
@@ -104,7 +105,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy { this.log = log; cleanPagesProtector = new SpeedBasedMemoryConsumptionThrottlingStrategy(pageMemory, cpProgress, - markSpeedAndAvgParkTime); + markSpeedAndAvgParkTime); cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory); } @@ -186,7 +187,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy { if (prevWarnTime.compareAndSet(prevWarningNs, curNs) && log.isInfoEnabled()) { String msg = String.format("Throttling is applied to page modifications " + - "[percentOfPartTime=%.2f, markDirty=%d pages/sec, checkpointWrite=%d pages/sec, " + + "[fractionOfParkTime=%.2f, markDirty=%d pages/sec, checkpointWrite=%d pages/sec, " + "estIdealMarkDirty=%d pages/sec, curDirty=%.2f, maxDirty=%.2f, avgParkTime=%d ns, " + "pages: (total=%d, evicted=%d, written=%d, synced=%d, cpBufUsed=%d, cpBufTotal=%d)]", weight, getMarkDirtySpeed(), getCpWriteSpeed(), @@ -210,6 +211,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy { * @param curCpWriteSpeed average checkpoint write speed, pages/sec. * @return time in nanoseconds to part or 0 if throttling is not required. 
*/ + @TestOnly long getCleanPagesProtectionParkTime( double dirtyPagesRatio, long fullyCompletedPages, @@ -293,7 +295,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy { if (speed <= 0) return 0; - long timeForOnePage = cleanPagesProtector.calcDelayTime(speed); + long timeForOnePage = cleanPagesProtector.nsPerOperation(speed); if (timeForOnePage == 0) return 0; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedMemoryConsumptionThrottlingStrategy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedMemoryConsumptionThrottlingStrategy.java index 2a5aba62511..9fe7a90eb90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedMemoryConsumptionThrottlingStrategy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedMemoryConsumptionThrottlingStrategy.java @@ -57,7 +57,7 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy { /** * Total pages possible to store in page memory. */ - private final long totalPages; + private volatile long pageMemTotalPages; /** * Last estimated speed for marking all clear pages as dirty till the end of checkpoint. 
@@ -111,8 +111,6 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy { this.pageMemory = pageMemory; this.cpProgress = cpProgress; this.markSpeedAndAvgParkTime = markSpeedAndAvgParkTime; - - totalPages = pageMemory.totalPages(); } /** @@ -149,7 +147,7 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy { final int cpWrittenPages = writtenPagesCounter.get(); final long donePages = cpDonePagesEstimation(cpWrittenPages); - final long markDirtySpeed = markSpeedAndAvgParkTime.getSpeedOpsPerSec(curNanoTime); + final long instantaneousMarkDirtySpeed = markSpeedAndAvgParkTime.getSpeedOpsPerSec(curNanoTime); // NB: we update progress for speed calculation only in this (clean pages protection) scenario, because // we only use the computed speed in this same scenario and for reporting in logs (where it's not super // important to display an ideally accurate speed), but not in the CP Buffer protection scenario. @@ -157,7 +155,8 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy { // The progress is set to 0 at the beginning of a checkpoint, so we can be sure that the start time remembered // in cpWriteSpeed is pretty accurate even without writing to cpWriteSpeed from this method. cpWriteSpeed.setProgress(donePages, curNanoTime); - final long curCpWriteSpeed = cpWriteSpeed.getOpsPerSecond(curNanoTime); + // TODO: IGNITE-16878 use exponential moving average so that we react to changes faster? + final long avgCpWriteSpeed = cpWriteSpeed.getOpsPerSecond(curNanoTime); final int cpTotalPages = cpTotalPages(); @@ -166,11 +165,11 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy { // CheckpointProgressImpl.clearCounters() is invoked at the end of a checkpoint (by falling through // between two volatile assignments). When we get here, we don't have any information about the total // number of pages in the current CP, so we calculate park time by only using information we have. 
- return parkTimeToThrottleByJustCPSpeed(markDirtySpeed, curCpWriteSpeed); + return parkTimeToThrottleByJustCPSpeed(instantaneousMarkDirtySpeed, avgCpWriteSpeed); } else { - return speedBasedParkTime(cpWrittenPages, donePages, markDirtySpeed, - curCpWriteSpeed, cpTotalPages); + return speedBasedParkTime(cpWrittenPages, donePages, cpTotalPages, instantaneousMarkDirtySpeed, + avgCpWriteSpeed); } } @@ -183,6 +182,9 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy { * @return estimation of work done (in pages) */ private int cpDonePagesEstimation(int cpWrittenPages) { + // TODO: IGNITE-16879 - this only works correctly if time-to-write a page is close to time-to-sync a page. + // In reality, this does not seem to hold, which produces wrong estimations. We could measure the real times + // in Checkpointer and make this estimation a lot more precise. return (cpWrittenPages + cpSyncedPages()) / 2; } @@ -198,15 +200,15 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy { boolean throttleByCpSpeed = curCpWriteSpeed > 0 && markDirtySpeed > curCpWriteSpeed; if (throttleByCpSpeed) { - return calcDelayTime(curCpWriteSpeed); + return nsPerOperation(curCpWriteSpeed); } return 0; } /***/ - private long speedBasedParkTime(int cpWrittenPages, long donePages, long markDirtySpeed, - long curCpWriteSpeed, int cpTotalPages) { + private long speedBasedParkTime(int cpWrittenPages, long donePages, int cpTotalPages, + long instantaneousMarkDirtySpeed, long avgCpWriteSpeed) { final double dirtyPagesRatio = pageMemory.getDirtyPagesRatio(); currDirtyRatio = dirtyPagesRatio; @@ -220,8 +222,8 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy { donePages, notEvictedPagesTotal(cpTotalPages), threadIdsCount(), - markDirtySpeed, - curCpWriteSpeed); + instantaneousMarkDirtySpeed, + avgCpWriteSpeed); } } @@ -237,8 +239,8 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy { * @param donePages roughly, written & fsynced pages count. * @param cpTotalPages total checkpoint scope. 
* @param nThreads number of threads providing data during current checkpoint. - * @param markDirtySpeed registered mark dirty speed, pages/sec. - * @param curCpWriteSpeed average checkpoint write speed, pages/sec. + * @param instantaneousMarkDirtySpeed registered (during approx last second) mark dirty speed, pages/sec. + * @param avgCpWriteSpeed average checkpoint write speed, pages/sec. * @return time in nanoseconds to part or 0 if throttling is not required. */ long getParkTime( @@ -246,84 +248,40 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy { long donePages, int cpTotalPages, int nThreads, - long markDirtySpeed, - long curCpWriteSpeed) { + long instantaneousMarkDirtySpeed, + long avgCpWriteSpeed) { final long targetSpeedToMarkAll = calcSpeedToMarkAllSpaceTillEndOfCp(dirtyPagesRatio, donePages, - curCpWriteSpeed, cpTotalPages); + avgCpWriteSpeed, cpTotalPages); final double targetCurrentDirtyRatio = targetCurrentDirtyRatio(donePages, cpTotalPages); - updateSpeedAndRatio(targetSpeedToMarkAll, targetCurrentDirtyRatio); - - long delayByCpWrite = delayIfMarkingFasterThanCPWriteSpeedAllows(markDirtySpeed, curCpWriteSpeed, - dirtyPagesRatio, nThreads, targetSpeedToMarkAll, targetCurrentDirtyRatio); - long delayByMarkAllWrite = delayIfMarkingFasterThanTargetSpeedAllows(markDirtySpeed, dirtyPagesRatio, nThreads, - targetSpeedToMarkAll, targetCurrentDirtyRatio); - - return Math.max(delayByCpWrite, delayByMarkAllWrite); - } - - /***/ - private long delayIfMarkingFasterThanCPWriteSpeedAllows(long markDirtySpeed, long curCpWriteSpeed, - double dirtyPagesRatio, int nThreads, - long targetSpeedToMarkAll, double targetCurrentDirtyRatio) { - final double allowedCpWriteSpeedExcessMultiplier = allowedCpWriteSpeedExcessMultiplier(markDirtySpeed, - dirtyPagesRatio, targetSpeedToMarkAll, targetCurrentDirtyRatio); - final boolean throttleByCpSpeed = curCpWriteSpeed > 0 - && markDirtySpeed > (allowedCpWriteSpeedExcessMultiplier * curCpWriteSpeed); - - if (!throttleByCpSpeed) 
{ - return 0; - } - - int slowdown = slowdownIfLowSpaceLeft(dirtyPagesRatio, targetCurrentDirtyRatio); - long nanosecsToMarkOnePage = TimeUnit.SECONDS.toNanos(1) * nThreads / markDirtySpeed; - long nanosecsToWriteOneCPPage = calcDelayTime(curCpWriteSpeed, nThreads, slowdown); - return nanosecsToWriteOneCPPage - nanosecsToMarkOnePage; - } - - /***/ - private double allowedCpWriteSpeedExcessMultiplier(long markDirtySpeed, double dirtyPagesRatio, - long targetSpeedToMarkAll, double targetCurrentDirtyRatio) { - final boolean lowSpaceLeft = lowCleanSpaceLeft(dirtyPagesRatio, targetCurrentDirtyRatio); - - // for case of speedForMarkAll >> markDirtySpeed, allow write little bit faster than CP average - final double allowWriteFasterThanCp; - if (markDirtySpeed > 0 && markDirtySpeed < targetSpeedToMarkAll) - allowWriteFasterThanCp = 0.1 * targetSpeedToMarkAll / markDirtySpeed; - else if (dirtyPagesRatio > targetCurrentDirtyRatio) - allowWriteFasterThanCp = 0.0; - else - allowWriteFasterThanCp = 0.1; - - return lowSpaceLeft - ? 1.0 - : 1.0 + allowWriteFasterThanCp; - } - - /***/ - private int slowdownIfLowSpaceLeft(double dirtyPagesRatio, double targetCurrentDirtyRatio) { - boolean lowSpaceLeft = lowCleanSpaceLeft(dirtyPagesRatio, targetCurrentDirtyRatio); - return slowdownIfLowSpaceLeft(lowSpaceLeft); - } + publishSpeedAndRatioForMetrics(targetSpeedToMarkAll, targetCurrentDirtyRatio); - /***/ - private int slowdownIfLowSpaceLeft(boolean lowSpaceLeft) { - return lowSpaceLeft ? 
3 : 1; + return delayIfMarkingFasterThanTargetSpeedAllows(instantaneousMarkDirtySpeed, + dirtyPagesRatio, nThreads, targetSpeedToMarkAll, targetCurrentDirtyRatio); } /***/ - private long delayIfMarkingFasterThanTargetSpeedAllows(long markDirtySpeed, double dirtyPagesRatio, int nThreads, + private long delayIfMarkingFasterThanTargetSpeedAllows(long instantaneousMarkDirtySpeed, double dirtyPagesRatio, + int nThreads, long targetSpeedToMarkAll, double targetCurrentDirtyRatio) { final boolean lowSpaceLeft = lowCleanSpaceLeft(dirtyPagesRatio, targetCurrentDirtyRatio); final int slowdown = slowdownIfLowSpaceLeft(lowSpaceLeft); double multiplierForSpeedToMarkAll = lowSpaceLeft ? 0.8 : 1.0; boolean markingTooFastNow = targetSpeedToMarkAll > 0 - && markDirtySpeed > multiplierForSpeedToMarkAll * targetSpeedToMarkAll; + && instantaneousMarkDirtySpeed > multiplierForSpeedToMarkAll * targetSpeedToMarkAll; boolean markedTooFastSinceCPStart = dirtyPagesRatio > targetCurrentDirtyRatio; boolean markingTooFast = markedTooFastSinceCPStart && markingTooFastNow; - return markingTooFast ? calcDelayTime(targetSpeedToMarkAll, nThreads, slowdown) : 0; + + // We must NOT subtract nsPerOperation(instantaneousMarkDirtySpeed, nThreads)! If we do, the actual speed + // converges to a value that is 1-2 times higher than the target speed. + return markingTooFast ? nsPerOperation(targetSpeedToMarkAll, nThreads, slowdown) : 0; + } + + /***/ + private int slowdownIfLowSpaceLeft(boolean lowSpaceLeft) { + return lowSpaceLeft ? 
3 : 1; } /** @@ -338,9 +296,9 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy { } /***/ - private void updateSpeedAndRatio(long speedForMarkAll, double targetDirtyRatio) { - this.speedForMarkAll = speedForMarkAll; //publish for metrics - this.targetDirtyRatio = targetDirtyRatio; //publish for metrics + private void publishSpeedAndRatioForMetrics(long speedForMarkAll, double targetDirtyRatio) { + this.speedForMarkAll = speedForMarkAll; + this.targetDirtyRatio = targetDirtyRatio; } /** @@ -351,14 +309,14 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy { * * @param dirtyPagesRatio current percent of dirty pages. * @param donePages roughly, count of written and sync'ed pages - * @param curCpWriteSpeed pages/second checkpoint write speed. 0 speed means 'no data'. + * @param avgCpWriteSpeed pages/second checkpoint write speed. 0 speed means 'no data'. * @param cpTotalPages total pages in checkpoint. * @return pages/second to mark to mark all clean pages as dirty till the end of checkpoint. 0 speed means 'no * data', or when we are not going to throttle due to the current dirty pages ratio being too high */ private long calcSpeedToMarkAllSpaceTillEndOfCp(double dirtyPagesRatio, long donePages, - long curCpWriteSpeed, int cpTotalPages) { - if (curCpWriteSpeed == 0) + long avgCpWriteSpeed, int cpTotalPages) { + if (avgCpWriteSpeed == 0) return 0; if (cpTotalPages <= 0) @@ -367,11 +325,30 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy { if (dirtyPagesRatio >= MAX_DIRTY_PAGES) return 0; - double remainedClearPages = (MAX_DIRTY_PAGES - dirtyPagesRatio) * totalPages; + // IDEA: here, when calculating the count of clean pages, it includes the pages under checkpoint. It is kinda + // legal because they can be written (using the Checkpoint Buffer to make a copy of the value to be + // checkpointed), but the CP Buffer is usually not too big, and if it gets nearly filled, writes become + // throttled really hard by exponential throttler. 
Maybe we should subtract the number of not-yet-written-by-CP + // pages from the count of clean pages? In such a case, we would lessen the risk of CP Buffer-caused throttling. + double remainedCleanPages = (MAX_DIRTY_PAGES - dirtyPagesRatio) * pageMemTotalPages(); + + double secondsTillCPEnd = 1.0 * (cpTotalPages - donePages) / avgCpWriteSpeed; + + return (long)(remainedCleanPages / secondsTillCPEnd); + } + + /** Returns total number of pages storable in page memory. */ + private long pageMemTotalPages() { + long currentTotalPages = pageMemTotalPages; + + if (currentTotalPages == 0) { + currentTotalPages = pageMemory.totalPages(); + pageMemTotalPages = currentTotalPages; + } - double secondsTillCPEnd = 1.0 * (cpTotalPages - donePages) / curCpWriteSpeed; + assert currentTotalPages > 0 : "PageMemory.totalPages() is still 0"; - return (long)(remainedClearPages / secondsTillCPEnd); + return currentTotalPages; } /** @@ -457,24 +434,33 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy { * @param baseSpeed speed to slow down. * @return sleep time in nanoseconds. */ - long calcDelayTime(long baseSpeed) { - return calcDelayTime(baseSpeed, threadIdsCount(), 1); + long nsPerOperation(long baseSpeed) { + return nsPerOperation(baseSpeed, threadIdsCount()); } /** - * @param baseSpeed speed to slow down. + * @param speedPagesPerSec speed to slow down. + * @param nThreads operating threads. + * @return sleep time in nanoseconds. + */ + private long nsPerOperation(long speedPagesPerSec, int nThreads) { + return nsPerOperation(speedPagesPerSec, nThreads, 1); + } + + /** + * @param speedPagesPerSec speed to slow down. * @param nThreads operating threads. * @param factor how much it is needed to slowdown base speed. 1 means delay to get exact base speed. * @return sleep time in nanoseconds. 
*/ - private long calcDelayTime(long baseSpeed, int nThreads, int factor) { + private long nsPerOperation(long speedPagesPerSec, int nThreads, int factor) { if (factor <= 0) throw new IllegalStateException("Coefficient should be positive"); - if (baseSpeed <= 0) + if (speedPagesPerSec <= 0) return 0; - long updTimeNsForOnePage = TimeUnit.SECONDS.toNanos(1) * nThreads / (baseSpeed); + long updTimeNsForOnePage = TimeUnit.SECONDS.toNanos(1) * nThreads / (speedPagesPerSec); return factor * updTimeNsForOnePage; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointFileIOFactory.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/AbstractSlowCheckpointFileIOFactory.java similarity index 83% rename from modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointFileIOFactory.java rename to modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/AbstractSlowCheckpointFileIOFactory.java index cad6b568f77..1a6f0f7b2ff 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointFileIOFactory.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/AbstractSlowCheckpointFileIOFactory.java @@ -29,9 +29,9 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactor import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; /** - * Create File I/O that emulates poor checkpoint write speed. + * File I/O that emulates poor checkpoint write speed. */ -public class SlowCheckpointFileIOFactory implements FileIOFactory { +public abstract class AbstractSlowCheckpointFileIOFactory implements FileIOFactory { /** Serial version uid. 
*/ private static final long serialVersionUID = 0L; @@ -42,13 +42,13 @@ public class SlowCheckpointFileIOFactory implements FileIOFactory { private final AtomicBoolean slowCheckpointEnabled; /** Checkpoint park nanos. */ - private final int checkpointParkNanos; + private final long checkpointParkNanos; /** * @param slowCheckpointEnabled Slow checkpoint enabled. * @param checkpointParkNanos Checkpoint park nanos. */ - public SlowCheckpointFileIOFactory(AtomicBoolean slowCheckpointEnabled, int checkpointParkNanos) { + protected AbstractSlowCheckpointFileIOFactory(AtomicBoolean slowCheckpointEnabled, long checkpointParkNanos) { this.slowCheckpointEnabled = slowCheckpointEnabled; this.checkpointParkNanos = checkpointParkNanos; } @@ -78,9 +78,16 @@ public class SlowCheckpointFileIOFactory implements FileIOFactory { /** Parks current checkpoint thread if slow mode is enabled. */ private void parkIfNeeded() { - if (slowCheckpointEnabled.get() && Thread.currentThread().getName().contains("db-checkpoint-thread")) + if (slowCheckpointEnabled.get() && shouldSlowDownCurrentThread()) LockSupport.parkNanos(checkpointParkNanos); } }; } + + /** + * Returns {@code true} if the current thread should be slowed down. 
+ * + * @return {@code true} if the current thread should be slowed down + */ + protected abstract boolean shouldSlowDownCurrentThread(); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java index c573c6c0030..7218be92f65 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java @@ -88,7 +88,7 @@ public class CheckpointBufferDeadlockTest extends GridCommonAbstractTest { cfg.setDataStorageConfiguration( new DataStorageConfiguration() - .setFileIOFactory(new SlowCheckpointFileIOFactory(slowCheckpointEnabled, CHECKPOINT_PARK_NANOS)) + .setFileIOFactory(new SlowCheckpointMetadataFileIOFactory(slowCheckpointEnabled, CHECKPOINT_PARK_NANOS)) .setCheckpointThreads(checkpointThreads) .setDefaultDataRegionConfiguration( new DataRegionConfiguration() diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointMetadataFileIOFactory.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointMetadataFileIOFactory.java new file mode 100644 index 00000000000..caea7cc4ad0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointMetadataFileIOFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * File I/O that emulates poor checkpoint metadata write speed. + */ +public class SlowCheckpointMetadataFileIOFactory extends AbstractSlowCheckpointFileIOFactory { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** + * @param slowCheckpointEnabled Slow checkpoint enabled. + * @param checkpointParkNanos Checkpoint park nanos. 
+ */ + public SlowCheckpointMetadataFileIOFactory(AtomicBoolean slowCheckpointEnabled, long checkpointParkNanos) { + super(slowCheckpointEnabled, checkpointParkNanos); + } + + /** {@inheritDoc} */ + @Override protected boolean shouldSlowDownCurrentThread() { + return Thread.currentThread().getName().contains("db-checkpoint-thread"); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointPagesFileIOFactory.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointPagesFileIOFactory.java new file mode 100644 index 00000000000..6a3ac5eed30 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointPagesFileIOFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * File I/O that emulates poor checkpoint pages write speed. + */ +public class SlowCheckpointPagesFileIOFactory extends AbstractSlowCheckpointFileIOFactory { + /** Serial version uid. 
*/ + private static final long serialVersionUID = 0L; + + /** + * @param slowCheckpointEnabled Slow checkpoint enabled. + * @param checkpointParkNanos Checkpoint park nanos. + */ + public SlowCheckpointPagesFileIOFactory(AtomicBoolean slowCheckpointEnabled, long checkpointParkNanos) { + super(slowCheckpointEnabled, checkpointParkNanos); + } + + /** {@inheritDoc} */ + @Override protected boolean shouldSlowDownCurrentThread() { + return Thread.currentThread().getName().contains("checkpoint-runner-"); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java index 065d9921f68..57b07d633f9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java @@ -67,13 +67,13 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest { public Timeout globalTimeout = Timeout.millis((int)GridTestUtils.DFLT_TEST_TIMEOUT); /** Logger. */ - private IgniteLogger log = new NullLogger(); + private final IgniteLogger log = new NullLogger(); /** Page memory 2 g. */ - private PageMemoryImpl pageMemory2g = mock(PageMemoryImpl.class); + private final PageMemoryImpl pageMemory2g = mock(PageMemoryImpl.class); /** State checker. */ - private CheckpointLockStateChecker stateChecker = () -> true; + private final CheckpointLockStateChecker stateChecker = () -> true; /** {@link CheckpointProgress} mock. 
*/ private final CheckpointProgress progress = mock(CheckpointProgress.class); @@ -97,37 +97,56 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest { } /** - * + * Tests that the speed-based throttler throttles when writing faster than target speed, AND the dirty ratio + * is above the target ratio. */ @Test - public void breakInCaseTooFast() { + public void shouldThrottleWhenWritingTooFast() { PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log); - long time = throttle.getCleanPagesProtectionParkTime(0.67, + long parkTime = throttle.getCleanPagesProtectionParkTime(0.67, (362584 + 67064) / 2, 328787, 1, 60184, 23103); - assertTrue(time > 0); + assertTrue(parkTime > 0); } /** * */ @Test - public void noBreakIfNotFastWrite() { + public void shouldNotThrottleWhenWritingSlowly() { PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log); - long time = throttle.getCleanPagesProtectionParkTime(0.47, + long parkTime = throttle.getCleanPagesProtectionParkTime(0.47, ((362584 + 67064) / 2), 328787, 1, 20103, 23103); - assertEquals(0, time); + assertEquals(0, parkTime); + } + + /** + * Tests that the speed-based throttler does NOT throttle when there are plenty clean pages, even if writing + * faster than the current checkpoint speed. 
+ */ + @Test + public void shouldNotThrottleWhenThereArePlentyCleanPages() { + PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log); + + long parkTime = throttle.getCleanPagesProtectionParkTime(0.0, + (362584 + 67064) / 2, + 328787, + 1, + 60184, + 23103); + + assertEquals(0, parkTime); } /** @@ -140,17 +159,14 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest { int markDirtySpeed = 34422; int cpWriteSpeed = 19416; - long time = throttle.getCleanPagesProtectionParkTime(0.04, + long time = throttle.getCleanPagesProtectionParkTime(0.67, ((903150 + 227217) / 2), 903150, 1, markDirtySpeed, cpWriteSpeed); - long mdSpeed = TimeUnit.SECONDS.toNanos(1) / markDirtySpeed; - long cpSpeed = TimeUnit.SECONDS.toNanos(1) / cpWriteSpeed; - - assertEquals((cpSpeed - mdSpeed), time); + assertEquals(415110, time); } /** @@ -272,7 +288,7 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest { * */ @Test - public void tooMuchPagesMarkedDirty() { + public void doNotThrottleWhenDirtyPagesRatioIsTooHigh() { PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log); // 363308 350004 348976 10604 @@ -283,8 +299,6 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest { 279, 23933); - System.err.println(time); - assertEquals(0, time); } @@ -453,8 +467,6 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest { break; } - System.out.println(throttle.throttleWeight()); - assertTrue(warnings.get() > 0); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java index 3814bddab8f..9a890c54762 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java @@ -19,8 +19,11 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem; import java.io.Serializable; import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import com.google.common.util.concurrent.AtomicDouble; import org.apache.ignite.DataRegionMetrics; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; @@ -33,7 +36,10 @@ import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.failure.FailureHandler; +import org.apache.ignite.failure.StopNodeOrHaltFailureHandler; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkpointer; import org.apache.ignite.internal.processors.metric.impl.HitRateMetric; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -55,7 +61,7 @@ public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest { DataStorageConfiguration dbCfg = new DataStorageConfiguration() .setDefaultDataRegionConfiguration(new DataRegionConfiguration() - .setMaxSize(4000L * 1024 * 1024) + .setMaxSize(1000L * 1024 * 1024) .setCheckpointPageBufferSize(1000L * 1000 * 1000) .setName("dfltDataRegion") .setMetricsEnabled(true) @@ -132,6 +138,9 @@ 
public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest { }, 2, "read-loader"); final HitRateMetric putRate = new HitRateMetric("putRate", "", 1000, 5); + final AtomicLong putCount = new AtomicLong(); + final AtomicDouble maxDirtyRatio = new AtomicDouble(); + long startNanos = System.nanoTime(); GridTestUtils.runAsync(new Runnable() { @Override public void run() { @@ -142,25 +151,38 @@ public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest { if (m.getName().equals("dfltDataRegion")) dirtyPages = m.getDirtyPages(); - long cpBufPages = 0; + long cpBufPages; long cpWrittenPages; - AtomicInteger cntr = ((GridCacheDatabaseSharedManager)((ignite(0)) - .context().cache().context().database())).getCheckpointer().currentProgress().writtenPagesCounter(); + Checkpointer checkpointer = ((GridCacheDatabaseSharedManager)((ignite(0)) + .context().cache().context().database())).getCheckpointer(); + AtomicInteger cntr = checkpointer.currentProgress().writtenPagesCounter(); cpWrittenPages = cntr == null ? 
0 : cntr.get(); try { - cpBufPages = ((ignite(0)).context().cache().context().database() - .dataRegion("dfltDataRegion").pageMemory()).checkpointBufferPagesCount(); + PageMemoryEx pageMemory = (PageMemoryEx)(ignite(0)).context().cache().context().database() + .dataRegion("dfltDataRegion").pageMemory(); + cpBufPages = pageMemory.checkpointBufferPagesCount(); + + if (System.nanoTime() - startNanos > TimeUnit.SECONDS.toNanos(10)) { + double currentDirtyRatio = (double)dirtyPages / pageMemory.totalPages(); + double newMaxDirtyRatio = Math.max(maxDirtyRatio.get(), currentDirtyRatio); + maxDirtyRatio.set(newMaxDirtyRatio); + } } catch (IgniteCheckedException e) { e.printStackTrace(); + throw new RuntimeException("Something went wrong", e); } - System.out.println("@@@ putsPerSec=," + (putRate.value()) + ", getsPerSec=," + (getRate.value()) + ", dirtyPages=," - + dirtyPages + ", cpWrittenPages=," + cpWrittenPages + ", cpBufPages=," + cpBufPages); + System.out.println("@@@ globalPutsPerSec=" + + String.format("%.2f", globalPutsPerSec(putCount, startNanos)) + + ", putsPerSec=" + (putRate.value()) + ", getsPerSec=" + (getRate.value()) + ", dirtyPages=" + + dirtyPages + ", cpWrittenPages=" + cpWrittenPages + ", cpBufPages=" + cpBufPages + + ", maxDirtyRatio=" + String.format("%.2f", maxDirtyRatio.get()) + ); try { Thread.sleep(1000); @@ -172,14 +194,27 @@ public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest { } }, "metrics-view"); + final boolean intermittentPutsMode = false; + try (IgniteDataStreamer<Object, Object> ds = ig.dataStreamer(CACHE_NAME)) { ds.allowOverwrite(true); - for (int i = 0; i < keyCnt * 10; i++) { - ds.addData(ThreadLocalRandom.current().nextInt(keyCnt), new TestValue(ThreadLocalRandom.current().nextInt(), - ThreadLocalRandom.current().nextInt())); + while (true) { + long tensOfSecondsPassed = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startNanos) / 10; + if (intermittentPutsMode && tensOfSecondsPassed % 2 == 1) { + 
System.out.println("... sleeping ..."); + Thread.sleep(1000); + } + else { + ds.addData(ThreadLocalRandom.current().nextInt(keyCnt), new TestValue(ThreadLocalRandom.current().nextInt(), + ThreadLocalRandom.current().nextInt())); - putRate.increment(); + putRate.increment(); + putCount.incrementAndGet(); + } + + if (System.nanoTime() - startNanos > TimeUnit.MINUTES.toNanos(10)) + break; } } @@ -190,6 +225,11 @@ public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest { } } + /***/ + private double globalPutsPerSec(AtomicLong putCount, long startNanos) { + return (double)putCount.get() * TimeUnit.SECONDS.toNanos(1) / (System.nanoTime() - startNanos); + } + /** * */ @@ -247,4 +287,9 @@ public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest { cleanPersistenceDir(); U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "snapshot", false)); } + + /** {@inheritDoc} */ + @Override protected FailureHandler getFailureHandler(String igniteInstanceName) { + return new StopNodeOrHaltFailureHandler(); + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java index 4a20d1890a9..19e74c3f1b5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java @@ -31,7 +31,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.cache.persistence.db.SlowCheckpointFileIOFactory; +import 
org.apache.ignite.internal.processors.cache.persistence.db.SlowCheckpointMetadataFileIOFactory; import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.metric.impl.HitRateMetric; import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; @@ -69,7 +69,7 @@ public class PagesWriteThrottleSmokeTest extends GridCommonAbstractTest { .setCheckpointFrequency(20_000) .setWriteThrottlingEnabled(true) .setCheckpointThreads(1) - .setFileIOFactory(new SlowCheckpointFileIOFactory(slowCheckpointEnabled, 5_000_000)); + .setFileIOFactory(new SlowCheckpointMetadataFileIOFactory(slowCheckpointEnabled, 5_000_000)); cfg.setDataStorageConfiguration(dbCfg); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedThrottleIntegrationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedThrottleIntegrationTest.java new file mode 100644 index 00000000000..6439f5e4ec7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedThrottleIntegrationTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.cache.persistence.db.SlowCheckpointMetadataFileIOFactory; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.cluster.ClusterState.ACTIVE; + +/** + * Integration tests for {@link PagesWriteSpeedBasedThrottle}. + */ +public class SpeedBasedThrottleIntegrationTest extends GridCommonAbstractTest { + /***/ + private final ListeningTestLogger listeningLog = new ListeningTestLogger(log); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + DataStorageConfiguration dbCfg = new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + // use a small region size so that the need for speed-based throttling is reached easily + .setMaxSize(60 * 1024 * 1024) + .setPersistenceEnabled(true) + ) + .setCheckpointFrequency(200) + .setWriteThrottlingEnabled(true) + .setFileIOFactory( + new SlowCheckpointMetadataFileIOFactory( + new AtomicBoolean(true), TimeUnit.MILLISECONDS.toNanos(10000) + ) + ); + + return cfg.setDataStorageConfiguration(dbCfg) + .setConsistentId(gridName) + .setGridLogger(listeningLog); + } + + /** {@inheritDoc} */ 
+ @Override protected void beforeTest() throws Exception { + stopAllGrids(); + + super.beforeTest(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 3 * 60 * 1000; + } + + /** + */ + @Test + public void speedBasedThrottleShouldBeActivatedWhenNeeded() throws Exception { + AtomicBoolean throttled = new AtomicBoolean(false); + listeningLog.registerListener(message -> { + if (message.startsWith("Throttling is applied to page modifications")) { + throttled.set(true); + } + }); + + Ignite ignite = startGrids(1); + + ignite.cluster().state(ACTIVE); + IgniteCache<Object, Object> cache = ignite.createCache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < 1_000_000; i++) { + cache.put("key" + i, ThreadLocalRandom.current().nextDouble()); + + if (throttled.get()) { + break; + } + } + + assertTrue("Throttling was not triggered", throttled.get()); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CheckpointTest.java index 386738d7965..62ab1550735 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CheckpointTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CheckpointTest.java @@ -28,7 +28,7 @@ import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.cache.persistence.db.SlowCheckpointFileIOFactory; +import 
org.apache.ignite.internal.processors.cache.persistence.db.SlowCheckpointMetadataFileIOFactory; import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; import org.apache.ignite.internal.util.typedef.internal.U; @@ -64,7 +64,7 @@ public class CheckpointTest extends AbstractPerformanceStatisticsTest { .setMetricsEnabled(true) .setPersistenceEnabled(true)) .setWriteThrottlingEnabled(true) - .setFileIOFactory(new SlowCheckpointFileIOFactory(slowCheckpointEnabled, 500_000)) + .setFileIOFactory(new SlowCheckpointMetadataFileIOFactory(slowCheckpointEnabled, 500_000)) .setCheckpointThreads(1)); return cfg; diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java index aeb1865904c..536c7a782f2 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemor import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryNoStoreLeakTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteThrottleSmokeTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.SpeedBasedThrottleBreakdownTest; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.SpeedBasedThrottleIntegrationTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.UsedPagesMetricTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.UsedPagesMetricTestPersistence; import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIOTest; @@ -90,6 +91,7 @@ public class IgnitePdsTestSuite5 { // Write throttling GridTestUtils.addTestIfNeeded(suite, 
PagesWriteThrottleSmokeTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, SpeedBasedThrottleBreakdownTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, SpeedBasedThrottleIntegrationTest.class, ignoredTests); // Discovery data handling on node join and old cluster abnormal shutdown GridTestUtils.addTestIfNeeded(suite, IgnitePdsDiscoDataHandlingInNewClusterTest.class, ignoredTests);