GEODE-2654: Backups can capture different members from different points in time

Online backup now takes a backup from a single point in time. This is done by making all write operations acquire a backup lock, which causes them to wait until the backup has rolled the oplogs.
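In short, the locking protocol works like this: the backup thread marks itself and raises an "in backup" flag; every writer acquires the backup lock before modifying disk files and waits while the flag is up; the backup thread itself bypasses the lock entirely, since it is the only thread permitted to touch disk files during the backup. Below is a condensed, self-contained sketch adapted from the new BackupLock class in the patch (the test hook is omitted, and BackupLockSketch is a renamed illustration, not a class in the codebase):

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BackupLockSketch extends ReentrantLock {
  private final ThreadLocal<Boolean> isBackupThread = new ThreadLocal<>();
  private boolean isBackingUp;
  private final Condition backupDone = super.newCondition();

  // Called by the backup thread to mark itself before the backup starts.
  public void setBackupThread() {
    isBackupThread.set(true);
  }

  // Backup thread: raise the flag so new writers block.
  public void lockForBackup() {
    super.lock();
    isBackingUp = true;
    super.unlock();
  }

  // Backup thread: lower the flag and release all waiting writers.
  public void unlockForBackup() {
    super.lock();
    isBackingUp = false;
    isBackupThread.set(false);
    backupDone.signalAll();
    super.unlock();
  }

  private boolean isCurrentThreadDoingBackup() {
    Boolean result = isBackupThread.get();
    return result != null && result;
  }

  // Writer threads: wait out an in-progress backup. The backup thread
  // falls through without taking the lock at all.
  @Override
  public void lock() {
    if (!isCurrentThreadDoingBackup()) {
      super.lock();
      while (isBackingUp) {
        backupDone.awaitUninterruptibly();
      }
    }
  }

  // No-op for the backup thread, which never acquired the lock.
  @Override
  public void unlock() {
    if (!isCurrentThreadDoingBackup()) {
      super.unlock();
    }
  }
}

On the write path, Oplog operations such as basicCreate, basicModify, basicRemove, forceRolling, writeRVV, writeGCRVV, and writeOpLogBytes now wrap their synchronized blocks in getParent().getBackupLock().lock() / unlock(), and DiskInitFile routes its metadata writes through the same DiskStoreImpl-level BackupLock, so a member's oplogs and init file are frozen together while the backup's prepare phase runs.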
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/f1326be5
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/f1326be5
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/f1326be5

Branch: refs/heads/feature/GEM-1483
Commit: f1326be55c54e96d75fe8ec6391aa4581f312f86
Parents: bf8971c
Author: Lynn Gallinat <lgalli...@pivotal.io>
Authored: Thu Jun 22 11:34:37 2017 -0700
Committer: Lynn Gallinat <lgalli...@pivotal.io>
Committed: Wed Jul 26 12:02:09 2017 -0700

----------------------------------------------------------------------
 .../apache/geode/internal/cache/BackupLock.java |  66 +-
 .../geode/internal/cache/DiskInitFile.java      | 287 +++----
 .../geode/internal/cache/DiskStoreImpl.java     |  16 +-
 .../org/apache/geode/internal/cache/Oplog.java  | 764 ++++++++++---------
 .../BackupPrepareAndFinishMsgDUnitTest.java     | 571 ++++++++++++++
 .../geode/codeAnalysis/excludedClasses.txt      |   1 +
 .../codeAnalysis/sanctionedSerializables.txt    |   1 -
 7 files changed, 1187 insertions(+), 519 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/geode/blob/f1326be5/geode-core/src/main/java/org/apache/geode/internal/cache/BackupLock.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BackupLock.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BackupLock.java
index 4b4fb10..f531860 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BackupLock.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BackupLock.java
@@ -28,54 +28,72 @@ import java.util.concurrent.locks.ReentrantLock;
 */ public class BackupLock extends ReentrantLock { - private Thread backupThread; + private final ThreadLocal<Boolean> isBackupThread = new ThreadLocal<Boolean>(); boolean isBackingUp; Condition backupDone = super.newCondition(); + // test hook + private BackupLockTestHook hook = null; + + public interface BackupLockTestHook { + /** + * Test hook called before the wait for backup to complete + */ + public void beforeWaitForBackupCompletion(); + } + + public void setBackupLockTestHook(BackupLockTestHook testHook) { + hook = testHook; + } + public void lockForBackup() { super.lock(); isBackingUp = true; super.unlock(); } - public void setBackupThread(Thread thread) { - super.lock(); - backupThread = thread; - super.unlock(); + public void setBackupThread() { + isBackupThread.set(true); } public void unlockForBackup() { super.lock(); isBackingUp = false; - backupThread = null; + isBackupThread.set(false); backupDone.signalAll(); super.unlock(); } - /** - * Acquire this lock, waiting for an in progress backup if one is in progress. - */ + public boolean isCurrentThreadDoingBackup() { + Boolean result = isBackupThread.get(); + return (result != null) && result; + } + @Override - public void lock() { - lock(true); + public void unlock() { + // The backup thread does not need to unlock this lock since it never gets the lock. It is the + // only thread that has permission to modify disk files during backup. + if (!isCurrentThreadDoingBackup()) { + super.unlock(); + } } /** - * Acquire this lock, Optionally waiting for a backup to finish the first phase. 
Any operations - * that update metadata related to the distributed system state should pass true for this flag, - * because we need to make sure we get a point in time snapshot of the init files across members - * to for metadata consistentency. + * Acquire this lock, waiting for a backup to finish the first phase. * - * Updates which update only record changes to the local state on this member(eg, switching - * oplogs), do not need to wait for the backup. - * - * @param waitForBackup if true, we will wait for an in progress backup before acquiring this - * lock. */ - public void lock(boolean waitForBackup) { - super.lock(); - while (isBackingUp && waitForBackup && !(Thread.currentThread() == backupThread)) { - backupDone.awaitUninterruptibly(); + @Override + public void lock() { + // The backup thread is a noop; it does not need to get the lock since it is the only thread + // with permission to modify disk files during backup + if (!isCurrentThreadDoingBackup()) { + super.lock(); + while (isBackingUp) { + if (hook != null) { + hook.beforeWaitForBackupCompletion(); + } + backupDone.awaitUninterruptibly(); + } } } } http://git-wip-us.apache.org/repos/asf/geode/blob/f1326be5/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java index 0925d28..33aa0b5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java @@ -14,6 +14,34 @@ */ package org.apache.geode.internal.cache; +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; @@ -21,10 +49,13 @@ import it.unimi.dsi.fastutil.longs.LongIterator; import it.unimi.dsi.fastutil.longs.LongOpenHashSet; import it.unimi.dsi.fastutil.objects.ObjectIterator; import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.Logger; + import org.apache.geode.CancelCriterion; import org.apache.geode.CancelException; import org.apache.geode.DataSerializer; import org.apache.geode.Instantiator; +import org.apache.geode.InternalGemFireException; import org.apache.geode.cache.DiskAccessException; import org.apache.geode.cache.EvictionAction; import org.apache.geode.cache.EvictionAlgorithm; @@ -52,34 +83,6 @@ import org.apache.geode.internal.concurrent.ConcurrentHashSet; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import 
org.apache.geode.internal.logging.log4j.LogMarker; -import org.apache.logging.log4j.Logger; - -import java.io.BufferedInputStream; -import java.io.ByteArrayInputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.EOFException; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; /** * Does all the IF file work for a DiskStoreImpl. @@ -96,6 +99,8 @@ public class DiskInitFile implements DiskInitFileInterpreter { public static final byte END_OF_RECORD_ID = 21; static final int OPLOG_FILE_ID_REC_SIZE = 1 + 8 + 1; + private final ReentrantLock lock = new ReentrantLock(); + /** * Written to IF Byte Format: 8: leastSigBits of UUID 8: mostSigBits of UUID 1: EndOfRecordMarker */ @@ -381,11 +386,19 @@ public class DiskInitFile implements DiskInitFileInterpreter { transient private boolean gotEOF; - /** - * Lock used to synchronize access to the init file. This is a lock rather than a synchronized - * block because the backup tool needs to acquire this lock. - */ - private final BackupLock lock = new BackupLock(); + private void lock(boolean useBackupLock) { + if (useBackupLock) { + getDiskStore().getBackupLock().lock(); + } + this.lock.lock(); + } + + private void unlock(boolean useBackupLock) { + if (useBackupLock) { + getDiskStore().getBackupLock().unlock(); + } + this.lock.unlock(); + } private void recoverFromFailedCompaction() { File tmpFile = getTempFile(); @@ -530,11 +543,11 @@ public class DiskInitFile implements DiskInitFileInterpreter { private int liveRegions = 0; // added for bug 41618 public boolean hasLiveRegions() { - lock.lock(false); + lock(false); try { return this.liveRegions > 0; } finally { - lock.unlock(); + unlock(false); } } @@ -817,7 +830,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { } DiskRegionView takeDiskRegionByName(String name) { - lock.lock(false); + lock(false); try { DiskRegionView result = this.drMapByName.remove(name); if (result != null) { @@ -825,16 +838,16 @@ public class DiskInitFile implements DiskInitFileInterpreter { } return result; } finally { - lock.unlock(); + unlock(false); } } Map<Long, PlaceHolderDiskRegion> getDRMap() { - lock.lock(false); + lock(false); try { return new HashMap<Long, PlaceHolderDiskRegion>(drMap); } finally { - lock.unlock(); + unlock(false); } } @@ -843,7 +856,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { DiskRegionStats stats, CancelCriterion cancel, DiskExceptionHandler exceptionHandler, RegionAttributes ra, EnumSet<DiskRegionFlag> flags, String partitionName, int startingBucketId, Compressor compressor, boolean offHeap) { - lock.lock(false); + lock(true); try { // need to call the constructor and addDiskRegion while synced DiskRegion result = new DiskRegion(dsi, name, isBucket, isPersistBackup, overflowEnabled, @@ -852,21 +865,21 @@ public class DiskInitFile implements DiskInitFileInterpreter { dsi.addDiskRegion(result); return result; } finally { - lock.unlock(); + unlock(true); } } DiskRegionView 
getDiskRegionByName(String name) { - lock.lock(false); + lock(false); try { return this.drMapByName.get(name); } finally { - lock.unlock(); + unlock(false); } } DiskRegionView getDiskRegionByPrName(String name) { - lock.lock(false); + lock(false); try { for (PlaceHolderDiskRegion dr : this.drMapByName.values()) { if (dr.isBucket()) { @@ -877,7 +890,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { } return null; } finally { - lock.unlock(); + unlock(false); } } @@ -1249,7 +1262,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { private void saveInstantiator(int id, String instantiatorClassName, String instantiatedClassName) { - lock.lock(); + lock(true); try { if (!this.compactInProgress && this.instIds.contains(id)) { // instantiator already written to disk so just return @@ -1273,7 +1286,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { .toLocalizedString(ex), this.parent); } finally { - lock.unlock(); + unlock(true); } } @@ -1308,7 +1321,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { * Write the specified DataSerializer to the file. */ private void saveDataSerializer(DataSerializer ds) { - lock.lock(); + lock(true); try { if (!this.compactInProgress && this.dsIds.contains(ds.getId())) { // dataSerializer already written to disk so just return @@ -1327,7 +1340,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { .toLocalizedString(ex), this.parent); } finally { - lock.unlock(); + unlock(true); } } @@ -1412,7 +1425,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { private static final double COMPACT_RATIO = 0.5; private void compactIfNeeded() { - lock.lock(false); + lock(true); try { if (this.compactInProgress) return; @@ -1428,7 +1441,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { } catch (IOException ignore) { return; } finally { - lock.unlock(); + unlock(true); } } @@ -1441,7 +1454,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { } private void compact() { - lock.lock(false); + lock(true); this.compactInProgress = true; try { try { @@ -1501,16 +1514,16 @@ public class DiskInitFile implements DiskInitFileInterpreter { } } finally { this.compactInProgress = false; - lock.unlock(); + unlock(true); } } public void copyTo(File targetDir) throws IOException { - lock.lock(false); + lock(false); try { FileUtils.copyFileToDirectory(this.ifFile, targetDir); } finally { - lock.unlock(); + unlock(false); } } @@ -1569,7 +1582,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { * Write all live data to the init file */ private void writeLiveData() { - lock.lock(false); + lock(true); try { this.ifLiveRecordCount = 0; this.ifTotalRecordCount = 0; @@ -1594,7 +1607,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { this.ifTotalRecordCount); } } finally { - lock.unlock(); + unlock(true); } } @@ -1646,7 +1659,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { } private void writeDiskStoreId() { - lock.lock(); + lock(true); try { ByteBuffer bb = getIFWriteBuffer(1 + 6 + 1); bb.put(OPLOG_MAGIC_SEQ_ID); @@ -1669,7 +1682,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { } throw dae; } finally { - lock.unlock(); + unlock(true); } } @@ -1916,7 +1929,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { } void closeRegion(DiskRegionView dr) { - lock.lock(); + lock(true); try { this.parent.rmById(dr.getId()); // fix for bug 41334 PlaceHolderDiskRegion phdr = new PlaceHolderDiskRegion(dr); 
@@ -1924,12 +1937,12 @@ public class DiskInitFile implements DiskInitFileInterpreter { this.drMapByName.put(dr.getName(), phdr); // @todo make sure we only have one instance of the region for this name } finally { - lock.unlock(); + unlock(true); } } void clearRegion(DiskRegionView dr, long clearOplogEntryId) { - lock.lock(); + lock(true); try { if (clearOplogEntryId != DiskStoreImpl.INVALID_ID) { this.ifTotalRecordCount++; @@ -1945,7 +1958,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { writeIFRecord(IFREC_CLEAR_REGION_ID, dr, clearOplogEntryId); } } finally { - lock.unlock(); + unlock(true); } } @@ -1953,7 +1966,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { * Clear the region using an RVV. */ void clearRegion(DiskRegion dr, RegionVersionVector rvv) { - lock.lock(); + lock(true); try { this.ifTotalRecordCount++; if (dr.getClearRVV() == null) { @@ -1964,7 +1977,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { dr.setClearRVV(rvv); writeClearRecord(dr, rvv); } finally { - lock.unlock(); + unlock(true); } } @@ -2001,7 +2014,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { } void createRegion(DiskRegionView drv) { - lock.lock(); + lock(true); try { if (!drv.isRecreated()) { writeIFRecord(IFREC_CREATE_REGION_ID, drv, drv.getName()); @@ -2015,24 +2028,24 @@ public class DiskInitFile implements DiskInitFileInterpreter { } } } finally { - lock.unlock(); + unlock(true); } } void beginDestroyRegion(DiskRegionView dr) { - lock.lock(); + lock(true); try { if (regionStillCreated(dr)) { cmnBeginDestroyRegion(dr); writeIFRecord(IFREC_BEGIN_DESTROY_REGION_ID, dr); } } finally { - lock.unlock(); + unlock(true); } } void endDestroyRegion(DiskRegionView dr) { - lock.lock(); + lock(true); try { if (regionStillCreated(dr)) { cmnEndDestroyRegion(dr); @@ -2043,127 +2056,127 @@ public class DiskInitFile implements DiskInitFileInterpreter { } } } finally { - lock.unlock(); + unlock(true); } } void beginDestroyDataStorage(DiskRegionView dr) { - lock.lock(); + lock(true); try { assert regionStillCreated(dr); cmnBeginPartialDestroyRegion(dr); writeIFRecord(IFREC_BEGIN_PARTIAL_DESTROY_REGION_ID, dr); } finally { - lock.unlock(); + unlock(true); } } void endDestroyDataStorage(DiskRegionView dr) { - lock.lock(); + lock(true); try { assert regionStillCreated(dr); cmnEndPartialDestroyRegion(dr); writeIFRecord(IFREC_END_PARTIAL_DESTROY_REGION_ID, dr); } finally { - lock.unlock(); + unlock(true); } } public void createPersistentPR(String name, PRPersistentConfig config) { - lock.lock(); + lock(true); try { if (cmnPRCreate(name, config)) { writePRCreate(name, config); } } finally { - lock.unlock(); + unlock(true); } } public void destroyPersistentPR(String name) { - lock.lock(); + lock(true); try { if (cmnPRDestroy(name)) { writePRDestroy(name); } } finally { - lock.unlock(); + unlock(true); } } public PRPersistentConfig getPersistentPR(String name) { - lock.lock(false); + lock(false); try { return prMap.get(name); } finally { - lock.unlock(); + unlock(false); } } public Map<String, PRPersistentConfig> getAllPRs() { - lock.lock(false); + lock(false); try { return new HashMap<String, PRPersistentConfig>(prMap); } finally { - lock.unlock(); + unlock(false); } } void crfCreate(long oplogId) { - lock.lock(false); + lock(true); try { cmnCrfCreate(oplogId); writeIFRecord(IFREC_CRF_CREATE, oplogId); } finally { - lock.unlock(); + unlock(true); } } void drfCreate(long oplogId) { - lock.lock(false); + lock(true); try { cmnDrfCreate(oplogId); 
writeIFRecord(IFREC_DRF_CREATE, oplogId); } finally { - lock.unlock(); + unlock(true); } } void krfCreate(long oplogId) { - lock.lock(false); + lock(true); try { cmnKrfCreate(oplogId); writeIFRecord(IFREC_KRF_CREATE, oplogId); } finally { - lock.unlock(); + unlock(true); } } void crfDelete(long oplogId) { - lock.lock(false); + lock(true); try { if (cmnCrfDelete(oplogId)) { // call writeIFRecord AFTER cmnCrfDelete to fix bug 41505 writeIFRecord(IFREC_CRF_DELETE, oplogId); } } finally { - lock.unlock(); + unlock(true); } } void drfDelete(long oplogId) { - lock.lock(false); + lock(true); try { if (cmnDrfDelete(oplogId)) { writeIFRecord(IFREC_DRF_DELETE, oplogId); } } finally { - lock.unlock(); + unlock(true); } } int getOrCreateCanonicalId(Object object) { - lock.lock(false); + lock(true); try { int id = canonicalIdHolder.getId(object); if (id <= 0) { @@ -2172,21 +2185,21 @@ public class DiskInitFile implements DiskInitFileInterpreter { } return id; } finally { - lock.unlock(); + unlock(true); } } Object getCanonicalObject(int id) { - lock.lock(false); + lock(false); try { return canonicalIdHolder.getObject(id); } finally { - lock.unlock(); + unlock(false); } } void close() { - lock.lock(); + lock(true); try { if (this.closed) return; @@ -2203,17 +2216,17 @@ public class DiskInitFile implements DiskInitFileInterpreter { basicDestroy(); } } finally { - lock.unlock(); + unlock(true); } } void destroy() { - lock.lock(); + lock(true); try { close(); basicDestroy(); } finally { - lock.unlock(); + unlock(true); } } @@ -2229,31 +2242,31 @@ public class DiskInitFile implements DiskInitFileInterpreter { void addMyInitializingPMID(DiskRegionView dr, PersistentMemberID pmid) { - lock.lock(); + lock(true); try { if (regionStillCreated(dr)) { cmnAddMyInitializingPMID(dr, pmid); writePMIDRecord(IFREC_MY_MEMBER_INITIALIZING_ID, dr, pmid, false); } } finally { - lock.unlock(); + unlock(true); } } void markInitialized(DiskRegionView dr) { - lock.lock(); + lock(true); try { if (regionStillCreated(dr)) { writeIFRecord(IFREC_MY_MEMBER_INITIALIZED_ID, dr); cmnMarkInitialized(dr); } } finally { - lock.unlock(); + unlock(true); } } void addOnlinePMID(DiskRegionView dr, PersistentMemberID pmid) { - lock.lock(); + lock(true); try { if (regionStillCreated(dr)) { if (dr.addOnlineMember(pmid)) { @@ -2264,12 +2277,12 @@ public class DiskInitFile implements DiskInitFileInterpreter { } } } finally { - lock.unlock(); + unlock(true); } } void addOfflinePMID(DiskRegionView dr, PersistentMemberID pmid) { - lock.lock(); + lock(true); try { if (regionStillCreated(dr)) { if (dr.addOfflineMember(pmid)) { @@ -2280,12 +2293,12 @@ public class DiskInitFile implements DiskInitFileInterpreter { } } } finally { - lock.unlock(); + unlock(true); } } void addOfflineAndEqualPMID(DiskRegionView dr, PersistentMemberID pmid) { - lock.lock(); + lock(true); try { if (regionStillCreated(dr)) { if (dr.addOfflineAndEqualMember(pmid)) { @@ -2296,12 +2309,12 @@ public class DiskInitFile implements DiskInitFileInterpreter { } } } finally { - lock.unlock(); + unlock(true); } } void rmPMID(DiskRegionView dr, PersistentMemberID pmid) { - lock.lock(); + lock(true); try { if (regionStillCreated(dr)) { if (dr.rmOnlineMember(pmid) || dr.rmOfflineMember(pmid) || dr.rmEqualMember(pmid)) { @@ -2312,7 +2325,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { } } } finally { - lock.unlock(); + unlock(true); } } @@ -2323,7 +2336,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { return; } - lock.lock(); + lock(true); try { if 
(cmnRevokeDiskStoreId(revokedPattern)) { // we now have two records to gc (this one and the live one we removed). @@ -2332,7 +2345,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { writeRevokedMember(revokedPattern); } } finally { - lock.unlock(); + unlock(true); } } @@ -2340,12 +2353,12 @@ public class DiskInitFile implements DiskInitFileInterpreter { * Get the set of members known to be revoked */ public Set<PersistentMemberPattern> getRevokedIDs() { - lock.lock(false); + lock(false); try { // Return a copy of the set, because we modify it in place. return new HashSet<PersistentMemberPattern>(this.revokedMembers); } finally { - lock.unlock(); + unlock(false); } } @@ -2354,21 +2367,21 @@ public class DiskInitFile implements DiskInitFileInterpreter { * Return true if the given dr is still created in this IF. */ boolean regionStillCreated(DiskRegionView dr) { - lock.lock(false); + lock(false); try { return getDiskRegionById(dr.getId()) != null; } finally { - lock.unlock(); + unlock(false); } } boolean regionExists(long drId) { - lock.lock(false); + lock(false); try { // @todo make drMap concurrent so this call can be fast return this.drMap.containsKey(drId); } finally { - lock.unlock(); + unlock(false); } } @@ -2386,11 +2399,11 @@ public class DiskInitFile implements DiskInitFileInterpreter { } Collection<DiskRegionView> getKnown() { - lock.lock(false); + lock(false); try { return new ArrayList<DiskRegionView>(this.drMap.values()); } finally { - lock.unlock(); + unlock(false); } } @@ -2585,7 +2598,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { public void destroyPRRegion(String prName) { ArrayList<PlaceHolderDiskRegion> buckets = new ArrayList<PlaceHolderDiskRegion>(); - lock.lock(); + lock(true); try { for (PlaceHolderDiskRegion dr : this.drMapByName.values()) { if (dr.isBucket()) { @@ -2595,7 +2608,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { } } } finally { - lock.unlock(); + unlock(true); } for (PlaceHolderDiskRegion dr : buckets) { endDestroyRegion(dr); @@ -2612,7 +2625,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { String offHeapOption, boolean printToConsole) { StringBuffer sb = new StringBuffer(); ArrayList<PlaceHolderDiskRegion> buckets = new ArrayList<PlaceHolderDiskRegion>(); - lock.lock(); + lock(true); try { for (PlaceHolderDiskRegion dr : this.drMapByName.values()) { if (dr.isBucket()) { @@ -2633,7 +2646,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { printInfo = false; } } finally { - lock.unlock(); + unlock(true); } return sb.toString(); } @@ -2642,13 +2655,13 @@ public class DiskInitFile implements DiskInitFileInterpreter { String lruLimitOption, String concurrencyLevelOption, String initialCapacityOption, String loadFactorOption, String compressorClassNameOption, String statisticsEnabledOption, String offHeapOption, boolean printToConsole) { - lock.lock(); + lock(true); try { return basicModifyRegion(false, drv, lruOption, lruActionOption, lruLimitOption, concurrencyLevelOption, initialCapacityOption, loadFactorOption, compressorClassNameOption, statisticsEnabledOption, offHeapOption, printToConsole); } finally { - lock.unlock(); + unlock(true); } } @@ -2770,20 +2783,8 @@ public class DiskInitFile implements DiskInitFileInterpreter { return message; } - public void lockForBackup() { - lock.lockForBackup(); - } - - public void unlockForBackup() { - lock.unlockForBackup(); - } - - public void setBackupThread(Thread thread) { - lock.setBackupThread(thread); - } - private 
void writeGemfireVersion(Version version) { - lock.lock(); + lock(true); try { ByteBuffer bb = getIFWriteBuffer(1 + 3 + 1); bb.put(IFREC_GEMFIRE_VERSION); @@ -2799,7 +2800,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { } throw dae; } finally { - lock.unlock(); + unlock(true); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/f1326be5/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java index 3e97d0e..94d1253 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java @@ -4002,6 +4002,16 @@ public class DiskStoreImpl implements DiskStore { private final HashMap<String, LRUStatistics> prlruStatMap = new HashMap<String, LRUStatistics>(); + /** + * Lock used to synchronize access to the init file. This is a lock rather than a synchronized + * block because the backup tool needs to acquire this lock. + */ + private final BackupLock backupLock = new BackupLock(); + + public BackupLock getBackupLock() { + return backupLock; + } + LRUStatistics getOrCreatePRLRUStats(PlaceHolderDiskRegion dr) { String prName = dr.getPrName(); LRUStatistics result = null; @@ -4046,14 +4056,14 @@ public class DiskStoreImpl implements DiskStore { // level operations, we will need to be careful // to block them *before* they are put in the async // queue - getDiskInitFile().lockForBackup(); + getBackupLock().lockForBackup(); } /** * Release the lock that is preventing operations on this disk store during the backup process. 
*/ public void releaseBackupLock() { - getDiskInitFile().unlockForBackup(); + getBackupLock().unlockForBackup(); } /** @@ -4063,7 +4073,7 @@ public class DiskStoreImpl implements DiskStore { */ public void startBackup(File targetDir, BackupInspector baselineInspector, RestoreScript restoreScript) throws IOException { - getDiskInitFile().setBackupThread(Thread.currentThread()); + getBackupLock().setBackupThread(); boolean done = false; try { for (;;) { http://git-wip-us.apache.org/repos/asf/geode/blob/f1326be5/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java index 5399d5a..80f19b5 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java @@ -688,6 +688,8 @@ public class Oplog implements CompactableOplog, Flushable { public void replaceIncompatibleEntry(DiskRegionView dr, DiskEntry old, DiskEntry repl) { boolean useNextOplog = false; + // No need to get the backup lock prior to synchronizing (correct lock order) since the + // synchronized block does not attempt to get the backup lock (incorrect lock order) synchronized (this.lock) { if (getOplogSet().getChild() != this) { // make sure to only call replaceIncompatibleEntry for child, because @@ -1159,6 +1161,8 @@ public class Oplog implements CompactableOplog, Flushable { */ File getOplogFile() throws SyncFailedException, IOException { // @todo check callers for drf + // No need to get the backup lock prior to synchronizing (correct lock order) since the + // synchronized block does not attempt to get the backup lock (incorrect lock order) synchronized (this.lock/* crf */) { if (!this.crf.RAFClosed) { this.crf.raf.getFD().sync(); @@ -3296,6 +3300,8 @@ public class Oplog implements CompactableOplog, Flushable { private void basicClose(boolean forceDelete) { flushAll(); + // No need to get the backup lock prior to synchronizing (correct lock order) since the + // synchronized block does not attempt to get the backup lock (incorrect lock order) synchronized (this.lock/* crf */) { unpreblow(this.crf, getMaxCrfSize()); if (!this.crf.RAFClosed) { @@ -3312,6 +3318,8 @@ public class Oplog implements CompactableOplog, Flushable { } this.closed = true; } + // No need to get the backup lock prior to synchronizing (correct lock order) since the + // synchronized block does not attempt to get the backup lock (incorrect lock order) synchronized (this.lock/* drf */) { unpreblow(this.drf, getMaxDrfSize()); if (!this.drf.RAFClosed) { @@ -3560,118 +3568,123 @@ public class Oplog implements CompactableOplog, Flushable { System.out.println("basicCreate KRF_DEBUG"); Thread.sleep(1000); } - synchronized (this.lock) { // TODO soplog perf analysis shows this as a - // contention point - // synchronized (this.crf) { - initOpState(OPLOG_NEW_ENTRY_0ID, dr, entry, value, userBits, false); - // Check if the current data in ByteBuffer will cause a - // potential increase in the size greater than the max allowed - long temp = (getOpStateSize() + this.crf.currSize); - if (!this.wroteNewEntryBase) { - temp += OPLOG_NEW_ENTRY_BASE_REC_SIZE; - } - if (this != getOplogSet().getChild()) { - useNextOplog = true; - } else if (temp > getMaxCrfSize() && !isFirstRecord()) { - switchOpLog(dr, getOpStateSize(), entry); - useNextOplog = true; - } else { 
- if (this.lockedForKRFcreate) { - CacheClosedException cce = new CacheClosedException("The disk store is closed."); - dr.getCancelCriterion().checkCancelInProgress(cce); - throw cce; - } - this.firstRecord = false; - writeNewEntryBaseRecord(async); - // Now we can finally call newOplogEntryId. - // We need to make sure the create records - // are written in the same order as they are created. - // This allows us to not encode the oplogEntryId explicitly in the - // record - long createOplogEntryId = getOplogSet().newOplogEntryId(); - id.setKeyId(createOplogEntryId); - - // startPosForSynchOp = this.crf.currSize; - // Allow it to be added to the OpLOg so increase the - // size of currenstartPosForSynchOpt oplog - int dataLength = getOpStateSize(); - // It is necessary that we set the - // Oplog ID here without releasing the lock on object as we are - // writing to the file after releasing the lock. This can cause - // a situation where the - // switching thread has added Oplog for compaction while the previous - // thread has still not started writing. Thus compactor can - // miss an entry as the oplog Id was not set till then. - // This is because a compactor thread will iterate over the entries & - // use only those which have OplogID equal to that of Oplog being - // compacted without taking any lock. A lock is taken only if the - // entry is a potential candidate. - // Further the compactor may delete the file as a compactor thread does - // not require to take any shared/exclusive lock at DiskStoreImpl - // or Oplog level. - // It is also assumed that compactor thread will take a lock on both - // entry as well as DiskID while compacting. In case of synch - // mode we can - // safely set OplogID without taking lock on DiskId. But - // for asynch mode - // we have to take additional precaution as the asynch - // writer of previous - // oplog can interfere with the current oplog. - id.setOplogId(getOplogId()); - // do the io while holding lock so that switch can set doneAppending - // Write the data to the opLog for the synch mode - startPosForSynchOp = writeOpLogBytes(this.crf, async, true); - // if (this.crf.currSize != startPosForSynchOp) { - // assert false; - // } - this.crf.currSize = temp; - if (EntryBits.isNeedsValue(userBits)) { - id.setValueLength(value.getLength()); + getParent().getBackupLock().lock(); + try { + synchronized (this.lock) { // TODO soplog perf analysis shows this as a + // contention point + // synchronized (this.crf) { + initOpState(OPLOG_NEW_ENTRY_0ID, dr, entry, value, userBits, false); + // Check if the current data in ByteBuffer will cause a + // potential increase in the size greater than the max allowed + long temp = (getOpStateSize() + this.crf.currSize); + if (!this.wroteNewEntryBase) { + temp += OPLOG_NEW_ENTRY_BASE_REC_SIZE; + } + if (this != getOplogSet().getChild()) { + useNextOplog = true; + } else if (temp > getMaxCrfSize() && !isFirstRecord()) { + switchOpLog(dr, getOpStateSize(), entry); + useNextOplog = true; } else { - id.setValueLength(0); - } - id.setUserBits(userBits); + if (this.lockedForKRFcreate) { + CacheClosedException cce = new CacheClosedException("The disk store is closed."); + dr.getCancelCriterion().checkCancelInProgress(cce); + throw cce; + } + this.firstRecord = false; + writeNewEntryBaseRecord(async); + // Now we can finally call newOplogEntryId. + // We need to make sure the create records + // are written in the same order as they are created. 
+ // This allows us to not encode the oplogEntryId explicitly in the + // record + long createOplogEntryId = getOplogSet().newOplogEntryId(); + id.setKeyId(createOplogEntryId); + + // startPosForSynchOp = this.crf.currSize; + // Allow it to be added to the OpLOg so increase the + // size of currenstartPosForSynchOpt oplog + int dataLength = getOpStateSize(); + // It is necessary that we set the + // Oplog ID here without releasing the lock on object as we are + // writing to the file after releasing the lock. This can cause + // a situation where the + // switching thread has added Oplog for compaction while the previous + // thread has still not started writing. Thus compactor can + // miss an entry as the oplog Id was not set till then. + // This is because a compactor thread will iterate over the entries & + // use only those which have OplogID equal to that of Oplog being + // compacted without taking any lock. A lock is taken only if the + // entry is a potential candidate. + // Further the compactor may delete the file as a compactor thread does + // not require to take any shared/exclusive lock at DiskStoreImpl + // or Oplog level. + // It is also assumed that compactor thread will take a lock on both + // entry as well as DiskID while compacting. In case of synch + // mode we can + // safely set OplogID without taking lock on DiskId. But + // for asynch mode + // we have to take additional precaution as the asynch + // writer of previous + // oplog can interfere with the current oplog. + id.setOplogId(getOplogId()); + // do the io while holding lock so that switch can set doneAppending + // Write the data to the opLog for the synch mode + startPosForSynchOp = writeOpLogBytes(this.crf, async, true); + // if (this.crf.currSize != startPosForSynchOp) { + // assert false; + // } + this.crf.currSize = temp; + if (EntryBits.isNeedsValue(userBits)) { + id.setValueLength(value.getLength()); + } else { + id.setValueLength(0); + } + id.setUserBits(userBits); - if (logger.isTraceEnabled()) { - logger.trace("Oplog::basicCreate:Release dByteBuffer with data for Disk ID = {}", id); - } - // As such for any put or get operation , a synch is taken - // on the Entry object in the DiskEntry's Helper functions. - // Compactor thread will also take a lock on entry object. Therefore - // we do not require a lock on DiskID, as concurrent access for - // value will not occur. - startPosForSynchOp += getOpStateValueOffset(); - if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES)) { - VersionTag tag = null; - if (entry.getVersionStamp() != null) { - tag = entry.getVersionStamp().asVersionTag(); + if (logger.isTraceEnabled()) { + logger.trace("Oplog::basicCreate:Release dByteBuffer with data for Disk ID = {}", id); } - logger.trace(LogMarker.PERSIST_WRITES, - "basicCreate: id=<{}> key=<{}> valueOffset={} userBits={} valueLen={} valueBytes={} drId={} versionTag={} oplog#{}", - abs(id.getKeyId()), entry.getKey(), startPosForSynchOp, userBits, - (value != null ? value.getLength() : 0), value.getBytesAsString(), dr.getId(), tag, - getOplogId()); - } - id.setOffsetInOplog(startPosForSynchOp); - addLive(dr, entry); - // Size of the current oplog being increased - // due to 'create' operation. Set the change in stats. - this.dirHolder.incrementTotalOplogSize(dataLength); - incTotalCount(); - - // Update the region version vector for the disk store. 
- // This needs to be done under lock so that we don't switch oplogs - // unit the version vector accurately represents what is in this oplog - RegionVersionVector rvv = dr.getRegionVersionVector(); - if (rvv != null && entry.getVersionStamp() != null) { - rvv.recordVersion(entry.getVersionStamp().getMemberID(), - entry.getVersionStamp().getRegionVersion()); - } - - EntryLogger.logPersistPut(dr.getName(), entry.getKey(), dr.getDiskStoreID()); + // As such for any put or get operation , a synch is taken + // on the Entry object in the DiskEntry's Helper functions. + // Compactor thread will also take a lock on entry object. Therefore + // we do not require a lock on DiskID, as concurrent access for + // value will not occur. + startPosForSynchOp += getOpStateValueOffset(); + if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES)) { + VersionTag tag = null; + if (entry.getVersionStamp() != null) { + tag = entry.getVersionStamp().asVersionTag(); + } + logger.trace(LogMarker.PERSIST_WRITES, + "basicCreate: id=<{}> key=<{}> valueOffset={} userBits={} valueLen={} valueBytes={} drId={} versionTag={} oplog#{}", + abs(id.getKeyId()), entry.getKey(), startPosForSynchOp, userBits, + (value != null ? value.getLength() : 0), value.getBytesAsString(), dr.getId(), tag, + getOplogId()); + } + id.setOffsetInOplog(startPosForSynchOp); + addLive(dr, entry); + // Size of the current oplog being increased + // due to 'create' operation. Set the change in stats. + this.dirHolder.incrementTotalOplogSize(dataLength); + incTotalCount(); + + // Update the region version vector for the disk store. + // This needs to be done under lock so that we don't switch oplogs + // unit the version vector accurately represents what is in this oplog + RegionVersionVector rvv = dr.getRegionVersionVector(); + if (rvv != null && entry.getVersionStamp() != null) { + rvv.recordVersion(entry.getVersionStamp().getMemberID(), + entry.getVersionStamp().getRegionVersion()); + } + + EntryLogger.logPersistPut(dr.getName(), entry.getKey(), dr.getDiskStoreID()); + } + clearOpState(); + // } } - clearOpState(); - // } + } finally { + getParent().getBackupLock().unlock(); } if (useNextOplog) { if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { @@ -3700,10 +3713,15 @@ public class Oplog implements CompactableOplog, Flushable { */ void forceRolling(DiskRegion dr) { if (getOplogSet().getChild() == this) { - synchronized (this.lock) { - if (getOplogSet().getChild() == this) { - switchOpLog(dr, 0, null); + getParent().getBackupLock().lock(); + try { + synchronized (this.lock) { + if (getOplogSet().getChild() == this) { + switchOpLog(dr, 0, null); + } } + } finally { + getParent().getBackupLock().unlock(); } if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { CacheObserverHolder.getInstance().afterSwitchingOplog(); @@ -4129,6 +4147,8 @@ public class Oplog implements CompactableOplog, Flushable { lockCompactor(); try { + // No need to get the backup lock prior to synchronizing (correct lock order) since the + // synchronized block does not attempt to get the backup lock (incorrect lock order) synchronized (this.lock) { // 42733: after set it to true, we will not reset it, since this oplog // will be @@ -4579,96 +4599,101 @@ public class Oplog implements CompactableOplog, Flushable { System.out.println("basicModify KRF_DEBUG"); Thread.sleep(1000); } - synchronized (this.lock) { - // synchronized (this.crf) { - if (getOplogSet().getChild() != this) { - useNextOplog = true; - } else { - initOpState(OPLOG_MOD_ENTRY_1ID, dr, entry, value, userBits, false); 
- adjustment = getOpStateSize(); - assert adjustment > 0; - long temp = (this.crf.currSize + adjustment); - if (temp > getMaxCrfSize() && !isFirstRecord()) { - switchOpLog(dr, adjustment, entry); - // we can't reuse it since it contains variable length data + getParent().getBackupLock().lock(); + try { + synchronized (this.lock) { + // synchronized (this.crf) { + if (getOplogSet().getChild() != this) { useNextOplog = true; } else { - if (this.lockedForKRFcreate) { - CacheClosedException cce = new CacheClosedException("The disk store is closed."); - dr.getCancelCriterion().checkCancelInProgress(cce); - throw cce; - } - this.firstRecord = false; - long oldOplogId; - // do the io while holding lock so that switch can set doneAppending - // Write the data to the opLog for the synch mode - startPosForSynchOp = writeOpLogBytes(this.crf, async, true); - this.crf.currSize = temp; - startPosForSynchOp += getOpStateValueOffset(); - if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES)) { - VersionTag tag = null; - if (entry.getVersionStamp() != null) { - tag = entry.getVersionStamp().asVersionTag(); - } - logger.trace(LogMarker.PERSIST_WRITES, - "basicModify: id=<{}> key=<{}> valueOffset={} userBits={} valueLen={} valueBytes=<{}> drId={} versionStamp={} oplog#{}", - abs(id.getKeyId()), entry.getKey(), startPosForSynchOp, userBits, value.getLength(), - value.getBytesAsString(), dr.getId(), tag, getOplogId()); - } - if (EntryBits.isNeedsValue(userBits)) { - id.setValueLength(value.getLength()); + initOpState(OPLOG_MOD_ENTRY_1ID, dr, entry, value, userBits, false); + adjustment = getOpStateSize(); + assert adjustment > 0; + long temp = (this.crf.currSize + adjustment); + if (temp > getMaxCrfSize() && !isFirstRecord()) { + switchOpLog(dr, adjustment, entry); + // we can't reuse it since it contains variable length data + useNextOplog = true; } else { - id.setValueLength(0); - } - id.setUserBits(userBits); - if (logger.isTraceEnabled()) { - logger.trace("Oplog::basicModify:Released ByteBuffer with data for Disk ID = {}", id); - } - synchronized (id) { - // Need to do this while synced on id - // now that we compact forward to most recent oplog. - // @todo darrel: The sync logic in the disk code is so complex - // a really doubt is is correct. - // I think we need to do a fresh rewrite of it. 
- oldOplogId = id.setOplogId(getOplogId()); - if (EntryBits.isAnyInvalid(userBits) || EntryBits.isTombstone(userBits)) { - id.setOffsetInOplog(-1); + if (this.lockedForKRFcreate) { + CacheClosedException cce = new CacheClosedException("The disk store is closed."); + dr.getCancelCriterion().checkCancelInProgress(cce); + throw cce; + } + this.firstRecord = false; + long oldOplogId; + // do the io while holding lock so that switch can set doneAppending + // Write the data to the opLog for the synch mode + startPosForSynchOp = writeOpLogBytes(this.crf, async, true); + this.crf.currSize = temp; + startPosForSynchOp += getOpStateValueOffset(); + if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES)) { + VersionTag tag = null; + if (entry.getVersionStamp() != null) { + tag = entry.getVersionStamp().asVersionTag(); + } + logger.trace(LogMarker.PERSIST_WRITES, + "basicModify: id=<{}> key=<{}> valueOffset={} userBits={} valueLen={} valueBytes=<{}> drId={} versionStamp={} oplog#{}", + abs(id.getKeyId()), entry.getKey(), startPosForSynchOp, userBits, + value.getLength(), value.getBytesAsString(), dr.getId(), tag, getOplogId()); + } + if (EntryBits.isNeedsValue(userBits)) { + id.setValueLength(value.getLength()); } else { - id.setOffsetInOplog(startPosForSynchOp); + id.setValueLength(0); } - } - // Set the oplog size change for stats - this.dirHolder.incrementTotalOplogSize(adjustment); - this.incTotalCount(); - - EntryLogger.logPersistPut(dr.getName(), entry.getKey(), dr.getDiskStoreID()); - if (oldOplogId != getOplogId()) { - Oplog oldOplog = getOplogSet().getChild(oldOplogId); - if (oldOplog != null) { - oldOplog.rmLive(dr, entry); - emptyOplog = oldOplog; + id.setUserBits(userBits); + if (logger.isTraceEnabled()) { + logger.trace("Oplog::basicModify:Released ByteBuffer with data for Disk ID = {}", id); + } + synchronized (id) { + // Need to do this while synced on id + // now that we compact forward to most recent oplog. + // @todo darrel: The sync logic in the disk code is so complex + // a really doubt is is correct. + // I think we need to do a fresh rewrite of it. + oldOplogId = id.setOplogId(getOplogId()); + if (EntryBits.isAnyInvalid(userBits) || EntryBits.isTombstone(userBits)) { + id.setOffsetInOplog(-1); + } else { + id.setOffsetInOplog(startPosForSynchOp); + } + } + // Set the oplog size change for stats + this.dirHolder.incrementTotalOplogSize(adjustment); + this.incTotalCount(); + + EntryLogger.logPersistPut(dr.getName(), entry.getKey(), dr.getDiskStoreID()); + if (oldOplogId != getOplogId()) { + Oplog oldOplog = getOplogSet().getChild(oldOplogId); + if (oldOplog != null) { + oldOplog.rmLive(dr, entry); + emptyOplog = oldOplog; + } + addLive(dr, entry); + // Note if this mod was done to oldOplog then this entry is already + // in + // the linked list. All we needed to do in this case is call + // incTotalCount + } else { + getOrCreateDRI(dr).update(entry); } - addLive(dr, entry); - // Note if this mod was done to oldOplog then this entry is already - // in - // the linked list. All we needed to do in this case is call - // incTotalCount - } else { - getOrCreateDRI(dr).update(entry); - } - // Update the region version vector for the disk store. 
- // This needs to be done under lock so that we don't switch oplogs - // unit the version vector accurately represents what is in this oplog - RegionVersionVector rvv = dr.getRegionVersionVector(); - if (rvv != null && entry.getVersionStamp() != null) { - rvv.recordVersion(entry.getVersionStamp().getMemberID(), - entry.getVersionStamp().getRegionVersion()); + // Update the region version vector for the disk store. + // This needs to be done under lock so that we don't switch oplogs + // unit the version vector accurately represents what is in this oplog + RegionVersionVector rvv = dr.getRegionVersionVector(); + if (rvv != null && entry.getVersionStamp() != null) { + rvv.recordVersion(entry.getVersionStamp().getMemberID(), + entry.getVersionStamp().getRegionVersion()); + } } + clearOpState(); } - clearOpState(); + // } } - // } + } finally { + getParent().getBackupLock().unlock(); } if (useNextOplog) { if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { @@ -4702,43 +4727,48 @@ public class Oplog implements CompactableOplog, Flushable { throws IOException, InterruptedException { boolean useNextOplog = false; int adjustment = 0; - synchronized (this.lock) { - if (getOplogSet().getChild() != this) { - useNextOplog = true; - } else { - this.opState.initialize(OPLOG_CONFLICT_VERSION, dr.getId(), tag); - adjustment = getOpStateSize(); - assert adjustment > 0; - long temp = (this.crf.currSize + adjustment); - if (temp > getMaxCrfSize() && !isFirstRecord()) { - switchOpLog(dr, adjustment, null); - // we can't reuse it since it contains variable length data + getParent().getBackupLock().lock(); + try { + synchronized (this.lock) { + if (getOplogSet().getChild() != this) { useNextOplog = true; } else { - if (this.lockedForKRFcreate) { - CacheClosedException cce = new CacheClosedException("The disk store is closed."); - dr.getCancelCriterion().checkCancelInProgress(cce); - throw cce; - } - this.firstRecord = false; - writeOpLogBytes(this.crf, async, true); - this.crf.currSize = temp; - if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES)) { - logger.trace(LogMarker.PERSIST_WRITES, - "basicSaveConflictVersionTag: drId={} versionStamp={} oplog#{}", dr.getId(), tag, - getOplogId()); - } - this.dirHolder.incrementTotalOplogSize(adjustment); - // Update the region version vector for the disk store. 
- // This needs to be done under lock so that we don't switch oplogs - // unit the version vector accurately represents what is in this oplog - RegionVersionVector rvv = dr.getRegionVersionVector(); - if (rvv != null && dr.getFlags().contains(DiskRegionFlag.IS_WITH_VERSIONING)) { - rvv.recordVersion(tag.getMemberID(), tag.getRegionVersion()); + this.opState.initialize(OPLOG_CONFLICT_VERSION, dr.getId(), tag); + adjustment = getOpStateSize(); + assert adjustment > 0; + long temp = (this.crf.currSize + adjustment); + if (temp > getMaxCrfSize() && !isFirstRecord()) { + switchOpLog(dr, adjustment, null); + // we can't reuse it since it contains variable length data + useNextOplog = true; + } else { + if (this.lockedForKRFcreate) { + CacheClosedException cce = new CacheClosedException("The disk store is closed."); + dr.getCancelCriterion().checkCancelInProgress(cce); + throw cce; + } + this.firstRecord = false; + writeOpLogBytes(this.crf, async, true); + this.crf.currSize = temp; + if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES)) { + logger.trace(LogMarker.PERSIST_WRITES, + "basicSaveConflictVersionTag: drId={} versionStamp={} oplog#{}", dr.getId(), tag, + getOplogId()); + } + this.dirHolder.incrementTotalOplogSize(adjustment); + // Update the region version vector for the disk store. + // This needs to be done under lock so that we don't switch oplogs + // unit the version vector accurately represents what is in this oplog + RegionVersionVector rvv = dr.getRegionVersionVector(); + if (rvv != null && dr.getFlags().contains(DiskRegionFlag.IS_WITH_VERSIONING)) { + rvv.recordVersion(tag.getMemberID(), tag.getRegionVersion()); + } } + clearOpState(); } - clearOpState(); } + } finally { + getParent().getBackupLock().unlock(); } if (useNextOplog) { if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { @@ -4755,6 +4785,7 @@ public class Oplog implements CompactableOplog, Flushable { boolean useNextOplog = false; long startPosForSynchOp = -1L; int adjustment = 0; + // No need to get the backup lock since this is only for offline compaction synchronized (this.lock) { // synchronized (this.crf) { if (getOplogSet().getChild() != this) { @@ -4907,19 +4938,24 @@ public class Oplog implements CompactableOplog, Flushable { */ public void writeGCRVV(DiskRegion dr) { boolean useNextOplog = false; - synchronized (this.lock) { - if (getOplogSet().getChild() != this) { - useNextOplog = true; - } else { - try { - writeRVVRecord(this.drf, - Collections.<Long, AbstractDiskRegion>singletonMap(dr.getId(), dr), true); - } catch (IOException ex) { - dr.getCancelCriterion().checkCancelInProgress(ex); - throw new DiskAccessException(LocalizedStrings.Oplog_FAILED_RECORDING_RVV_BECAUSE_OF_0 - .toLocalizedString(this.diskFile.getPath()), ex, dr.getName()); + getParent().getBackupLock().lock(); + try { + synchronized (this.lock) { + if (getOplogSet().getChild() != this) { + useNextOplog = true; + } else { + try { + writeRVVRecord(this.drf, + Collections.<Long, AbstractDiskRegion>singletonMap(dr.getId(), dr), true); + } catch (IOException ex) { + dr.getCancelCriterion().checkCancelInProgress(ex); + throw new DiskAccessException(LocalizedStrings.Oplog_FAILED_RECORDING_RVV_BECAUSE_OF_0 + .toLocalizedString(this.diskFile.getPath()), ex, dr.getName()); + } } } + } finally { + getParent().getBackupLock().unlock(); } if (useNextOplog) { getOplogSet().getChild().writeGCRVV(dr); @@ -4936,36 +4972,41 @@ public class Oplog implements CompactableOplog, Flushable { */ public void writeRVV(DiskRegion dr, RegionVersionVector sourceRVV, 
       Boolean isRVVTrusted) {
     boolean useNextOplog = false;
-    synchronized (this.lock) {
-      if (getOplogSet().getChild() != this) {
-        useNextOplog = true;
-      } else {
+    getParent().getBackupLock().lock();
+    try {
+      synchronized (this.lock) {
+        if (getOplogSet().getChild() != this) {
+          useNextOplog = true;
+        } else {
-        try {
-          // We'll update the RVV of the disk region while holding the lock on
-          // the oplog,
-          // to make sure we don't switch oplogs while we're in the middle of
-          // this.
-          if (sourceRVV != null) {
-            dr.getRegionVersionVector().recordVersions(sourceRVV);
-          } else {
-            // it's original EndGII, not to write duplicate rvv if its trusted
-            if (dr.getRVVTrusted()) {
-              return;
+          try {
+            // We'll update the RVV of the disk region while holding the lock on
+            // the oplog,
+            // to make sure we don't switch oplogs while we're in the middle of
+            // this.
+            if (sourceRVV != null) {
+              dr.getRegionVersionVector().recordVersions(sourceRVV);
+            } else {
+              // it's original EndGII, not to write duplicate rvv if its trusted
+              if (dr.getRVVTrusted()) {
+                return;
+              }
             }
+            if (isRVVTrusted != null) {
+              // isRVVTrusted == null means "as is"
+              dr.setRVVTrusted(isRVVTrusted);
+            }
+            writeRVVRecord(this.crf,
+                Collections.<Long, AbstractDiskRegion>singletonMap(dr.getId(), dr), false);
+          } catch (IOException ex) {
+            dr.getCancelCriterion().checkCancelInProgress(ex);
+            throw new DiskAccessException(LocalizedStrings.Oplog_FAILED_RECORDING_RVV_BECAUSE_OF_0
+                .toLocalizedString(this.diskFile.getPath()), ex, dr.getName());
           }
-          if (isRVVTrusted != null) {
-            // isRVVTrusted == null means "as is"
-            dr.setRVVTrusted(isRVVTrusted);
-          }
-          writeRVVRecord(this.crf,
-              Collections.<Long, AbstractDiskRegion>singletonMap(dr.getId(), dr), false);
-        } catch (IOException ex) {
-          dr.getCancelCriterion().checkCancelInProgress(ex);
-          throw new DiskAccessException(LocalizedStrings.Oplog_FAILED_RECORDING_RVV_BECAUSE_OF_0
-              .toLocalizedString(this.diskFile.getPath()), ex, dr.getName());
         }
       }
+    } finally {
+      getParent().getBackupLock().unlock();
     }
     if (useNextOplog) {
       getOplogSet().getChild().writeRVV(dr, sourceRVV, isRVVTrusted);
@@ -5009,74 +5050,79 @@ public class Oplog implements CompactableOplog, Flushable {
         System.out.println("basicRemove KRF_DEBUG");
         Thread.sleep(1000);
       }
-    synchronized (this.lock) {
-      if (getOplogSet().getChild() != this) {
-        useNextOplog = true;
-      } else if ((this.drf.currSize + MAX_DELETE_ENTRY_RECORD_BYTES) > getMaxDrfSize()
-          && !isFirstRecord()) {
-        switchOpLog(dr, MAX_DELETE_ENTRY_RECORD_BYTES, entry);
-        useNextOplog = true;
-      } else {
-        if (this.lockedForKRFcreate) {
-          CacheClosedException cce = new CacheClosedException("The disk store is closed.");
-          dr.getCancelCriterion().checkCancelInProgress(cce);
-          throw cce;
-        }
-        long oldOplogId = id.setOplogId(getOplogId());
-        if (!isClear) {
-          this.firstRecord = false;
-          // Ok now we can go ahead and find out its actual size
-          // This is the only place to set notToUseUserBits=true
-          initOpState(OPLOG_DEL_ENTRY_1ID, dr, entry, null, (byte) 0, true);
-          int adjustment = getOpStateSize();
-
-          this.drf.currSize += adjustment;
-          // do the io while holding lock so that switch can set doneAppending
-          if (logger.isTraceEnabled()) {
-            logger.trace(
-                "Oplog::basicRemove: Recording the Deletion of entry in the Oplog with id = {} The Oplog Disk ID for the entry being deleted = {} Mode is Synch",
-                getOplogId(), id);
+    getParent().getBackupLock().lock();
+    try {
+      synchronized (this.lock) {
+        if (getOplogSet().getChild() != this) {
+          useNextOplog = true;
+        } else if ((this.drf.currSize + MAX_DELETE_ENTRY_RECORD_BYTES) > getMaxDrfSize()
+            && !isFirstRecord()) {
+          switchOpLog(dr, MAX_DELETE_ENTRY_RECORD_BYTES, entry);
+          useNextOplog = true;
+        } else {
+          if (this.lockedForKRFcreate) {
+            CacheClosedException cce = new CacheClosedException("The disk store is closed.");
+            dr.getCancelCriterion().checkCancelInProgress(cce);
+            throw cce;
           }
+          long oldOplogId = id.setOplogId(getOplogId());
+          if (!isClear) {
+            this.firstRecord = false;
+            // Ok now we can go ahead and find out its actual size
+            // This is the only place to set notToUseUserBits=true
+            initOpState(OPLOG_DEL_ENTRY_1ID, dr, entry, null, (byte) 0, true);
+            int adjustment = getOpStateSize();
+
+            this.drf.currSize += adjustment;
+            // do the io while holding lock so that switch can set doneAppending
+            if (logger.isTraceEnabled()) {
+              logger.trace(
+                  "Oplog::basicRemove: Recording the Deletion of entry in the Oplog with id = {} The Oplog Disk ID for the entry being deleted = {} Mode is Synch",
+                  getOplogId(), id);
             }
-          // Write the data to the opLog for the synch mode
-          // TODO: if we don't sync write destroys what will happen if
-          // we do 1. create k1 2. destroy k1 3. create k1?
-          // It would be possible for the crf to be flushed but not the drf.
-          // Then during recovery we will find identical keys with different
-          // entryIds.
-          // I think we can safely have drf writes be async as long as we flush
-          // the drf
-          // before we flush the crf.
-          // However we can't have removes by async if we are doing a sync write
-          // because we might be killed right after we do this write.
+            // Write the data to the opLog for the synch mode
+            // TODO: if we don't sync write destroys what will happen if
+            // we do 1. create k1 2. destroy k1 3. create k1?
+            // It would be possible for the crf to be flushed but not the drf.
+            // Then during recovery we will find identical keys with different
+            // entryIds.
+            // I think we can safely have drf writes be async as long as we flush
+            // the drf
+            // before we flush the crf.
+            // However we can't have removes by async if we are doing a sync write
+            // because we might be killed right after we do this write.
+            startPosForSynchOp = writeOpLogBytes(this.drf, async, true);
+            setHasDeletes(true);
+            if (logger.isDebugEnabled(LogMarker.PERSIST_WRITES)) {
+              logger.debug("basicRemove: id=<{}> key=<{}> drId={} oplog#{}", abs(id.getKeyId()),
+                  entry.getKey(), dr.getId(), getOplogId());
+            }
-          if (logger.isTraceEnabled()) {
-            logger.trace("Oplog::basicRemove:Released ByteBuffer for Disk ID = {}", id);
+            if (logger.isTraceEnabled()) {
+              logger.trace("Oplog::basicRemove:Released ByteBuffer for Disk ID = {}", id);
+            }
+            this.dirHolder.incrementTotalOplogSize(adjustment);
           }
-          this.dirHolder.incrementTotalOplogSize(adjustment);
-        }
-        // Set the oplog size change for stats
-        id.setOffsetInOplog(-1);
+          // Set the oplog size change for stats
+          id.setOffsetInOplog(-1);

-        EntryLogger.logPersistDestroy(dr.getName(), entry.getKey(), dr.getDiskStoreID());
-        Oplog rmOplog = null;
-        if (oldOplogId == getOplogId()) {
-          rmOplog = this;
-        } else {
-          rmOplog = getOplogSet().getChild(oldOplogId);
-        }
-        if (rmOplog != null) {
-          rmOplog.rmLive(dr, entry);
-          emptyOplog = rmOplog;
+          EntryLogger.logPersistDestroy(dr.getName(), entry.getKey(), dr.getDiskStoreID());
+          Oplog rmOplog = null;
+          if (oldOplogId == getOplogId()) {
+            rmOplog = this;
+          } else {
+            rmOplog = getOplogSet().getChild(oldOplogId);
+          }
+          if (rmOplog != null) {
+            rmOplog.rmLive(dr, entry);
+            emptyOplog = rmOplog;
+          }
+          clearOpState();
         }
-        clearOpState();
       }
+    } finally {
+      getParent().getBackupLock().unlock();
     }
     if (useNextOplog) {
       if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
@@ -5130,6 +5176,8 @@ public class Oplog implements CompactableOplog, Flushable {

   private void flush(OplogFile olf, boolean doSync) throws IOException {
     try {
+      // No need to get the backup lock prior to synchronizing (correct lock order) since the
+      // synchronized block does not attempt to get the backup lock (incorrect lock order)
       synchronized (this.lock/* olf */) {
         if (olf.RAFClosed) {
           return;
@@ -5187,6 +5235,8 @@ public class Oplog implements CompactableOplog, Flushable {

   private void flush(OplogFile olf, ByteBuffer b1, ByteBuffer b2) throws IOException {
     try {
+      // No need to get the backup lock prior to synchronizing (correct lock order) since the
+      // synchronized block does not attempt to get the backup lock (incorrect lock order)
       synchronized (this.lock/* olf */) {
         if (olf.RAFClosed) {
           return;
@@ -5255,34 +5305,40 @@ public class Oplog implements CompactableOplog, Flushable {
   private long writeOpLogBytes(OplogFile olf, boolean async, boolean doFlushIfSync)
       throws IOException {
     long startPos = -1L;
-    synchronized (this.lock/* olf */) {
-      Assert.assertTrue(!this.doneAppending);
-      if (this.closed) {
-        Assert.assertTrue(false, "The Oplog " + this.oplogId + " for store " + getParent().getName()
-            + " has been closed for synch mode while writing is going on. This should not happen");
-      }
-      // It is assumed that the file pointer is already at the
-      // appropriate position in the file so as to allow writing at the end.
-      // Any fault in operations will set the pointer back to the write
-      // location.
-      // Also it is only in case of synch writing, we are writing more
-      // than what is actually needed, we will have to reset the pointer.
-      // Also need to add in offset in writeBuf in case we are not flushing
-      // writeBuf
-      startPos = olf.channel.position() + olf.writeBuf.position();
-      // Assert.assertTrue(startPos > lastWritePos,
-      // "startPos=" + startPos +
-      // " was not > lastWritePos=" + lastWritePos);
-      long bytesWritten = this.opState.write(olf);
-      if (!async && doFlushIfSync) {
-        flushAndSync(olf);
-      }
-      getStats().incWrittenBytes(bytesWritten, async);
-
-      // // Moved the set of lastWritePos to after write
-      // // so if write throws an exception it will not be updated.
-      // // This fixes bug 40449.
-      // this.lastWritePos = startPos;
+    getParent().getBackupLock().lock();
+    try {
+      synchronized (this.lock/* olf */) {
+        Assert.assertTrue(!this.doneAppending);
+        if (this.closed) {
+          Assert.assertTrue(false, "The Oplog " + this.oplogId + " for store "
+              + getParent().getName()
+              + " has been closed for synch mode while writing is going on. This should not happen");
+        }
+        // It is assumed that the file pointer is already at the
+        // appropriate position in the file so as to allow writing at the end.
+        // Any fault in operations will set the pointer back to the write
+        // location.
+        // Also it is only in case of synch writing, we are writing more
+        // than what is actually needed, we will have to reset the pointer.
+        // Also need to add in offset in writeBuf in case we are not flushing
+        // writeBuf
+        startPos = olf.channel.position() + olf.writeBuf.position();
+        // Assert.assertTrue(startPos > lastWritePos,
+        // "startPos=" + startPos +
+        // " was not > lastWritePos=" + lastWritePos);
+        long bytesWritten = this.opState.write(olf);
+        if (!async && doFlushIfSync) {
+          flushAndSync(olf);
+        }
+        getStats().incWrittenBytes(bytesWritten, async);
+
+        // // Moved the set of lastWritePos to after write
+        // // so if write throws an exception it will not be updated.
+        // // This fixes bug 40449.
+        // this.lastWritePos = startPos;
+      }
+    } finally {
+      getParent().getBackupLock().unlock();
     }
     return startPos;
   }
@@ -5296,6 +5352,8 @@ public class Oplog implements CompactableOplog, Flushable {
   boolean closeRAF() {
     if (this.beingRead)
       return false;
+    // No need to get the backup lock prior to synchronizing (correct lock order) since the
+    // synchronized block does not attempt to get the backup lock (incorrect lock order)
     synchronized (this.lock/* crf */) {
       if (this.beingRead)
         return false;
@@ -5325,6 +5383,8 @@
    * @return true if oplog file is open and can be read from; false if not
    */
   private boolean reopenFileIfClosed() throws IOException {
+    // No need to get the backup lock prior to synchronizing (correct lock order) since the
+    // synchronized block does not attempt to get the backup lock (incorrect lock order)
     synchronized (this.lock/* crf */) {
       boolean result = !this.crf.RAFClosed;
       if (!result && this.okToReopen) {
@@ -5343,6 +5403,8 @@
     boolean didReopen = false;
     boolean accessedInactive = false;
     try {
+      // No need to get the backup lock prior to synchronizing (correct lock order) since the
+      // synchronized block does not attempt to get the backup lock (incorrect lock order)
      synchronized (this.lock/* crf */) {
        // if (this.closed || this.deleted.get()) {
        // throw new DiskAccessException("attempting get on "
@@ -5523,6 +5585,8 @@
       }
     } else {
       try {
+        // No need to get the backup lock prior to synchronizing (correct lock order) since the
+        // synchronized block does not attempt to get the backup lock (incorrect lock order)
         synchronized (this.lock/* crf */) {
           if (/*
                * !getParent().isSync() since compactor groups writes &&
@@ -5708,6 +5772,8 @@
   }

   private void deleteFile(final OplogFile olf) {
+    // No need to get the backup lock prior to synchronizing (correct lock order) since the
+    // synchronized block does not attempt to get the backup lock (incorrect lock order)
     synchronized (this.lock) {
       if (olf.currSize != 0) {
         this.dirHolder.decrementTotalOplogSize(olf.currSize);
@@ -5799,6 +5865,8 @@
   }

   private void finishedAppending() {
+    // No need to get the backup lock prior to synchronizing (correct lock order) since the
+    // synchronized block does not attempt to get the backup lock (incorrect lock order)
     synchronized (this.lock/* crf */) {
       this.doneAppending = true;
     }
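
[Editor's note] Every hunk above applies the same pattern: paths that append to an oplog (writeRVV, basicRemove, writeOpLogBytes) now take the disk store's backup lock before entering the oplog's own monitor and release it in a finally block, while monitor-only paths (flush, closeRAF, reopenFileIfClosed, deleteFile, finishedAppending) note why they may skip the backup lock: they never try to acquire it while holding the monitor, so the lock order stays consistent. The sketch below models that ordering in isolation. It is an illustration only, assuming a simplified lock; every name in it (OplogWriterSketch, SimpleBackupLock, oplogMonitor, doAppend) is a hypothetical stand-in, not a Geode API.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of the lock ordering used in the hunks above: writers
// acquire the backup lock BEFORE their own monitor, never the reverse, so a
// write can never straddle the point at which a backup rolls the oplogs.
public class OplogWriterSketch {

  // Simplified stand-in for the backup lock: writers block while a backup
  // is in progress.
  static class SimpleBackupLock extends ReentrantLock {
    private boolean backingUp; // guarded by this lock
    private final Condition backupDone = newCondition();

    void lockForBackup() { // backup thread: raise the flag, then let go
      lock();
      try {
        backingUp = true;
      } finally {
        unlock();
      }
    }

    void unlockForBackup() { // backup thread: oplogs rolled, release writers
      lock();
      try {
        backingUp = false;
        backupDone.signalAll();
      } finally {
        unlock();
      }
    }

    void lockForWrite() { // writer threads: wait out any in-progress backup
      lock();
      while (backingUp) {
        backupDone.awaitUninterruptibly();
      }
      // returns holding the lock; the caller unlocks in a finally block
    }
  }

  private final SimpleBackupLock backupLock = new SimpleBackupLock();
  private final Object oplogMonitor = new Object(); // stand-in for Oplog.lock

  // Mirrors writeRVV/basicRemove/writeOpLogBytes: backup lock first, monitor
  // second, unlock in finally so an exception cannot leak the lock.
  void writeRecord(byte[] record) {
    backupLock.lockForWrite();
    try {
      synchronized (oplogMonitor) {
        doAppend(record); // the disk write happens under both locks
      }
    } finally {
      backupLock.unlock();
    }
  }

  // Mirrors flush/closeRAF/deleteFile/finishedAppending: these take only the
  // monitor and never attempt the backup lock inside it, which keeps the
  // acquisition order consistent and deadlock-free.
  void flush() {
    synchronized (oplogMonitor) {
      // flush buffered bytes; no backup lock needed because we never try to
      // acquire it while holding the monitor
    }
  }

  private void doAppend(byte[] record) {
    // placeholder for the actual oplog append
  }
}

With this ordering, a backup sees every write either fully on disk before the oplogs roll or parked on the condition until they have rolled, which is what gives the single-point-in-time backup described in the commit message.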