Repository: hadoop Updated Branches: refs/heads/branch-2 385c1daa4 -> 25f4327f0 refs/heads/trunk 44eb2bd7a -> ae8bccd50
HADOOP-13702. Add instrumented ReadWriteLock. Contributed by Jingcheng Du Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ae8bccd5 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ae8bccd5 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ae8bccd5 Branch: refs/heads/trunk Commit: ae8bccd5090d8b42dae9a8e0c13a9766a7c42ecb Parents: 44eb2bd Author: Chris Douglas <cdoug...@apache.org> Authored: Fri Oct 21 11:28:11 2016 -0700 Committer: Chris Douglas <cdoug...@apache.org> Committed: Fri Oct 21 12:59:54 2016 -0700 ---------------------------------------------------------------------- .../apache/hadoop/util/InstrumentedLock.java | 197 ++++++++++++++++ .../hadoop/util/InstrumentedReadLock.java | 92 ++++++++ .../hadoop/util/InstrumentedReadWriteLock.java | 58 +++++ .../hadoop/util/InstrumentedWriteLock.java | 54 +++++ .../hadoop/util/TestInstrumentedLock.java | 162 +++++++++++++ .../util/TestInstrumentedReadWriteLock.java | 234 +++++++++++++++++++ .../apache/hadoop/hdfs/InstrumentedLock.java | 185 --------------- .../datanode/fsdataset/impl/FsDatasetImpl.java | 2 +- .../hadoop/hdfs/TestInstrumentedLock.java | 166 ------------- 9 files changed, 798 insertions(+), 352 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8bccd5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java new file mode 100644 index 0000000..0520271 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.commons.logging.Log; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This is a debugging class that can be used by callers to track + * whether a specific lock is being held for too long and periodically + * log a warning and stack trace, if so. + * + * The logged warnings are throttled so that logs are not spammed. + * + * A new instance of InstrumentedLock can be created for each object + * that needs to be instrumented. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class InstrumentedLock implements Lock { + + private final Lock lock; + private final Log logger; + private final String name; + private final Timer clock; + + /** Minimum gap between two lock warnings. */ + private final long minLoggingGap; + /** Threshold for detecting long lock held time. 
*/ + private final long lockWarningThreshold; + + // Tracking counters for lock statistics. + private volatile long lockAcquireTimestamp; + private final AtomicLong lastLogTimestamp; + private final AtomicLong warningsSuppressed = new AtomicLong(0); + + /** + * Create an instrumented lock instance which logs a warning message + * when lock held time is above given threshold. + * + * @param name the identifier of the lock object + * @param logger this class does not have its own logger, will log to the + * given logger instead + * @param minLoggingGapMs the minimum time gap between two log messages, + * this is to avoid spamming too many logs + * @param lockWarningThresholdMs the time threshold to view lock held + * time as being "too long" + */ + public InstrumentedLock(String name, Log logger, long minLoggingGapMs, + long lockWarningThresholdMs) { + this(name, logger, new ReentrantLock(), + minLoggingGapMs, lockWarningThresholdMs); + } + + public InstrumentedLock(String name, Log logger, Lock lock, + long minLoggingGapMs, long lockWarningThresholdMs) { + this(name, logger, lock, + minLoggingGapMs, lockWarningThresholdMs, new Timer()); + } + + @VisibleForTesting + InstrumentedLock(String name, Log logger, Lock lock, + long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) { + this.name = name; + this.lock = lock; + this.clock = clock; + this.logger = logger; + minLoggingGap = minLoggingGapMs; + lockWarningThreshold = lockWarningThresholdMs; + lastLogTimestamp = new AtomicLong( + clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold)); + } + + @Override + public void lock() { + lock.lock(); + startLockTiming(); + } + + @Override + public void lockInterruptibly() throws InterruptedException { + lock.lockInterruptibly(); + startLockTiming(); + } + + @Override + public boolean tryLock() { + if (lock.tryLock()) { + startLockTiming(); + return true; + } + return false; + } + + @Override + public boolean tryLock(long time, TimeUnit unit) throws 
InterruptedException { + if (lock.tryLock(time, unit)) { + startLockTiming(); + return true; + } + return false; + } + + @Override + public void unlock() { + long localLockReleaseTime = clock.monotonicNow(); + long localLockAcquireTime = lockAcquireTimestamp; + lock.unlock(); + check(localLockAcquireTime, localLockReleaseTime); + } + + @Override + public Condition newCondition() { + return lock.newCondition(); + } + + @VisibleForTesting + void logWarning(long lockHeldTime, long suppressed) { + logger.warn(String.format("Lock held time above threshold: " + + "lock identifier: %s " + + "lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " + + "The stack trace is: %s" , + name, lockHeldTime, suppressed, + StringUtils.getStackTrace(Thread.currentThread()))); + } + + /** + * Starts timing for the instrumented lock. + */ + protected void startLockTiming() { + lockAcquireTimestamp = clock.monotonicNow(); + } + + /** + * Log a warning if the lock was held for too long. + * + * Should be invoked by the caller immediately AFTER releasing the lock. + * + * @param acquireTime - timestamp just after acquiring the lock. + * @param releaseTime - timestamp just before releasing the lock. 
+ */ + protected void check(long acquireTime, long releaseTime) { + if (!logger.isWarnEnabled()) { + return; + } + + final long lockHeldTime = releaseTime - acquireTime; + if (lockWarningThreshold - lockHeldTime < 0) { + long now; + long localLastLogTs; + do { + now = clock.monotonicNow(); + localLastLogTs = lastLogTimestamp.get(); + long deltaSinceLastLog = now - localLastLogTs; + // check should print log or not + if (deltaSinceLastLog - minLoggingGap < 0) { + warningsSuppressed.incrementAndGet(); + return; + } + } while (!lastLogTimestamp.compareAndSet(localLastLogTs, now)); + long suppressed = warningsSuppressed.getAndSet(0); + logWarning(lockHeldTime, suppressed); + } + } + + protected Lock getLock() { + return lock; + } + + protected Timer getTimer() { + return clock; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8bccd5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadLock.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadLock.java new file mode 100644 index 0000000..09fd43e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadLock.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This is a wrapper class for a <tt>ReadLock</tt>. + * It extends the class {@link InstrumentedLock}, and can be used to track + * whether a specific read lock is being held for too long and log + * warnings if so. + * + * The logged warnings are throttled so that logs are not spammed. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class InstrumentedReadLock extends InstrumentedLock { + + private final ReentrantReadWriteLock readWriteLock; + + /** + * Uses the ThreadLocal to keep the time of acquiring locks since + * there can be multiple threads that hold the read lock concurrently. 
+ */ + private final ThreadLocal<Long> readLockHeldTimeStamp = + new ThreadLocal<Long>() { + @Override + protected Long initialValue() { + return Long.MAX_VALUE; + }; + }; + + public InstrumentedReadLock(String name, Log logger, + ReentrantReadWriteLock readWriteLock, + long minLoggingGapMs, long lockWarningThresholdMs) { + this(name, logger, readWriteLock, minLoggingGapMs, lockWarningThresholdMs, + new Timer()); + } + + @VisibleForTesting + InstrumentedReadLock(String name, Log logger, + ReentrantReadWriteLock readWriteLock, + long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) { + super(name, logger, readWriteLock.readLock(), minLoggingGapMs, + lockWarningThresholdMs, clock); + this.readWriteLock = readWriteLock; + } + + @Override + public void unlock() { + boolean needReport = readWriteLock.getReadHoldCount() == 1; + long localLockReleaseTime = getTimer().monotonicNow(); + long localLockAcquireTime = readLockHeldTimeStamp.get(); + getLock().unlock(); + if (needReport) { + readLockHeldTimeStamp.remove(); + check(localLockAcquireTime, localLockReleaseTime); + } + } + + /** + * Starts timing for the instrumented read lock. + * It records the time to ThreadLocal. 
+ */ + @Override + protected void startLockTiming() { + if (readWriteLock.getReadHoldCount() == 1) { + readLockHeldTimeStamp.set(getTimer().monotonicNow()); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8bccd5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java new file mode 100644 index 0000000..62e6b09 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.util; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This is a wrapper class for a {@link ReentrantReadWriteLock}. + * It implements the interface {@link ReadWriteLock}, and can be used to + * create instrumented <tt>ReadLock</tt> and <tt>WriteLock</tt>. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class InstrumentedReadWriteLock implements ReadWriteLock { + + private final Lock readLock; + private final Lock writeLock; + + InstrumentedReadWriteLock(boolean fair, String name, Log logger, + long minLoggingGapMs, long lockWarningThresholdMs) { + ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair); + readLock = new InstrumentedReadLock(name, logger, readWriteLock, + minLoggingGapMs, lockWarningThresholdMs); + writeLock = new InstrumentedWriteLock(name, logger, readWriteLock, + minLoggingGapMs, lockWarningThresholdMs); + } + + @Override + public Lock readLock() { + return readLock; + } + + @Override + public Lock writeLock() { + return writeLock; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8bccd5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedWriteLock.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedWriteLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedWriteLock.java new file mode 100644 index 0000000..9208c1b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedWriteLock.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This is a wrapper class for a <tt>WriteLock</tt>. + * It extends the class {@link InstrumentedLock}, and can be used to track + * whether a specific write lock is being held for too long and log + * warnings if so. + * + * The logged warnings are throttled so that logs are not spammed. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class InstrumentedWriteLock extends InstrumentedLock { + + public InstrumentedWriteLock(String name, Log logger, + ReentrantReadWriteLock readWriteLock, + long minLoggingGapMs, long lockWarningThresholdMs) { + this(name, logger, readWriteLock, minLoggingGapMs, lockWarningThresholdMs, + new Timer()); + } + + @VisibleForTesting + InstrumentedWriteLock(String name, Log logger, + ReentrantReadWriteLock readWriteLock, + long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) { + super(name, logger, readWriteLock.writeLock(), minLoggingGapMs, + lockWarningThresholdMs, clock); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8bccd5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedLock.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedLock.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedLock.java new file mode 100644 index 0000000..d3f6912 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedLock.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import static org.mockito.Mockito.*; +import static org.junit.Assert.*; + +/** + * A test class for InstrumentedLock. + */ +public class TestInstrumentedLock { + + static final Log LOG = LogFactory.getLog(TestInstrumentedLock.class); + + @Rule public TestName name = new TestName(); + + /** + * Test exclusive access of the lock. + * @throws Exception + */ + @Test(timeout=10000) + public void testMultipleThread() throws Exception { + String testname = name.getMethodName(); + InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300); + lock.lock(); + try { + Thread competingThread = new Thread() { + @Override + public void run() { + assertFalse(lock.tryLock()); + } + }; + competingThread.start(); + competingThread.join(); + } finally { + lock.unlock(); + } + } + + /** + * Test the correctness with try-with-resource syntax. 
+ * @throws Exception + */ + @Test(timeout=10000) + public void testTryWithResourceSyntax() throws Exception { + String testname = name.getMethodName(); + final AtomicReference<Thread> lockThread = new AtomicReference<>(null); + Lock lock = new InstrumentedLock(testname, LOG, 0, 300) { + @Override + public void lock() { + super.lock(); + lockThread.set(Thread.currentThread()); + } + @Override + public void unlock() { + super.unlock(); + lockThread.set(null); + } + }; + AutoCloseableLock acl = new AutoCloseableLock(lock); + try (AutoCloseable localLock = acl.acquire()) { + assertEquals(acl, localLock); + Thread competingThread = new Thread() { + @Override + public void run() { + assertNotEquals(Thread.currentThread(), lockThread.get()); + assertFalse(lock.tryLock()); + } + }; + competingThread.start(); + competingThread.join(); + assertEquals(Thread.currentThread(), lockThread.get()); + } + assertNull(lockThread.get()); + } + + /** + * Test the lock logs warning when lock held time is greater than threshold + * and not log warning otherwise. 
+ * @throws Exception + */ + @Test(timeout=10000) + public void testLockLongHoldingReport() throws Exception { + String testname = name.getMethodName(); + final AtomicLong time = new AtomicLong(0); + Timer mclock = new Timer() { + @Override + public long monotonicNow() { + return time.get(); + } + }; + Lock mlock = mock(Lock.class); + + final AtomicLong wlogged = new AtomicLong(0); + final AtomicLong wsuppresed = new AtomicLong(0); + InstrumentedLock lock = new InstrumentedLock( + testname, LOG, mlock, 2000, 300, mclock) { + @Override + void logWarning(long lockHeldTime, long suppressed) { + wlogged.incrementAndGet(); + wsuppresed.set(suppressed); + } + }; + + // do not log warning when the lock held time is short + lock.lock(); // t = 0 + time.set(200); + lock.unlock(); // t = 200 + assertEquals(0, wlogged.get()); + assertEquals(0, wsuppresed.get()); + + lock.lock(); // t = 200 + time.set(700); + lock.unlock(); // t = 700 + assertEquals(1, wlogged.get()); + assertEquals(0, wsuppresed.get()); + + // although the lock held time is greater than the threshold, + // suppress the log warning due to the logging gap + // (not recorded in wsuppressed until next log message) + lock.lock(); // t = 700 + time.set(1100); + lock.unlock(); // t = 1100 + assertEquals(1, wlogged.get()); + assertEquals(0, wsuppresed.get()); + + // log a warning message when the lock held time is greater than the threshold + // and the logging time gap is satisfied. Also should display suppressed + // previous warnings. 
+ time.set(2400); + lock.lock(); // t = 2400 + time.set(2800); + lock.unlock(); // t = 2800 + assertEquals(2, wlogged.get()); + assertEquals(1, wsuppresed.get()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8bccd5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedReadWriteLock.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedReadWriteLock.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedReadWriteLock.java new file mode 100644 index 0000000..eeefa88 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedReadWriteLock.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +/** + * A test class for InstrumentedReadLock and InstrumentedWriteLock. + */ +public class TestInstrumentedReadWriteLock { + + static final Log LOG = LogFactory.getLog(TestInstrumentedReadWriteLock.class); + + @Rule + public TestName name = new TestName(); + + /** + * Tests exclusive access of the write lock. + * @throws Exception + */ + @Test(timeout=10000) + public void testWriteLock() throws Exception { + String testname = name.getMethodName(); + final ThreadLocal<Boolean> locked = new ThreadLocal<Boolean>(); + locked.set(Boolean.FALSE); + InstrumentedReadWriteLock readWriteLock = new InstrumentedReadWriteLock( + true, testname, LOG, 2000, 300); + final AutoCloseableLock writeLock = new AutoCloseableLock( + readWriteLock.writeLock()) { + @Override + public AutoCloseableLock acquire() { + AutoCloseableLock lock = super.acquire(); + locked.set(Boolean.TRUE); + return lock; + } + + @Override + public void release() { + super.release(); + locked.set(Boolean.FALSE); + } + }; + final AutoCloseableLock readLock = new AutoCloseableLock( + readWriteLock.readLock()); + try (AutoCloseableLock lock = writeLock.acquire()) { + Thread competingWriteThread = new Thread() { + @Override + public void run() { + assertFalse(writeLock.tryLock()); + } + }; + competingWriteThread.start(); + competingWriteThread.join(); + Thread competingReadThread = new Thread() { + @Override + public void run() { + assertFalse(readLock.tryLock()); + }; + }; + competingReadThread.start(); + competingReadThread.join(); + } + assertFalse(locked.get()); 
+ locked.remove(); + } + + /** + * Tests the read lock. + * @throws Exception + */ + @Test(timeout=10000) + public void testReadLock() throws Exception { + String testname = name.getMethodName(); + InstrumentedReadWriteLock readWriteLock = new InstrumentedReadWriteLock( + true, testname, LOG, 2000, 300); + final AutoCloseableLock readLock = new AutoCloseableLock( + readWriteLock.readLock()); + final AutoCloseableLock writeLock = new AutoCloseableLock( + readWriteLock.writeLock()); + try (AutoCloseableLock lock = readLock.acquire()) { + Thread competingReadThread = new Thread() { + @Override + public void run() { + assertTrue(readLock.tryLock()); + readLock.release(); + } + }; + competingReadThread.start(); + competingReadThread.join(); + Thread competingWriteThread = new Thread() { + @Override + public void run() { + assertFalse(writeLock.tryLock()); + } + }; + competingWriteThread.start(); + competingWriteThread.join(); + } + } + + /** + * Tests the warning when the read lock is held longer than threshold. 
+ * @throws Exception + */ + @Test(timeout=10000) + public void testReadLockLongHoldingReport() throws Exception { + String testname = name.getMethodName(); + final AtomicLong time = new AtomicLong(0); + Timer mclock = new Timer() { + @Override + public long monotonicNow() { + return time.get(); + } + }; + + final AtomicLong wlogged = new AtomicLong(0); + final AtomicLong wsuppresed = new AtomicLong(0); + ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true); + InstrumentedReadLock readLock = new InstrumentedReadLock(testname, LOG, + readWriteLock, 2000, 300, mclock) { + @Override + protected void logWarning(long lockHeldTime, long suppressed) { + wlogged.incrementAndGet(); + wsuppresed.set(suppressed); + } + }; + + readLock.lock(); // t = 0 + time.set(100); + readLock.unlock(); // t = 100 + assertEquals(0, wlogged.get()); + assertEquals(0, wsuppresed.get()); + + readLock.lock(); // t = 100 + time.set(500); + readLock.unlock(); // t = 500 + assertEquals(1, wlogged.get()); + assertEquals(0, wsuppresed.get()); + + // the suppress counting is only changed when + // log is needed in the test + readLock.lock(); // t = 500 + time.set(900); + readLock.unlock(); // t = 900 + assertEquals(1, wlogged.get()); + assertEquals(0, wsuppresed.get()); + + readLock.lock(); // t = 900 + time.set(3000); + readLock.unlock(); // t = 3000 + assertEquals(2, wlogged.get()); + assertEquals(1, wsuppresed.get()); + } + + /** + * Tests the warning when the write lock is held longer than threshold. 
+ * @throws Exception + */ + @Test(timeout=10000) + public void testWriteLockLongHoldingReport() throws Exception { + String testname = name.getMethodName(); + final AtomicLong time = new AtomicLong(0); + Timer mclock = new Timer() { + @Override + public long monotonicNow() { + return time.get(); + } + }; + + final AtomicLong wlogged = new AtomicLong(0); + final AtomicLong wsuppresed = new AtomicLong(0); + ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true); + InstrumentedWriteLock writeLock = new InstrumentedWriteLock(testname, LOG, + readWriteLock, 2000, 300, mclock) { + @Override + protected void logWarning(long lockHeldTime, long suppressed) { + wlogged.incrementAndGet(); + wsuppresed.set(suppressed); + } + }; + + writeLock.lock(); // t = 0 + time.set(100); + writeLock.unlock(); // t = 100 + assertEquals(0, wlogged.get()); + assertEquals(0, wsuppresed.get()); + + writeLock.lock(); // t = 100 + time.set(500); + writeLock.unlock(); // t = 500 + assertEquals(1, wlogged.get()); + assertEquals(0, wsuppresed.get()); + + // the suppress counting is only changed when + // log is needed in the test + writeLock.lock(); // t = 500 + time.set(900); + writeLock.unlock(); // t = 900 + assertEquals(1, wlogged.get()); + assertEquals(0, wsuppresed.get()); + + writeLock.lock(); // t = 900 + time.set(3000); + writeLock.unlock(); // t = 3000 + assertEquals(2, wlogged.get()); + assertEquals(1, wsuppresed.get()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8bccd5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java deleted file mode 100644 index 6279e95..0000000 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.commons.logging.Log; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.Timer; - -import com.google.common.annotations.VisibleForTesting; - -/** - * This is a debugging class that can be used by callers to track - * whether a specifc lock is being held for too long and periodically - * log a warning and stack trace, if so. - * - * The logged warnings are throttled so that logs are not spammed. - * - * A new instance of InstrumentedLock can be created for each object - * that needs to be instrumented. 
- */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class InstrumentedLock implements Lock { - - private final Lock lock; - private final Log logger; - private final String name; - private final Timer clock; - - /** Minimum gap between two lock warnings. */ - private final long minLoggingGap; - /** Threshold for detecting long lock held time. */ - private final long lockWarningThreshold; - - // Tracking counters for lock statistics. - private volatile long lockAcquireTimestamp; - private final AtomicLong lastLogTimestamp; - private final AtomicLong warningsSuppressed = new AtomicLong(0); - - /** - * Create an instrumented lock instance which logs a warning message - * when lock held time is above given threshold. - * - * @param name the identifier of the lock object - * @param logger this class does not have its own logger, will log to the - * given logger instead - * @param minLoggingGapMs the minimum time gap between two log messages, - * this is to avoid spamming too many logs - * @param lockWarningThresholdMs the time threshold to view lock held - * time as being "too long" - */ - public InstrumentedLock(String name, Log logger, long minLoggingGapMs, - long lockWarningThresholdMs) { - this(name, logger, new ReentrantLock(), - minLoggingGapMs, lockWarningThresholdMs); - } - - public InstrumentedLock(String name, Log logger, Lock lock, - long minLoggingGapMs, long lockWarningThresholdMs) { - this(name, logger, lock, - minLoggingGapMs, lockWarningThresholdMs, new Timer()); - } - - @VisibleForTesting - InstrumentedLock(String name, Log logger, Lock lock, - long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) { - this.name = name; - this.lock = lock; - this.clock = clock; - this.logger = logger; - minLoggingGap = minLoggingGapMs; - lockWarningThreshold = lockWarningThresholdMs; - lastLogTimestamp = new AtomicLong( - clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold)); - } - - @Override - public void lock() { - 
lock.lock(); - lockAcquireTimestamp = clock.monotonicNow(); - } - - @Override - public void lockInterruptibly() throws InterruptedException { - lock.lockInterruptibly(); - lockAcquireTimestamp = clock.monotonicNow(); - } - - @Override - public boolean tryLock() { - if (lock.tryLock()) { - lockAcquireTimestamp = clock.monotonicNow(); - return true; - } - return false; - } - - @Override - public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { - if (lock.tryLock(time, unit)) { - lockAcquireTimestamp = clock.monotonicNow(); - return true; - } - return false; - } - - @Override - public void unlock() { - long localLockReleaseTime = clock.monotonicNow(); - long localLockAcquireTime = lockAcquireTimestamp; - lock.unlock(); - check(localLockAcquireTime, localLockReleaseTime); - } - - @Override - public Condition newCondition() { - return lock.newCondition(); - } - - @VisibleForTesting - void logWarning(long lockHeldTime, long suppressed) { - logger.warn(String.format("Lock held time above threshold: " + - "lock identifier: %s " + - "lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " + - "The stack trace is: %s" , - name, lockHeldTime, suppressed, - StringUtils.getStackTrace(Thread.currentThread()))); - } - - /** - * Log a warning if the lock was held for too long. - * - * Should be invoked by the caller immediately AFTER releasing the lock. - * - * @param acquireTime - timestamp just after acquiring the lock. - * @param releaseTime - timestamp just before releasing the lock. 
- */ - private void check(long acquireTime, long releaseTime) { - if (!logger.isWarnEnabled()) { - return; - } - - final long lockHeldTime = releaseTime - acquireTime; - if (lockWarningThreshold - lockHeldTime < 0) { - long now; - long localLastLogTs; - do { - now = clock.monotonicNow(); - localLastLogTs = lastLogTimestamp.get(); - long deltaSinceLastLog = now - localLastLogTs; - // check should print log or not - if (deltaSinceLastLog - minLoggingGap < 0) { - warningsSuppressed.incrementAndGet(); - return; - } - } while (!lastLogTimestamp.compareAndSet(localLastLogTs, now)); - long suppressed = warningsSuppressed.getAndSet(0); - logWarning(lockHeldTime, suppressed); - } - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8bccd5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 84569f3..7e7ae4f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -58,7 +58,6 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.ExtendedBlockId; -import org.apache.hadoop.hdfs.InstrumentedLock; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; @@ -109,6 +108,7 @@ import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import 
org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; +import org.apache.hadoop.util.InstrumentedLock; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Timer; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8bccd5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java deleted file mode 100644 index f470688..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs; - -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; - -import org.apache.hadoop.util.AutoCloseableLock; -import org.apache.hadoop.util.Timer; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import static org.mockito.Mockito.*; -import static org.junit.Assert.*; - -/** - * A test class for InstrumentedLock. - */ -public class TestInstrumentedLock { - - static final Log LOG = LogFactory.getLog(TestInstrumentedLock.class); - - @Rule public TestName name = new TestName(); - - /** - * Test exclusive access of the lock. - * @throws Exception - */ - @Test(timeout=10000) - public void testMultipleThread() throws Exception { - String testname = name.getMethodName(); - InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300); - lock.lock(); - try { - Thread competingThread = new Thread() { - @Override - public void run() { - assertFalse(lock.tryLock()); - } - }; - competingThread.start(); - competingThread.join(); - } finally { - lock.unlock(); - } - } - - /** - * Test the correctness with try-with-resource syntax. 
- * @throws Exception - */ - @Test(timeout=10000) - public void testTryWithResourceSyntax() throws Exception { - String testname = name.getMethodName(); - final AtomicReference<Thread> lockThread = new AtomicReference<>(null); - Lock lock = new InstrumentedLock(testname, LOG, 0, 300) { - @Override - public void lock() { - super.lock(); - lockThread.set(Thread.currentThread()); - } - @Override - public void unlock() { - super.unlock(); - lockThread.set(null); - } - }; - AutoCloseableLock acl = new AutoCloseableLock(lock); - try (AutoCloseable localLock = acl.acquire()) { - assertEquals(acl, localLock); - Thread competingThread = new Thread() { - @Override - public void run() { - assertNotEquals(Thread.currentThread(), lockThread.get()); - assertFalse(lock.tryLock()); - } - }; - competingThread.start(); - competingThread.join(); - assertEquals(Thread.currentThread(), lockThread.get()); - } - assertNull(lockThread.get()); - } - - /** - * Test the lock logs warning when lock held time is greater than threshold - * and not log warning otherwise. 
- * @throws Exception - */ - @Test(timeout=10000) - public void testLockLongHoldingReport() throws Exception { - String testname = name.getMethodName(); - final AtomicLong time = new AtomicLong(0); - Timer mclock = new Timer() { - @Override - public long monotonicNow() { - return time.get(); - } - }; - Lock mlock = mock(Lock.class); - - final AtomicLong wlogged = new AtomicLong(0); - final AtomicLong wsuppresed = new AtomicLong(0); - InstrumentedLock lock = new InstrumentedLock( - testname, LOG, mlock, 2000, 300, mclock) { - @Override - void logWarning(long lockHeldTime, long suppressed) { - wlogged.incrementAndGet(); - wsuppresed.set(suppressed); - } - }; - - // do not log warning when the lock held time is short - lock.lock(); // t = 0 - time.set(200); - lock.unlock(); // t = 200 - assertEquals(0, wlogged.get()); - assertEquals(0, wsuppresed.get()); - - lock.lock(); // t = 200 - time.set(700); - lock.unlock(); // t = 700 - assertEquals(1, wlogged.get()); - assertEquals(0, wsuppresed.get()); - - // although the lock held time is greater than threshold - // suppress the log warning due to the logging gap - // (not recorded in wsuppressed until next log message) - lock.lock(); // t = 700 - time.set(1100); - lock.unlock(); // t = 1100 - assertEquals(1, wlogged.get()); - assertEquals(0, wsuppresed.get()); - - // log a warning message when the lock held time is greater than the threshold - // and the logging time gap is satisfied. Also should display suppressed - // previous warnings. - time.set(2400); - lock.lock(); // t = 2400 - time.set(2800); - lock.unlock(); // t = 2800 - assertEquals(2, wlogged.get()); - assertEquals(1, wsuppresed.get()); - } - -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org