This is an automated email from the ASF dual-hosted git repository.

mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 143ffb48ea GEODE-9854: Orphaned .drf file causing memory leak (#7234)
143ffb48ea is described below

commit 143ffb48ea2a28b19e4d184a68cb6484c9e0feb8
Author: Jakov Varenina <62134331+jvaren...@users.noreply.github.com>
AuthorDate: Mon Apr 4 08:51:38 2022 +0200

    GEODE-9854: Orphaned .drf file causing memory leak (#7234)
    
    * GEODE-9854: Orphaned .drf file causing memory leak
    
    Issue:
    When Oplog files are compacted, the .drf file is left behind because it
    contains deletes of entries in previous .crfs. The .crf file is deleted,
    but the orphaned .drf is not deleted until all previous .crf files (.crfs
    with smaller ids) are deleted.
    
    The problem is that the compacted Oplog object representing the orphaned
    .drf file holds a structure in memory (Oplog.regionMap) that contains
    information that is no longer useful after the compaction, yet it still
    takes up a certain amount of memory. In addition, there is a special case
    when creating .krf files that, depending on the execution order, can make
    the problem more severe: it can leave the pendingKrfTags structure on the
    regionMap, and this can take up a significant amount of memory. This
    pendingKrfTags HashMap is actually empty, but it still consumes memory
    because it was populated previously and the capacity of the HashMap is not
    reduced when it is cleared. This special case usually happens when a new
    Oplog is rolled out and the previous Oplog is immediately marked as
    eligible for compaction. Compaction and .krf creation then start at
    roughly the same time, and the compactor cancels creation of the .krf
    file. The pendingKrfTags structure is normally cleared when the .krf file
    is created, but since compaction canceled creation of the .krf, the
    pendingKrfTags structure remains in memory until the Oplog representing
    the orphaned .drf file is deleted.
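    
    For illustration only (not part of this change), a minimal, hypothetical
    sketch of why an "empty" pendingKrfTags map can still pin memory:
    java.util.HashMap.clear() removes the entries but keeps the enlarged
    backing table, so the memory is only released once the map itself is
    dropped or replaced.
    
        import java.util.HashMap;
        import java.util.Map;
    
        public class ClearedHashMapFootprint {
          public static void main(String[] args) {
            Map<Long, byte[]> pendingKrfTags = new HashMap<>();
            for (long i = 0; i < 1_000_000; i++) {
              pendingKrfTags.put(i, new byte[8]); // grows the backing table
            }
    
            pendingKrfTags.clear(); // entries removed, table capacity retained
            // size() is now 0, yet the large bucket array is still referenced,
            // so its memory cannot be reclaimed while the map stays reachable.
            System.out.println("size after clear = " + pendingKrfTags.size());
    
            pendingKrfTags = new HashMap<>(); // replacing the map frees the old table
          }
        }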
    
    Solution:
    Clear the regionMap data structure of the Oplog when it is compacted
    (currently it is only deleted when the Oplog is destroyed).
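    
    For reference, a simplified sketch of the pattern used in Oplog.java below
    (names shortened here; the real implementation is in the diff): the
    regionMap is held behind an AtomicReference so it can be released as soon
    as compaction no longer needs it, while any late caller still receives an
    empty map instead of a NullPointerException.
    
        import java.util.concurrent.ConcurrentHashMap;
        import java.util.concurrent.ConcurrentMap;
        import java.util.concurrent.atomic.AtomicReference;
    
        class RegionMapHolder<K, V> {
          private final AtomicReference<ConcurrentMap<K, V>> ref =
              new AtomicReference<>(new ConcurrentHashMap<>());
    
          // Called once the oplog has been compacted and the data is no longer needed.
          void close() {
            ref.set(null);
          }
    
          // Always returns a usable map; after close() it is a fresh empty one.
          ConcurrentMap<K, V> get() {
            ConcurrentMap<K, V> m = ref.get();
            return m != null ? m : new ConcurrentHashMap<>();
          }
        }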
    
    Co-authored-by: Alberto Gomez <alberto.go...@est.tech>
---
 ...ObjectThatAreNoLongerNeededIntegrationTest.java | 449 +++++++++++++++++++++
 .../org/apache/geode/internal/cache/Oplog.java     |  69 +++-
 2 files changed, 498 insertions(+), 20 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskRegionCompactorClearsObjectThatAreNoLongerNeededIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskRegionCompactorClearsObjectThatAreNoLongerNeededIntegrationTest.java
new file mode 100644
index 0000000000..ed1e06687c
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskRegionCompactorClearsObjectThatAreNoLongerNeededIntegrationTest.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Properties;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+
+/**
+ * Verifies that unnecessary memory is cleared when the operational log is compacted.
+ */
+public class DiskRegionCompactorClearsObjectThatAreNoLongerNeededIntegrationTest {
+
+  private final Properties config = new Properties();
+  private Cache cache;
+
+  private File[] diskDirs;
+  private int[] diskDirSizes;
+
+  private String regionName;
+  private String diskStoreName;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private static final int ENTRY_RANGE_0_299 = 300;
+  private static final int ENTRY_RANGE_300_599 = 600;
+
+  @Before
+  public void setUp() throws Exception {
+    String uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+    regionName = uniqueName + "_region";
+    diskStoreName = uniqueName + "_diskStore";
+
+    cache = new CacheFactory(config).create();
+
+    diskDirs = new File[1];
+    diskDirs[0] = createDirectory(temporaryFolder.getRoot(), testName.getMethodName());
+    diskDirSizes = new int[1];
+    Arrays.fill(diskDirSizes, Integer.MAX_VALUE);
+
+    DiskStoreImpl.SET_IGNORE_PREALLOCATE = true;
+    TombstoneService.EXPIRED_TOMBSTONE_LIMIT = 1;
+    TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = 1;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      cache.close();
+    } finally {
+      DiskStoreImpl.SET_IGNORE_PREALLOCATE = false;
+    }
+  }
+
+  /**
+   * Verifies that unnecessary memory is cleared when the operational log (.crf and .drf) is
+   * compacted.
+   * This test case covers the following scenario:
+   *
+   * 1. Create several Oplog files (.crf, .drf and .krf) by executing put operations.
+   * 2. Execute a destroy operation for every fifth entry, each time adding a new entry. This
+   * results in a few additional Oplog files. The compaction threshold will not be reached.
+   * 3. Destroy all entries created in step 2. This triggers compaction of the files that
+   * were created in step 2. Compaction will delete only the .crf and .krf files, but will not
+   * delete the .drf files because they contain destroy operations for events located in
+   * .crf files created in step 1.
+   * 4. Check that unnecessary objects are cleared for the Oplog that represents the orphaned
+   * .drf file (no accompanying .crf or .krf file).
+   **/
+  @Test
+  public void testCompactorRegionMapDeletedForOnlyDrfOplogAfterCompactionIsDone()
+      throws InterruptedException {
+
+    createDiskStore(30, 10000);
+    Region<Object, Object> region = createRegion();
+    DiskStoreImpl diskStore = ((InternalRegion) region).getDiskStore();
+
+    // Create several oplog files (.crf and .drf) by executing put operations in the defined range
+    executePutsInRange0_299(region);
+    await().untilAsserted(() -> assertThat(getCurrentNumberOfOplogs(diskStore)).isEqualTo(5));
+
+    // Destroy every fifth entry from the previous range and each time put a new entry in the new
+    // range. This creates additional oplog files (.crf and .drf), but compaction will not be
+    // triggered as the threshold will not be reached. Oplog files (.drf) created in this step
+    // will contain destroys for events that are located in .crf files from the previous range.
+    destroyEveryFifthElementInRange0_299AndEachTimePutInRange300_599(region);
+    await().untilAsserted(() -> assertThat(getCurrentNumberOfOplogs(diskStore)).isEqualTo(7));
+
+    // Destroy all events created in the previous step in order to trigger automatic compaction.
+    // This will trigger compaction for the files that were created in the previous step.
+    // Compaction will delete the .crf and .krf files, but will leave the .drf file because it
+    // contains destroy operations for events that are located in some older .crf files.
+    destroyEveryFifthElementInRange300_599(region);
+
+    // wait for all Oplogs to be compacted
+    await().untilAsserted(() -> assertThat(isOplogToBeCompactedAvailable(diskStore)).isFalse());
+
+    await().untilAsserted(
+        () -> assertThat(areAllUnnecessaryObjectClearedForOnlyDrfOplog(diskStore)).isTrue());
+  }
+
+  /**
+   * Verifies that unnecessary memory is cleared when the operational log (.crf and .drf) is
+   * compacted. This test case covers the following scenario:
+   *
+   * 1. Create several Oplog files (.crf, .drf and .krf) by executing put operations.
+   * 2. Execute a destroy operation for every fifth entry from the previous step, each time also
+   * adding and destroying a new entry. This results in Oplogs that have 0 live entries at the
+   * moment they are rolled out. These Oplogs will be closed early since compaction does not have
+   * to be done for them. When an Oplog is closed early, the .crf files will be deleted, but the
+   * .drf files will not because they contain deletes.
+   * 3. Check that unnecessary objects are cleared for the Oplog that represents the orphaned
+   * .drf file (no accompanying .crf or .krf file).
+   **/
+  @Test
+  public void testCompactorRegionMapDeletedForOnlyDrfOplogAfterCompactionNotDoneDueToLiveCountZero()
+      throws InterruptedException {
+
+    createDiskStore(5, 10000);
+    Region<Object, Object> region = createRegion();
+    DiskStoreImpl diskStore = ((InternalRegion) region).getDiskStore();
+
+    // Create several oplog files (.crf and .drf) by executing put operations in the defined range
+    executePutsInRange0_299(region);
+    await().untilAsserted(() -> assertThat(getCurrentNumberOfOplogs(diskStore)).isEqualTo(5));
+
+    // Do multiple put and destroy operations to hit the scenario in which there are 0 live
+    // entries in the Oplog just before compaction is triggered.
+    destroyEveryFifthElementInRange0_299PutAndDestroyInRange300_599(region);
+
+    await().untilAsserted(() -> assertThat(isOplogToBeCompactedAvailable(diskStore)).isFalse());
+
+    await().untilAsserted(
+        () -> assertThat(areAllUnnecessaryObjectClearedForOnlyDrfOplog(diskStore)).isTrue());
+  }
+
+  /**
+   * Verifies that unnecessary memory is cleared when the operational log (.crf and .drf) is
+   * compacted. This is a special scenario where creation of the .krf file is cancelled by an
+   * ongoing compaction. This usually happens when a new oplog is rolled out and the previous
+   * oplog is immediately marked as eligible for compaction. Compaction and .krf creation then
+   * start at roughly the same time, and the compactor cancels creation of the .krf if it is
+   * executed first.
+   *
+   * This test case covers the following scenario:
+   *
+   * 1. Create several Oplog files (.crf, .drf and .krf) by executing put operations.
+   * 2. Execute a destroy operation for every fifth entry, each time adding a new entry. When it
+   * is time for the oplog to roll out, the previous oplog will be immediately marked as ready
+   * for compaction because the compaction threshold is set to a high value in this case. This
+   * way we force compaction to cancel creation of the .krf file. Compaction will delete only the
+   * .crf file (the .krf was not created at all), but will not delete the .drf files because they
+   * contain destroy operations for events located in .crf files created in step 1.
+   * 3. Check that unnecessary objects are cleared for the Oplog that represents the orphaned
+   * .drf file (no accompanying .crf or .krf file).
+   **/
+  @Test
+  public void testCompactorRegionMapDeletedAfterCompactionForOnlyDrfOplogAndKrfCreationCanceledByCompactionIsDone() {
+
+    createDiskStore(70, 10000);
+    Region<Object, Object> region = createRegion();
+    DiskStoreImpl diskStore = ((InternalRegion) region).getDiskStore();
+
+    // Create several oplog files (.crf and .drf) by executing put operations in the defined range
+    executePutsInRange0_299(region);
+    await().untilAsserted(() -> assertThat(getCurrentNumberOfOplogs(diskStore)).isEqualTo(5));
+
+    destroyEveryFifthElementInRange0_299AndEachTimePutInRange300_599(region);
+    await().untilAsserted(() -> assertThat(isOplogToBeCompactedAvailable(diskStore)).isFalse());
+
+    await().untilAsserted(
+        () -> assertThat(areAllUnnecessaryObjectClearedForOnlyDrfOplog(diskStore)).isTrue());
+  }
+
+  /**
+   * Verifies that the region is recovered from Oplogs (including .drf-only oplogs) when the
+   * region is closed and then recreated in order to trigger recovery.
+   * This test case covers the following scenario:
+   *
+   * 1. Create several Oplog files (.crf, .drf and .krf) by executing put operations.
+   * 2. Execute a destroy operation for every fifth entry, each time adding a new entry. This
+   * results in a few additional Oplog files. The compaction threshold will not be reached.
+   * 3. Destroy all entries created in step 2. This triggers compaction of the files that
+   * were created in step 2. Compaction will delete only the .crf and .krf files, but will not
+   * delete the .drf files because they contain destroy operations for events located in
+   * .crf files created in step 1. Check that unnecessary objects are cleared for the
+   * Oplog that represents the orphaned .drf file (no accompanying .crf or .krf file).
+   * 4. At this step there will be Oplog objects from step 1 that contain all files (.crf,
+   * .drf and .krf), and Oplog objects from step 2 that contain only .drf files.
+   * In order to test recovery, close the region and then recreate it.
+   * 5. Check that the region is recovered correctly. Check that only events that have never
+   * been destroyed are recovered from the Oplog files.
+   * 6. Check that unnecessary objects are cleared for the Oplog that represents the orphaned
+   * .drf file (no accompanying .crf or .krf file).
+   **/
+  @Test
+  public void testCompactorRegionMapDeletedForOnlyDrfOplogAfterCompactionAndRecoveryAfterRegionClose()
+      throws InterruptedException {
+
+    createDiskStore(30, 10000);
+    Region<Object, Object> region = createRegion();
+    DiskStoreImpl diskStore = ((InternalRegion) region).getDiskStore();
+
+    // Create several oplog files (.crf and .drf) by executing put operations in the defined range
+    executePutsInRange0_299(region);
+    await().untilAsserted(() -> assertThat(getCurrentNumberOfOplogs(diskStore)).isEqualTo(5));
+
+    // Destroy every fifth entry from the previous range and each time put a new entry in the new
+    // range. This creates additional oplog files (.crf and .drf), but compaction will not be
+    // triggered as the threshold will not be reached. Oplog files (.drf) created in this step
+    // will contain destroys for events that are located in .crf files from the previous range.
+    destroyEveryFifthElementInRange0_299AndEachTimePutInRange300_599(region);
+    await().untilAsserted(() -> assertThat(getCurrentNumberOfOplogs(diskStore)).isEqualTo(7));
+
+    // Destroy all events created in the previous step in order to trigger automatic compaction.
+    // This will trigger compaction for the files that were created in the previous step.
+    // Compaction will delete the .crf and .krf files, but will leave the .drf file because it
+    // contains destroy operations for events that are located in some older .crf files.
+    destroyEveryFifthElementInRange300_599(region);
+
+    // wait for all Oplogs to be compacted
+    await().untilAsserted(() -> assertThat(isOplogToBeCompactedAvailable(diskStore)).isFalse());
+
+    // close the region and then recreate it to trigger recovery from oplog files
+    region.close();
+    region = createRegion();
+
+    // every fifth element in range [0, ENTRY_RANGE_0_299) is destroyed, so it is expected to
+    // have ENTRY_RANGE_0_299 - (ENTRY_RANGE_0_299 / 5) elements; also, just in case, check that
+    // every fifth get operation returns null
+    checkThatAllRegionDataIsRecoveredFromOplogFiles(region);
+
+    // check that unnecessary data is cleared from Oplogs
+    await().untilAsserted(
+        () -> assertThat(areAllUnnecessaryObjectClearedForOnlyDrfOplog(diskStore)).isTrue());
+  }
+
+  /**
+   * Verifies that the region is recovered from Oplogs (including .drf-only oplogs) when the
+   * cache is closed and then recreated in order to trigger recovery.
+   * This test case covers the following scenario:
+   *
+   * 1. Create several Oplog files (.crf, .drf and .krf) by executing put operations.
+   * 2. Execute a destroy operation for every fifth entry, each time adding a new entry. This
+   * results in a few additional Oplog files. The compaction threshold will not be reached.
+   * 3. Destroy all entries created in step 2. This triggers compaction of the files that
+   * were created in step 2. Compaction will delete only the .crf and .krf files, but will not
+   * delete the .drf files because they contain destroy operations for events located in
+   * .crf files created in step 1.
+   * 4. At this step there will be Oplog objects from step 1 that contain all files (.crf,
+   * .drf and .krf), and Oplog objects from step 2 that contain only .drf files.
+   * In order to test recovery, close the cache and then recreate it.
+   * 5. Check that the region is recovered correctly. Check that only events that have never
+   * been destroyed are recovered from the Oplog files.
+   * 6. Check that unnecessary objects are cleared for the Oplog that represents the orphaned
+   * .drf file (no accompanying .crf or .krf file).
+   **/
+  @Test
+  public void testCompactorRegionMapDeletedForOnlyDrfOplogAfterCompactionAndRecoveryAfterCacheClosed()
+      throws InterruptedException {
+
+    createDiskStore(30, 10000);
+    Region<Object, Object> region = createRegion();
+    DiskStoreImpl diskStore = ((InternalRegion) region).getDiskStore();
+
+    // Create several oplog files (.crf and .drf) by executing put operations in the defined range
+    executePutsInRange0_299(region);
+    await().untilAsserted(() -> assertThat(getCurrentNumberOfOplogs(diskStore)).isEqualTo(5));
+
+    // Destroy every fifth entry from the previous range and each time put a new entry in the new
+    // range. This creates additional oplog files (.crf and .drf), but compaction will not be
+    // triggered as the threshold will not be reached. Oplog files (.drf) created in this step
+    // will contain destroys for events that are located in .crf files from the previous range.
+    destroyEveryFifthElementInRange0_299AndEachTimePutInRange300_599(region);
+    await().untilAsserted(() -> assertThat(getCurrentNumberOfOplogs(diskStore)).isEqualTo(7));
+
+    // Destroy all events created in the previous step in order to trigger automatic compaction.
+    // This will trigger compaction for the files that were created in the previous step.
+    // Compaction will delete the .crf and .krf files, but will leave the .drf file because it
+    // contains destroy operations for events that are located in some older .crf files.
+    destroyEveryFifthElementInRange300_599(region);
+
+    // wait for all Oplogs to be compacted
+    await().untilAsserted(() -> assertThat(isOplogToBeCompactedAvailable(diskStore)).isFalse());
+
+    // close the cache and then recreate it to trigger recovery from oplog files
+    cache.close();
+    cache = new CacheFactory(config).create();
+    createDiskStore(30, 10000);
+    region = createRegion();
+
+    // every fifth element in range [0, ENTRY_RANGE_0_299) is destroyed, so it is expected to
+    // have ENTRY_RANGE_0_299 - (ENTRY_RANGE_0_299 / 5) elements; also, just in case, check that
+    // every fifth get operation returns null
+    checkThatAllRegionDataIsRecoveredFromOplogFiles(region);
+
+    await().untilAsserted(
+        () -> assertThat(areAllUnnecessaryObjectClearedForOnlyDrfOplog(diskStore)).isTrue());
+  }
+
+  void checkThatAllRegionDataIsRecoveredFromOplogFiles(Region<Object, Object> region) {
+    int objectCount = 0;
+    for (int i = 0; i < ENTRY_RANGE_0_299; i++) {
+      Object object = region.get(i);
+      if (i % 5 == 0) {
+        assertThat(object).isNull();
+      }
+      if (object != null) {
+        objectCount++;
+      }
+    }
+    assertThat(objectCount).isEqualTo(ENTRY_RANGE_0_299 - (ENTRY_RANGE_0_299 / 5));
+  }
+
+  void executePutsInRange0_299(Region<Object, Object> region) {
+    for (int i = 0; i < ENTRY_RANGE_0_299; i++) {
+      region.put(i, new byte[100]);
+    }
+  }
+
+  void destroyEveryFifthElementInRange0_299PutAndDestroyInRange300_599(
+      Region<Object, Object> region)
+      throws InterruptedException {
+    TombstoneService tombstoneService = ((InternalCache) cache).getTombstoneService();
+    for (int i = ENTRY_RANGE_0_299; i < ENTRY_RANGE_300_599; i++) {
+      region.put(i, new byte[300]);
+      region.destroy(i);
+      assertThat(tombstoneService.forceBatchExpirationForTests(1)).isTrue();
+      if (i % 5 == 0) {
+        region.destroy(i - ENTRY_RANGE_0_299);
+        assertThat(tombstoneService.forceBatchExpirationForTests(1)).isTrue();
+      }
+    }
+  }
+
+  void destroyEveryFifthElementInRange300_599(Region<Object, Object> region)
+      throws InterruptedException {
+    TombstoneService tombstoneService = ((InternalCache) cache).getTombstoneService();
+    int key = ENTRY_RANGE_0_299;
+    while (key < ENTRY_RANGE_300_599) {
+      region.destroy(key);
+      assertThat(tombstoneService.forceBatchExpirationForTests(1)).isTrue();
+      key = key + 5;
+    }
+  }
+
+  void destroyEveryFifthElementInRange0_299AndEachTimePutInRange300_599(
+      Region<Object, Object> region) {
+    TombstoneService tombstoneService = ((InternalCache) cache).getTombstoneService();
+    int key = 0;
+    while (key < ENTRY_RANGE_0_299) {
+      region.destroy(key);
+      // It is necessary to force tombstone expiration, otherwise the event won't be stored in
+      // the .drf file and the total live count won't be decreased
+      await().untilAsserted(
+          () -> assertThat(tombstoneService.forceBatchExpirationForTests(1)).isTrue());
+      region.put(key + ENTRY_RANGE_0_299, new byte[300]);
+      key = key + 5;
+    }
+  }
+
+  boolean areAllUnnecessaryObjectClearedForOnlyDrfOplog(DiskStoreImpl diskStore) {
+    boolean isClear = true;
+    for (Oplog oplog : diskStore.getAllOplogsForBackup()) {
+      if (oplog.getHasDeletes() && oplog.isDeleted() && oplog.hasNoLiveValues()) {
+        if (oplog.getRegionMapSize() != 0) {
+          isClear = false;
+        }
+      }
+    }
+    return isClear;
+  }
+
+  void createDiskStore(int compactionThreshold, int maxOplogSizeInBytes) {
+    DiskStoreFactoryImpl diskStoreFactory = (DiskStoreFactoryImpl) cache.createDiskStoreFactory();
+    diskStoreFactory.setAutoCompact(true);
+    diskStoreFactory.setCompactionThreshold(compactionThreshold);
+    diskStoreFactory.setDiskDirsAndSizes(diskDirs, diskDirSizes);
+
+    createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, maxOplogSizeInBytes);
+  }
+
+  Region<Object, Object> createRegion() {
+    RegionFactory<Object, Object> regionFactory =
+        cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT);
+    regionFactory.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+    regionFactory.setDiskStoreName(diskStoreName);
+    regionFactory.setDiskSynchronous(true);
+    return regionFactory.create(regionName);
+  }
+
+  boolean isOplogToBeCompactedAvailable(DiskStoreImpl ds) {
+    if (ds.getOplogToBeCompacted() == null) {
+      return false;
+    }
+    return ds.getOplogToBeCompacted().length > 0;
+  }
+
+  int getCurrentNumberOfOplogs(DiskStoreImpl ds) {
+    return ds.getAllOplogsForBackup().length;
+  }
+
+  private File createDirectory(File parentDirectory, String name) {
+    File file = new File(parentDirectory, name);
+    assertThat(file.mkdir()).isTrue();
+    return file;
+  }
+
+  private void createDiskStoreWithSizeInBytes(String diskStoreName,
+      DiskStoreFactoryImpl diskStoreFactory,
+      long maxOplogSizeInBytes) {
+    diskStoreFactory.setMaxOplogSizeInBytes(maxOplogSizeInBytes);
+    diskStoreFactory.setDiskDirSizesUnit(DiskDirSizesUnit.BYTES);
+    diskStoreFactory.create(diskStoreName);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
index 554b43c168..7d8737040b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -191,9 +192,6 @@ public class Oplog implements CompactableOplog, Flushable {
    */
   private final AtomicLong totalLiveCount = new AtomicLong(0);
 
-  private final ConcurrentMap<Long, DiskRegionInfo> regionMap =
-      new ConcurrentHashMap<>();
-
   /**
    * Set to true once compact is called on this oplog.
    *
@@ -529,6 +527,11 @@ public class Oplog implements CompactableOplog, Flushable {
    */
   private boolean doneAppending = false;
 
+  /**
+   * Used to track all information about live entries that the region has in this oplog.
+   */
+  private final RegionMap regionMap = new RegionMap();
+
   /**
    * Creates new {@code Oplog} for the given region.
    *
@@ -811,7 +814,7 @@ public class Oplog implements CompactableOplog, Flushable {
    * Return true if this oplog has a drf but does not have a crf
    */
   boolean isDrfOnly() {
-    return drf.f != null && crf.f == null;
+    return drf.f != null && (crf.f == null || !crf.f.exists());
   }
 
   /**
@@ -1302,17 +1305,19 @@ public class Oplog implements CompactableOplog, Flushable {
     // while a krf is being created can not close a region
     lockCompactor();
     try {
-      DiskRegionInfo dri = getDRI(dr);
-      if (dri != null) {
-        long clearCount = dri.clear(null);
-        if (clearCount != 0) {
-          totalLiveCount.addAndGet(-clearCount);
-          // no need to call handleNoLiveValues because we now have an
-          // unrecovered region.
+      if (!isDrfOnly()) {
+        DiskRegionInfo dri = getDRI(dr);
+        if (dri != null) {
+          long clearCount = dri.clear(null);
+          if (clearCount != 0) {
+            totalLiveCount.addAndGet(-clearCount);
+            // no need to call handleNoLiveValues because we now have an
+            // unrecovered region.
+          }
+          regionMap.get().remove(dr.getId(), dri);
         }
-        regionMap.remove(dr.getId(), dri);
+        addUnrecoveredRegion(dr.getId());
       }
-      addUnrecoveredRegion(dr.getId());
     } finally {
       unlockCompactor();
     }
@@ -1341,7 +1346,7 @@ public class Oplog implements CompactableOplog, Flushable {
           handleNoLiveValues();
         }
       }
-      regionMap.remove(dr.getId(), dri);
+      regionMap.get().remove(dr.getId(), dri);
     }
   }
 
@@ -4010,7 +4015,7 @@ public class Oplog implements CompactableOplog, Flushable {
             return;
           }
 
-          Collection<DiskRegionInfo> regions = regionMap.values();
+          Collection<DiskRegionInfo> regions = regionMap.get().values();
           List<KRFEntry> sortedLiveEntries = getSortedLiveEntries(regions);
           if (sortedLiveEntries == null) {
             // no need to create a krf if there are no live entries.
@@ -4670,7 +4675,7 @@ public class Oplog implements CompactableOplog, Flushable {
   }
 
   private DiskRegionInfo getDRI(long drId) {
-    return regionMap.get(drId);
+    return regionMap.get().get(drId);
   }
 
   private DiskRegionInfo getDRI(DiskRegionView dr) {
@@ -4683,7 +4688,7 @@ public class Oplog implements CompactableOplog, Flushable {
       dri = (isCompactionPossible() || couldHaveKrf())
           ? new DiskRegionInfoWithList(dr, couldHaveKrf(), krfCreated.get())
           : new DiskRegionInfoNoList(dr);
-      DiskRegionInfo oldDri = regionMap.putIfAbsent(dr.getId(), dri);
+      DiskRegionInfo oldDri = regionMap.get().putIfAbsent(dr.getId(), dri);
       if (oldDri != null) {
         dri = oldDri;
       }
@@ -4708,7 +4713,7 @@ public class Oplog implements CompactableOplog, Flushable {
       dri = (isCompactionPossible() || couldHaveKrf())
           ? new DiskRegionInfoWithList(null, couldHaveKrf(), krfCreated.get())
           : new DiskRegionInfoNoList(null);
-      DiskRegionInfo oldDri = regionMap.putIfAbsent(drId, dri);
+      DiskRegionInfo oldDri = regionMap.get().putIfAbsent(drId, dri);
       if (oldDri != null) {
         dri = oldDri;
       }
@@ -5714,6 +5719,7 @@ public class Oplog implements CompactableOplog, Flushable {
       cancelKrf();
       close();
       deleteFiles(getHasDeletes());
+      regionMap.close();
     } finally {
       unlockCompactor();
     }
@@ -5861,7 +5867,7 @@ public class Oplog implements CompactableOplog, Flushable {
                                  * getParent().getOwner().isDestroyed ||
                                  */!compactor.keepCompactorRunning();
         int totalCount = 0;
-        for (DiskRegionInfo dri : regionMap.values()) {
+        for (DiskRegionInfo dri : regionMap.get().values()) {
           final DiskRegionView dr = dri.getDiskRegion();
           if (dr == null) {
             continue;
@@ -5976,7 +5982,7 @@ public class Oplog implements CompactableOplog, Flushable {
 
     List<KRFEntry> sortedLiveEntries;
 
-    HashMap<Long, DiskRegionInfo> targetRegions = new HashMap<>(regionMap);
+    HashMap<Long, DiskRegionInfo> targetRegions = new HashMap<>(regionMap.get());
     synchronized (diskRecoveryStores) {
       diskRecoveryStores.values()
           .removeIf(store -> isLruValueRecoveryDisabled(store) || store.lruLimitExceeded());
@@ -6271,6 +6277,10 @@ public class Oplog implements CompactableOplog, Flushable {
     return version;
   }
 
+  public int getRegionMapSize() {
+    return regionMap.get().size();
+  }
+
   public enum OPLOG_TYPE {
     CRF(new byte[] {0x47, 0x46, 0x43, 0x52, 0x46, 0x31}), // GFCRF1
     DRF(new byte[] {0x47, 0x46, 0x44, 0x52, 0x46, 0x31}), // GFDRF1
@@ -7642,4 +7652,23 @@ public class Oplog implements CompactableOplog, Flushable {
 
   }
 
+  /**
+   * Used to track all information about live entries that the region has in this oplog.
+   * That information is only needed until the oplog is compacted, because compaction will
+   * clear all live entries from this oplog.
+   */
+  private static class RegionMap {
+
+    final AtomicReference<ConcurrentMap<Long, DiskRegionInfo>> regionMap =
+        new AtomicReference<>(new ConcurrentHashMap<>());
+
+    public void close() {
+      regionMap.set(null);
+    }
+
+    public ConcurrentMap<Long, DiskRegionInfo> get() {
+      ConcurrentMap<Long, DiskRegionInfo> regionConcurrentMap = regionMap.get();
+      return regionConcurrentMap != null ? regionConcurrentMap : new ConcurrentHashMap<>();
+    }
+  }
 }
