This is an automated email from the ASF dual-hosted git repository.

mkataria pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c5e75c4  OAK-9610: Have a jmx to explicitly expire indexing lease in 
mongo mk (#406)
c5e75c4 is described below

commit c5e75c470c858f526f0da765e4bd184ed68a5a78
Author: Mohit Kataria <mkata...@apache.org>
AuthorDate: Tue Nov 9 17:03:22 2021 +0530

    OAK-9610: Have a jmx to explicitly expire indexing lease in mongo mk (#406)
    
    * OAK-9610: Have a jmx to explicitly expire indexing lease in mongo mk
---
 .../jackrabbit/oak/api/jmx/IndexStatsMBean.java    |   3 +
 .../jackrabbit/oak/api/jmx/package-info.java       |   2 +-
 .../oak/plugins/index/AsyncIndexUpdate.java        |  36 +++++++
 .../index/AsyncIndexUpdateLeaseRemoveTest.java     | 118 +++++++++++++++++++++
 4 files changed, 158 insertions(+), 1 deletion(-)

diff --git 
a/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java 
b/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
index 415b346..6ef4d8e 100644
--- 
a/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
+++ 
b/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
@@ -84,6 +84,9 @@ public interface IndexStatsMBean {
             "resume indexing again")
     String abortAndPause();
 
+    @Description("Release lease for a paused lane")
+    String releaseLeaseForPausedLane();
+
     /**
      * Resumes the indexing process. All changes from the previous indexed 
state
      * will be indexed.
diff --git 
a/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java 
b/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java
index f6b4424..870e73d 100644
--- a/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java
+++ b/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-@Version("4.9.0")
+@Version("4.10.0")
 package org.apache.jackrabbit.oak.api.jmx;
 
 import org.osgi.annotation.versioning.Version;
diff --git 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
index fc0fac0..91279d7 100644
--- 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
+++ 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
@@ -488,6 +488,14 @@ public class AsyncIndexUpdate implements Runnable, 
Closeable {
 
     private void runWhenPermitted() {
         if (indexStats.isPaused()) {
+            if (indexStats.forcedLeaseRelease){
+                try {
+                    clearLease();
+                } catch (CommitFailedException e) {
+                    log.warn("Unable to release lease, please try again", e);
+                }
+                indexStats.forcedLeaseRelease = false;
+            }
             log.debug("[{}] Ignoring the run as indexing is paused", name);
             return;
         }
@@ -626,6 +634,23 @@ public class AsyncIndexUpdate implements Runnable, 
Closeable {
         }
     }
 
+    private void clearLease() throws CommitFailedException {
+        NodeState root = store.getRoot();
+        NodeState async = root.getChildNode(ASYNC);
+        String beforeCheckpoint = async.getString(name);
+        String leaseName= leasify(name);
+        if (async.hasProperty(leaseName)) {
+            NodeBuilder builder = root.builder();
+            builder.child(ASYNC).removeProperty(leaseName);
+            mergeWithConcurrencyCheck(store, validatorProviders,
+                    builder, beforeCheckpoint, null, name);
+            log.info("Lease property removed for lane: {}", name);
+        } else {
+            log.info("No Lease property present for lane: {}", name);
+        }
+
+    }
+
     private boolean shouldProceed() {
         NodeState asyncNode = store.getRoot().getChildNode(":async");
         /*
@@ -1011,6 +1036,7 @@ public class AsyncIndexUpdate implements Runnable, 
Closeable {
         private Set<String> tempCps = new HashSet<String>();
 
         private volatile boolean isPaused;
+        private volatile boolean forcedLeaseRelease;
         private volatile long updates;
         private volatile long nodesRead;
         private final Stopwatch watch = Stopwatch.createUnstarted();
@@ -1161,12 +1187,22 @@ public class AsyncIndexUpdate implements Runnable, 
Closeable {
         }
 
         @Override
+        public String releaseLeaseForPausedLane() {
+            if (this.isPaused()){
+                this.forcedLeaseRelease = true;
+                return "LeaseRelease flag set";
+            }
+            return "Please pause the lane to release lease";
+        }
+
+        @Override
         public void resume() {
             log.debug("[{}] Resuming the async indexer", name);
             this.isPaused = false;
 
             //Clear the forcedStop flag as fail safe
             forcedStopFlag.set(false);
+            this.forcedLeaseRelease = false;
         }
 
         @Override
diff --git 
a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseRemoveTest.java
 
b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseRemoveTest.java
new file mode 100644
index 0000000..6ad55cb
--- /dev/null
+++ 
b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseRemoveTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import 
org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.plugins.metric.MetricStatisticsProvider;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.After;
+import org.junit.Test;
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.ASYNC;
+import static org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.leasify;
+import static 
org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
+import static 
org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
+import static 
org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AsyncIndexUpdateLeaseRemoveTest {
+    private ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+    private MetricStatisticsProvider statsProvider =
+            new 
MetricStatisticsProvider(ManagementFactory.getPlatformMBeanServer(), executor);
+
+    @After
+    public void shutDown() {
+        statsProvider.close();
+        new ExecutorCloser(executor).close();
+    }
+
+    @Test
+    public void releaseLeaseOnlyAfterIndexingLanePause() throws Exception {
+        NodeStore store = new MemoryNodeStore();
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+        NodeBuilder builder = store.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        builder.child("testRoot").setProperty("foo", "abc");
+
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, 
provider) {
+            @Override
+            protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore 
store, String name, long leaseTimeOut,
+                                                                 String 
beforeCheckpoint,
+                                                                 
AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
+                return new AsyncUpdateCallback(store, name, leaseTimeOut, 
beforeCheckpoint,
+                        indexStats, stopFlag) {
+                    @Override
+                    void close() throws CommitFailedException {
+                        // overridden close method so that lease is not 
deleted on async cycle completion.
+                        //super.close();
+                    }
+                };
+            }
+        };
+        async.setLeaseTimeOut(250);
+        async.run();
+        Long leaseValue = getLeaseValue(store);
+        assertNotNull(leaseValue);
+        async.getIndexStats().abortAndPause();
+        async.getIndexStats().releaseLeaseForPausedLane();
+        async.run();
+        leaseValue = getLeaseValue(store);
+        assertEquals(IndexStatsMBean.STATUS_DONE, 
async.getIndexStats().getStatus());
+        assertNotNull(leaseValue);
+
+        async.getIndexStats().resume();
+        async.run();
+        leaseValue = getLeaseValue(store);
+        assertNotNull(leaseValue);
+        // Not pausing indexing lane now.
+        // async.getIndexStats().abortAndPause();
+
+        // now as lane is not paused lease will not be released.
+        async.getIndexStats().releaseLeaseForPausedLane();
+        async.run();
+        leaseValue = getLeaseValue(store);
+        System.out.println("new lease value:" + leaseValue);
+        assertEquals(IndexStatsMBean.STATUS_DONE, 
async.getIndexStats().getStatus());
+        assertTrue(leaseValue != null);
+    }
+
+    private Long getLeaseValue(NodeStore store) {
+        NodeBuilder builder1 = store.getRoot().builder();
+        NodeBuilder async1 = builder1.getChildNode(ASYNC);
+        return async1.getProperty(leasify("async")) == null ? null : 
async1.getProperty(leasify("async")).getValue(Type.LONG);
+    }
+}

Reply via email to