Author: amitj
Date: Thu Dec 13 06:34:07 2018
New Revision: 1848822

URL: http://svn.apache.org/viewvc?rev=1848822&view=rev
Log:
OAK-7951: Datastore GC stats not updated with failure when "Not all 
repositories have marked references available"

Based on patch from Wim Symons
- Bubble up the exception when the sweep cannot run because not all
repositories have marked references available
- Mark the failure in the GC stats so that it can then be queried

Added:
    
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java
   (with props)
Modified:
    
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
    
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java

Modified: 
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1848822&r1=1848821&r2=1848822&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
 Thu Dec 13 06:34:07 2018
@@ -85,10 +85,10 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Mark and sweep garbage collector.
- * 
+ *
  * Uses the file system to store internal state while in process to account 
for huge data.
  * This class is not thread safe.
- * 
+ *
  */
 public class MarkSweepGarbageCollector implements BlobGarbageCollector {
 
@@ -97,7 +97,7 @@ public class MarkSweepGarbageCollector i
     public static final String TEMP_DIR = 
StandardSystemProperty.JAVA_IO_TMPDIR.value();
 
     public static final int DEFAULT_BATCH_COUNT = 1024;
-    
+
     public static final String DELIM = ",";
 
     private static final Function<String, String> transformer = new 
Function<String, String>() {
@@ -234,7 +234,7 @@ public class MarkSweepGarbageCollector i
 
     /**
      * Returns the stats related to GC for all repos
-     * 
+     *
      * @return a list of GarbageCollectionRepoStats objects
      * @throws Exception
      */
@@ -348,16 +348,18 @@ public class MarkSweepGarbageCollector i
                 long deleteCount;
                 try {
                     deleteCount = sweep(fs, markStart, forceBlobRetrieve);
+
+                    long maxTime = getMaxModifiedTime(markStart) > 0 ? 
getMaxModifiedTime(markStart) : markStart;
+                    LOG.info("Blob garbage collection completed in {} ({} ms). 
Number of blobs deleted [{}] with max modification time of [{}]",
+                        sw.toString(), sw.elapsed(TimeUnit.MILLISECONDS), 
deleteCount, timestampToString(maxTime));
+
                     threw = false;
+                } catch (NotAllRepositoryMarkedException rm) {
+                    statsCollector.finishFailure();
                 } finally {
                     sw.stop();
                     
statsCollector.updateSweepDuration(sw.elapsed(TimeUnit.MILLISECONDS) - 
markFinish, TimeUnit.MILLISECONDS);
                 }
-
-                long maxTime = getMaxModifiedTime(markStart) > 0 ? 
getMaxModifiedTime(markStart) : markStart;
-
-                LOG.info("Blob garbage collection completed in {} ({} ms). 
Number of blobs deleted [{}] with max modification time of [{}]",
-                        sw.toString(), sw.elapsed(TimeUnit.MILLISECONDS), 
deleteCount, timestampToString(maxTime));
             }
         } catch (Exception e) {
             statsCollector.finishFailure();
@@ -394,7 +396,7 @@ public class MarkSweepGarbageCollector i
 
     /**
      * Difference phase where the GC candidates are identified.
-     * 
+     *
      * @param fs the garbage collector file state
      * @throws IOException
      *             Signals that an I/O exception has occurred.
@@ -453,14 +455,10 @@ public class MarkSweepGarbageCollector i
         long earliestRefAvailTime;
         // Merge all the blob references available from all the reference 
files in the data store meta store
         // Only go ahead if merge succeeded
-        try {
-            earliestRefAvailTime =
-                    
GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs);
-            LOG.debug("Earliest reference available for timestamp [{}]", 
earliestRefAvailTime);
-            earliestRefAvailTime = (earliestRefAvailTime < markStart ? 
earliestRefAvailTime : markStart);
-        } catch (Exception e) {
-            return 0;
-        }
+        earliestRefAvailTime =
+          
GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs);
+        LOG.debug("Earliest reference available for timestamp [{}]", 
earliestRefAvailTime);
+        earliestRefAvailTime = (earliestRefAvailTime < markStart ? 
earliestRefAvailTime : markStart);
 
         // Find all blob references after iterating over the whole repository
         (new BlobIdRetriever(fs, forceBlobRetrieve)).call();
@@ -469,7 +467,7 @@ public class MarkSweepGarbageCollector i
         difference(fs);
         long count = 0;
         long deleted = 0;
-        
+
         long maxModifiedTime = getMaxModifiedTime(earliestRefAvailTime);
         LOG.debug("Starting sweep phase of the garbage collector");
         LOG.debug("Sweeping blobs with modified time > than the configured max 
deleted time ({}). ",
@@ -511,7 +509,7 @@ public class MarkSweepGarbageCollector i
         BlobCollectionType.get(blobStore).handleRemoves(blobStore, 
fs.getGarbage(), fs.getMarkedRefs());
 
         if(count != deleted) {
-            LOG.warn("Deleted only [{}] blobs entries from the [{}] candidates 
identified. This may happen if blob " 
+            LOG.warn("Deleted only [{}] blobs entries from the [{}] candidates 
identified. This may happen if blob "
                          + "modified time is > "
                          + "than the max deleted time ({})", deleted, count,
                         timestampToString(maxModifiedTime));
@@ -643,10 +641,10 @@ public class MarkSweepGarbageCollector i
             closeQuietly(writer);
         }
     }
-    
+
     /**
      * Checks for the DataStore consistency and reports the number of missing 
blobs still referenced.
-     * 
+     *
      * @return the missing blobs
      * @throws Exception
      */
@@ -661,12 +659,12 @@ public class MarkSweepGarbageCollector i
 
         try {
             LOG.info("Starting blob consistency check");
-    
+
             // Find all blobs available in the blob store
             ListenableFutureTask<Integer> blobIdRetriever = 
ListenableFutureTask.create(new BlobIdRetriever(fs,
                 true));
             executor.execute(blobIdRetriever);
-    
+
             // Mark all used blob references
             iterateNodeTree(fs, true);
             
consistencyStatsCollector.updateMarkDuration(sw.elapsed(TimeUnit.MILLISECONDS), 
TimeUnit.MILLISECONDS);
@@ -678,7 +676,7 @@ public class MarkSweepGarbageCollector i
                 threw = false;
                 throw e;
             }
-            
+
             LOG.trace("Starting difference phase of the consistency check");
             FileLineDifferenceIterator iter = new FileLineDifferenceIterator(
                 fs.getAvailableRefs(),
@@ -721,7 +719,7 @@ public class MarkSweepGarbageCollector i
             this.fs = fs;
             this.forceRetrieve = forceBlobRetrieve;
         }
-    
+
         @Override
         public Integer call() throws Exception {
             if (!forceRetrieve) {
@@ -756,7 +754,7 @@ public class MarkSweepGarbageCollector i
         SHARED {
             /**
              * Remove the maked references and the marked markers from the 
blob store root. Default NOOP.
-             * 
+             *
              * @param blobStore the blobStore instance
              */
             @Override
@@ -767,7 +765,7 @@ public class MarkSweepGarbageCollector i
 
             /**
              * Merge all marked references available from all repositories and 
return the earliest time of the references.
-             * 
+             *
              * @param blobStore the blob store
              * @param fs the fs
              * @return the long the earliest time of the available references
@@ -798,7 +796,7 @@ public class MarkSweepGarbageCollector i
                     }
 
                     merge(files, fs.getMarkedRefs());
-                    
+
                     // Get the timestamp to indicate the earliest mark phase 
start
                     List<DataRecord> markerFiles =
                         ((SharedDataStore) blobStore).getAllMetadataRecords(
@@ -812,13 +810,13 @@ public class MarkSweepGarbageCollector i
                     return (earliestMarker < earliestRef ? earliestMarker : 
earliestRef);
                 } else {
                     LOG.error("Not all repositories have marked references 
available : {}", unAvailRepos);
-                    throw new IOException("Not all repositories have marked 
references available");
+                    throw new NotAllRepositoryMarkedException("Not all 
repositories have marked references available");
                 }
             }
 
             /**
              * Adds the marked references to the blob store root. Default NOOP
-             * 
+             *
              * @param blobStore the blob store
              * @param fs the fs
              * @param repoId the repo id
@@ -839,7 +837,7 @@ public class MarkSweepGarbageCollector i
                 ((SharedDataStore) 
blobStore).addMetadataRecord(fs.getMarkedRefs(), 
SharedStoreRecordType.REFERENCES
                     .getNameFromIdPrefix(repoId, uniqueSuffix));
             }
-            
+
             @Override
             public void addMarkedStartMarker(GarbageCollectableBlobStore 
blobStore, String repoId,
                 String uniqueSuffix) {
@@ -875,7 +873,7 @@ public class MarkSweepGarbageCollector i
             }
             return DEFAULT;
         }
-    
+
         public void addMarkedStartMarker(GarbageCollectableBlobStore 
blobStore, String repoId, String uniqueSuffix) {}
     }
 
@@ -1151,4 +1149,15 @@ public class MarkSweepGarbageCollector i
             return totalSizeDeletedCounter.getCount();
         }
     }
+
+
+    /**
+     * Marker IOException to identify sweep phase failure because of some
+     * repositories not having finished Mark phase.
+     */
+    static class NotAllRepositoryMarkedException extends IOException {
+        public NotAllRepositoryMarkedException(String message) {
+            super(message);
+       }
+   }
 }

Modified: 
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java?rev=1848822&r1=1848821&r2=1848822&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java
 Thu Dec 13 06:34:07 2018
@@ -235,7 +235,7 @@ public class BlobGCTest {
         Set<String> existingAfterGC = executeGarbageCollection(secondCluster, 
secondCluster.getCollector(0), false);
 
         assertEquals(totalAdded, existingAfterGC);
-        assertStats(secondCluster.statsProvider, 1, 0, 0, 0, NAME);
+        assertStats(secondCluster.statsProvider, 1, 1, 0, 0, NAME);
     }
 
     @Test

Added: 
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java?rev=1848822&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java
 Thu Dec 13 06:34:07 2018
@@ -0,0 +1,125 @@
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.oak.spi.whiteboard.Tracker;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static 
org.apache.jackrabbit.oak.plugins.blob.SharedDataStore.Type.SHARED;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SharedDataStoreMarkSweepGarbageCollectorTest {
+
+  @Mock
+  private MockGarbageCollectableSharedDataStore blobStore;
+
+  @Mock
+  private BlobReferenceRetriever marker;
+
+  @Mock
+  private Whiteboard whiteboard;
+
+  @Mock
+  private Tracker<CheckpointMBean> tracker;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  private MarkSweepGarbageCollector collector;
+
+  @Mock
+  private CheckpointMBean checkpointMBean;
+
+  private ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+
+  @Before
+  public void setUp() throws IOException {
+    when(whiteboard.track(CheckpointMBean.class)).thenReturn(tracker);
+    when(tracker.getServices()).thenReturn(ImmutableList.of(checkpointMBean));
+
+    when(blobStore.getType()).thenReturn(SHARED);
+
+    collector = new MarkSweepGarbageCollector(
+      marker,
+      blobStore,
+      executor,
+      MarkSweepGarbageCollector.TEMP_DIR,
+      1,
+      0L,
+      "repo",
+      whiteboard,
+      new DefaultStatisticsProvider(executor)
+    );
+  }
+
+  @After
+  public void tear() {
+    new ExecutorCloser(executor).close();
+  }
+
+  @Test
+  public void 
markAndSweepShouldFailIfNotAllRepositoriesHaveMarkedReferencesAvailable() 
throws Exception {
+    setupSharedDataRecords("REPO1", "REPO2");
+
+    collector.markAndSweep(false, true);
+
+    assertThat(collector.getOperationStats().numDeleted(), is(0L));
+    assertThat(collector.getOperationStats().getFailureCount(), is(1L));
+  }
+
+  @Test
+  public void markAndSweepShouldSucceedWhenAllRepositoriesAreAvailable() 
throws Exception {
+    setupSharedDataRecords("REPO1", "REPO1");
+    
when(blobStore.getAllChunkIds(0L)).thenReturn(ImmutableList.<String>of().iterator());
+
+    collector.markAndSweep(false, true);
+
+    assertThat(collector.getOperationStats().numDeleted(), is(0L));
+    assertThat(collector.getOperationStats().getFailureCount(), is(0L));
+  }
+
+  private void setupSharedDataRecords(final String refRepoId, final String 
repoRepoId) throws DataStoreException {
+    DataRecord refDataRecord = mock(DataRecord.class);
+    when(refDataRecord.getIdentifier()).thenReturn(new 
DataIdentifier("references-" + refRepoId));
+    when(refDataRecord.getStream()).thenReturn(new ByteArrayInputStream(new 
byte[0]));
+    when(refDataRecord.getLastModified()).thenReturn(10L);
+
+    DataRecord repoDataRecord = mock(DataRecord.class);
+    when(repoDataRecord.getIdentifier()).thenReturn(new 
DataIdentifier("repository-" + repoRepoId));
+
+    List<DataRecord> refs = ImmutableList.of(refDataRecord);
+    List<DataRecord> repos = ImmutableList.of(repoDataRecord);
+
+    
when(blobStore.getAllMetadataRecords(SharedDataStoreUtils.SharedStoreRecordType.REFERENCES.getType())).thenReturn(refs);
+    
when(blobStore.getAllMetadataRecords(SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY.getType())).thenReturn(repos);
+    
when(blobStore.getAllMetadataRecords(SharedDataStoreUtils.SharedStoreRecordType.MARKED_START_MARKER.getType())).thenReturn(refs);
+  }
+
+  private interface MockGarbageCollectableSharedDataStore extends 
GarbageCollectableBlobStore, SharedDataStore {
+  }
+}

Propchange: 
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to