This is an automated email from the ASF dual-hosted git repository. irakov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new a09d128 IGNITE-12434 Dump checkpoint readLock holder threads if writeLock can`t take lock more than threshold timeout. - Fixes #7124. a09d128 is described below commit a09d1284a8f5cbaae7a8548af6c32ca422810035 Author: zstan <stanilov...@gmail.com> AuthorDate: Tue Dec 17 16:09:21 2019 +0300 IGNITE-12434 Dump checkpoint readLock holder threads if writeLock can`t take lock more than threshold timeout. - Fixes #7124. Signed-off-by: Ivan Rakov <ira...@apache.org> --- .../GridCacheDatabaseSharedManager.java | 11 +- .../apache/ignite/internal/util/IgniteUtils.java | 188 +++++++++++---------- .../persistence/CheckpointReadLockFailureTest.java | 144 +++++++++++++++- 3 files changed, 250 insertions(+), 93 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 2eeb330..a89919b 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -226,6 +226,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** */ public static final String IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP = "IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP"; + /** Log read lock holders. */ + public static final String IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS = "IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS"; + /** MemoryPolicyConfiguration name reserved for meta store. 
*/ public static final String METASTORE_DATA_REGION_NAME = "metastoreMemPlc"; @@ -242,6 +245,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** */ private final boolean skipCheckpointOnNodeStop = getBoolean(IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, false); + /** */ + private final boolean logReadLockHolders = getBoolean(IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS); + /** * Starting from this number of dirty pages in checkpoint, array will be sorted with * {@link Arrays#parallelSort(Comparable[])} in case of {@link CheckpointWriteOrder#SEQUENTIAL}. @@ -297,7 +303,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** For testing only. */ private volatile GridFutureAdapter<Void> enableChangeApplied; - /** */ + /** Checkpoint lock. */ ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); /** */ @@ -544,6 +550,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan final GridKernalContext kernalCtx = cctx.kernalContext(); + if (logReadLockHolders) + checkpointLock = new U.ReentrantReadWriteLockTracer(checkpointLock, kernalCtx, 5_000); + if (!kernalCtx.clientNode()) { kernalCtx.internalSubscriptionProcessor().registerDatabaseListener(new MetastorageRecoveryLifecycle()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 34ef7be..1b51d45 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -136,9 +136,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.function.Consumer; +import
java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.jar.JarFile; import java.util.logging.ConsoleHandler; import java.util.logging.Handler; @@ -234,6 +234,7 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.SB; @@ -339,6 +340,9 @@ public abstract class IgniteUtils { /** Default user version. */ public static final String DFLT_USER_VERSION = "0"; + /** Lock hold message. */ + public static final String LOCK_HOLD_MESSAGE = "ReadLock held the lock more than "; + /** Cache for {@link GridPeerDeployAware} fields to speed up reflection. */ private static final ConcurrentMap<String, IgniteBiTuple<Class<?>, Collection<Field>>> p2pFields = new ConcurrentHashMap<>(); @@ -10824,20 +10828,6 @@ public abstract class IgniteUtils { } /** - * @param lock Lock. - */ - public static ReentrantReadWriteLockTracer lockTracer(ReadWriteLock lock) { - return new ReentrantReadWriteLockTracer(lock); - } - - /** - * @param lock Lock. - */ - public static LockTracer lockTracer(Lock lock) { - return new LockTracer(lock); - } - - /** * Puts additional text to thread name. * Calls {@code enhanceThreadName(Thread.currentThread(), text)}. * For details see {@link #enhanceThreadName(Thread, String)}. @@ -11388,102 +11378,125 @@ public abstract class IgniteUtils { }; } - /** - * - */ - public static class ReentrantReadWriteLockTracer implements ReadWriteLock { + /** */ + public static class ReentrantReadWriteLockTracer extends ReentrantReadWriteLock { + /** */ + private static final long serialVersionUID = 0L; + /** Read lock. 
*/ - private final LockTracer readLock; + private final ReadLockTracer readLock; /** Write lock. */ - private final LockTracer writeLock; + private final WriteLockTracer writeLock; + + /** Lock print threshold. */ + private long readLockThreshold; + + /** */ + private IgniteLogger log; /** - * @param delegate Delegate. + * @param delegate RWLock delegate. + * @param kctx Kernal context. + * @param readLockThreshold ReadLock threshold timeout. + * */ - public ReentrantReadWriteLockTracer(ReadWriteLock delegate) { - readLock = new LockTracer(delegate.readLock()); - writeLock = new LockTracer(delegate.writeLock()); + public ReentrantReadWriteLockTracer(ReentrantReadWriteLock delegate, GridKernalContext kctx, long readLockThreshold) { + log = kctx.cache().context().logger(getClass()); + + readLock = new ReadLockTracer(delegate, log, readLockThreshold); + + writeLock = new WriteLockTracer(delegate); + + this.readLockThreshold = readLockThreshold; } /** {@inheritDoc} */ - @NotNull @Override public Lock readLock() { + @Override public ReadLock readLock() { return readLock; } /** {@inheritDoc} */ - @NotNull @Override public Lock writeLock() { + @Override public WriteLock writeLock() { return writeLock; } - /** - * - */ - public LockTracer getReadLock() { - return readLock; - } - - /** - * - */ - public LockTracer getWriteLock() { - return writeLock; + /** */ + public long lockWaitThreshold() { + return readLockThreshold; } } - /** - * - */ - public static class LockTracer implements Lock { + /** */ + private static class ReadLockTracer extends ReentrantReadWriteLock.ReadLock { + /** */ + private static final long serialVersionUID = 0L; + /** Delegate. */ - private final Lock delegate; + private final ReentrantReadWriteLock.ReadLock delegate; - private final AtomicLong cnt = new AtomicLong(); + /** */ + private static final ThreadLocal<T2<Integer, Long>> READ_LOCK_HOLDER_TS = + ThreadLocal.withInitial(() -> new T2<>(0, 0L)); - /** Count. 
*/ - private final ConcurrentMap<String, AtomicLong> cntMap = new ConcurrentHashMap<>(); + /** */ + private IgniteLogger log; - /** - * @param delegate Delegate. - */ - public LockTracer(Lock delegate) { - this.delegate = delegate; - } + /** */ + private long readLockThreshold; - /** - * - */ - private void inc(){ - cnt.incrementAndGet(); + /** */ + public ReadLockTracer(ReentrantReadWriteLock lock, IgniteLogger log, long readLockThreshold) { + super(lock); - String name = Thread.currentThread().getName(); + delegate = lock.readLock(); - AtomicLong cnt = cntMap.get(name); + this.log = log; - if (cnt == null) { - AtomicLong cnt0 = cntMap.putIfAbsent(name, cnt = new AtomicLong()); + this.readLockThreshold = readLockThreshold; + } - if (cnt0 != null) - cnt = cnt0; - } + /** */ + private void inc() { + T2<Integer, Long> val = READ_LOCK_HOLDER_TS.get(); + + int cntr = val.get1(); + + if (cntr == 0) + val.set2(U.currentTimeMillis()); - cnt.incrementAndGet(); + val.set1(++cntr); + + READ_LOCK_HOLDER_TS.set(val); } - /** - * - */ - private void dec(){ - cnt.decrementAndGet(); + /** */ + private void dec() { + T2<Integer, Long> val = READ_LOCK_HOLDER_TS.get(); + + int cntr = val.get1(); + + if (--cntr == 0) { + long timeout = U.currentTimeMillis() - val.get2(); - String name = Thread.currentThread().getName(); + if (timeout > readLockThreshold) { + GridStringBuilder sb = new GridStringBuilder(); - AtomicLong cnt = cntMap.get(name); + sb.a(LOCK_HOLD_MESSAGE + timeout + " ms." 
+ nl()); - cnt.decrementAndGet(); + U.printStackTrace(Thread.currentThread().getId(), sb); + + U.warn(log, sb.toString()); + } + } + + val.set1(cntr); + + READ_LOCK_HOLDER_TS.set(val); } /** {@inheritDoc} */ + @SuppressWarnings("LockAcquiredButNotSafelyReleased") @Override public void lock() { delegate.lock(); @@ -11491,6 +11504,7 @@ public abstract class IgniteUtils { } /** {@inheritDoc} */ + @SuppressWarnings("LockAcquiredButNotSafelyReleased") @Override public void lockInterruptibly() throws InterruptedException { delegate.lockInterruptibly(); @@ -11525,24 +11539,16 @@ public abstract class IgniteUtils { dec(); } + } - /** {@inheritDoc} */ - @NotNull @Override public Condition newCondition() { - return delegate.newCondition(); - } - - /** - * - */ - public Map<String, AtomicLong> getLockUnlockCounters() { - return new HashMap<>(cntMap); - } - - /** - * - */ - public long getLockUnlockCounter() { - return cnt.get(); + /** */ + private static class WriteLockTracer extends ReentrantReadWriteLock.WriteLock { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + public WriteLockTracer(ReentrantReadWriteLock lock) { + super(lock); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java index 5f2a910..b15f6a9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java @@ -21,6 +21,8 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.ignite.Ignite; import 
org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; @@ -30,18 +32,29 @@ import org.apache.ignite.failure.FailureContext; import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; +import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS; +import static org.apache.ignite.internal.util.IgniteUtils.LOCK_HOLD_MESSAGE; + /** * Tests critical failure handling on checkpoint read lock acquisition errors. 
*/ public class CheckpointReadLockFailureTest extends GridCommonAbstractTest { /** */ + private ListeningTestLogger testLog; + + /** */ private static final AbstractFailureHandler FAILURE_HND = new AbstractFailureHandler() { @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { if (failureCtx.type() != FailureType.SYSTEM_CRITICAL_OPERATION_TIMEOUT) @@ -59,13 +72,21 @@ public class CheckpointReadLockFailureTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - return super.getConfiguration(igniteInstanceName) + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName) .setFailureHandler(FAILURE_HND) .setDataStorageConfiguration(new DataStorageConfiguration() .setDefaultDataRegionConfiguration(new DataRegionConfiguration() .setPersistenceEnabled(true)) .setCheckpointFrequency(Integer.MAX_VALUE) .setCheckpointReadLockTimeout(1)); + + if (testLog != null) { + cfg.setGridLogger(testLog); + + testLog = null; + } + + return cfg; } /** @@ -136,4 +157,125 @@ public class CheckpointReadLockFailureTest extends GridCommonAbstractTest { stopGrid(0); } + + /** + * @throws Exception If failed. 
+ */ + @Test + @WithSystemProperty(key = IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS, value = "true") + public void testPrintCpRLockHolder() throws Exception { + CountDownLatch canRelease = new CountDownLatch(1); + + testLog = new ListeningTestLogger(false, log); + + LogListener lsnr = LogListener.matches(LOCK_HOLD_MESSAGE).build(); + + testLog.registerListener(lsnr); + + IgniteEx ig = startGrid(0); + + ig.cluster().active(true); + + GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig.context().cache().context().database(); + + U.ReentrantReadWriteLockTracer tracker = (U.ReentrantReadWriteLockTracer)db.checkpointLock; + + GridTestUtils.runAsync(() -> { + db.checkpointLock.readLock().lock(); + + try { + canRelease.await(tracker.lockWaitThreshold() + 500, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + e.printStackTrace(); + } finally { + db.checkpointLock.readLock().unlock(); + } + }, "async-runnable-runner-1"); + + assertTrue(GridTestUtils.waitForCondition(lsnr::check, tracker.lockWaitThreshold() + 1000)); + + stopGrid(0); + } + + /** + * @throws Exception If failed. 
+ */ + @Test + @WithSystemProperty(key = IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS, value = "true") + public void testReentrance() throws Exception { + IgniteEx ig = startGrid(0); + + ig.cluster().active(true); + + GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig.context().cache().context().database(); + + ReentrantReadWriteLock rwLock = db.checkpointLock; + + CountDownLatch waitFirstRLock = new CountDownLatch(1); + + CountDownLatch waitSecondRLock = new CountDownLatch(1); + + long timeout = 500L; + + IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> { + //noinspection LockAcquiredButNotSafelyReleased + rwLock.readLock().lock(); + + //noinspection LockAcquiredButNotSafelyReleased + rwLock.readLock().lock(); + + rwLock.readLock().unlock(); + + waitFirstRLock.countDown(); + + try { + waitSecondRLock.await(); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + + rwLock.readLock().unlock(); + }, "async-runnable-runner-1"); + + IgniteInternalFuture f1 = GridTestUtils.runAsync(() -> { + try { + waitFirstRLock.await(); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + + try { + rwLock.writeLock().tryLock(); + + assertFalse(GridTestUtils.waitForCondition(rwLock.writeLock()::isHeldByCurrentThread, timeout)); + } + catch (IgniteInterruptedCheckedException e) { + e.printStackTrace(); + } + + waitSecondRLock.countDown(); + + try { + rwLock.writeLock().tryLock(timeout, TimeUnit.MILLISECONDS); + + assertTrue(rwLock.writeLock().isHeldByCurrentThread()); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + finally { + rwLock.writeLock().unlock(); + } + + }, "async-runnable-runner-2"); + + f1.get(4 * timeout); + + f0.get(4 * timeout); + + stopGrid(0); + } }