This is an automated email from the ASF dual-hosted git repository. mkataria pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push: new c5e75c4 OAK-9610: Have a jmx to explicitly expire indexing lease in mongo mk (#406) c5e75c4 is described below commit c5e75c470c858f526f0da765e4bd184ed68a5a78 Author: Mohit Kataria <mkata...@apache.org> AuthorDate: Tue Nov 9 17:03:22 2021 +0530 OAK-9610: Have a jmx to explicitly expire indexing lease in mongo mk (#406) * OAK-9610: Have a jmx to explicitly expire indexing lease in mongo mk --- .../jackrabbit/oak/api/jmx/IndexStatsMBean.java | 3 + .../jackrabbit/oak/api/jmx/package-info.java | 2 +- .../oak/plugins/index/AsyncIndexUpdate.java | 36 +++++++ .../index/AsyncIndexUpdateLeaseRemoveTest.java | 118 +++++++++++++++++++++ 4 files changed, 158 insertions(+), 1 deletion(-) diff --git a/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java b/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java index 415b346..6ef4d8e 100644 --- a/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java +++ b/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java @@ -84,6 +84,9 @@ public interface IndexStatsMBean { "resume indexing again") String abortAndPause(); + @Description("Release lease for a paused lane") + String releaseLeaseForPausedLane(); + /** * Resumes the indexing process. All changes from the previous indexed state * will be indexed. diff --git a/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java b/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java index f6b4424..870e73d 100644 --- a/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java +++ b/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java @@ -15,7 +15,7 @@ * limitations under the License. */ -@Version("4.9.0") +@Version("4.10.0") package org.apache.jackrabbit.oak.api.jmx; import org.osgi.annotation.versioning.Version; diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java index fc0fac0..91279d7 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java @@ -488,6 +488,14 @@ public class AsyncIndexUpdate implements Runnable, Closeable { private void runWhenPermitted() { if (indexStats.isPaused()) { + if (indexStats.forcedLeaseRelease){ + try { + clearLease(); + } catch (CommitFailedException e) { + log.warn("Unable to release lease, please try again", e); + } + indexStats.forcedLeaseRelease = false; + } log.debug("[{}] Ignoring the run as indexing is paused", name); return; } @@ -626,6 +634,23 @@ public class AsyncIndexUpdate implements Runnable, Closeable { } } + private void clearLease() throws CommitFailedException { + NodeState root = store.getRoot(); + NodeState async = root.getChildNode(ASYNC); + String beforeCheckpoint = async.getString(name); + String leaseName= leasify(name); + if (async.hasProperty(leaseName)) { + NodeBuilder builder = root.builder(); + builder.child(ASYNC).removeProperty(leaseName); + mergeWithConcurrencyCheck(store, validatorProviders, + builder, beforeCheckpoint, null, name); + log.info("Lease property removed for lane: {}", name); + } else { + log.info("No Lease property present for lane: {}", name); + } + + } + private boolean shouldProceed() { NodeState asyncNode = store.getRoot().getChildNode(":async"); /* @@ -1011,6 +1036,7 @@ public class AsyncIndexUpdate implements Runnable, Closeable { private Set<String> tempCps = new HashSet<String>(); private volatile boolean isPaused; + private volatile boolean forcedLeaseRelease; private volatile long updates; private volatile long nodesRead; private final Stopwatch watch = Stopwatch.createUnstarted(); @@ -1161,12 +1187,22 @@ public class AsyncIndexUpdate implements Runnable, Closeable { } @Override + public String releaseLeaseForPausedLane() { + if (this.isPaused()){ + this.forcedLeaseRelease = true; + return "LeaseRelease flag set"; + } + return "Please pause the lane to release lease"; + } + + @Override public void resume() { log.debug("[{}] Resuming the async indexer", name); this.isPaused = false; //Clear the forcedStop flag as fail safe forcedStopFlag.set(false); + this.forcedLeaseRelease = false; } @Override diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseRemoveTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseRemoveTest.java new file mode 100644 index 0000000..6ad55cb --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseRemoveTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.index; + +import com.google.common.collect.ImmutableSet; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.api.Type; +import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean; +import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; +import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider; +import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore; +import org.apache.jackrabbit.oak.plugins.metric.MetricStatisticsProvider; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.After; +import org.junit.Test; + +import java.lang.management.ManagementFactory; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.ASYNC; +import static org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.leasify; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class AsyncIndexUpdateLeaseRemoveTest { + private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private MetricStatisticsProvider statsProvider = + new MetricStatisticsProvider(ManagementFactory.getPlatformMBeanServer(), executor); + + @After + public void shutDown() { + statsProvider.close(); + new ExecutorCloser(executor).close(); + } + + @Test + public void releaseLeaseOnlyAfterIndexingLanePause() throws Exception { + NodeStore store = new MemoryNodeStore(); + IndexEditorProvider provider = new PropertyIndexEditorProvider(); + NodeBuilder builder = store.getRoot().builder(); + createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME), + "rootIndex", true, false, ImmutableSet.of("foo"), null) + .setProperty(ASYNC_PROPERTY_NAME, "async"); + builder.child("testRoot").setProperty("foo", "abc"); + + store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) { + @Override + protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut, + String beforeCheckpoint, + AsyncIndexStats indexStats, AtomicBoolean stopFlag) { + return new AsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, + indexStats, stopFlag) { + @Override + void close() throws CommitFailedException { + // overridden close method so that lease is not deleted on async cycle completion. + //super.close(); + } + }; + } + }; + async.setLeaseTimeOut(250); + async.run(); + Long leaseValue = getLeaseValue(store); + assertNotNull(leaseValue); + async.getIndexStats().abortAndPause(); + async.getIndexStats().releaseLeaseForPausedLane(); + async.run(); + leaseValue = getLeaseValue(store); + assertEquals(IndexStatsMBean.STATUS_DONE, async.getIndexStats().getStatus()); + assertNotNull(leaseValue); + + async.getIndexStats().resume(); + async.run(); + leaseValue = getLeaseValue(store); + assertNotNull(leaseValue); + // Not pausing indexing lane now. + // async.getIndexStats().abortAndPause(); + + // now as lane is not paused lease will not be released. + async.getIndexStats().releaseLeaseForPausedLane(); + async.run(); + leaseValue = getLeaseValue(store); + System.out.println("new lease value:" + leaseValue); + assertEquals(IndexStatsMBean.STATUS_DONE, async.getIndexStats().getStatus()); + assertTrue(leaseValue != null); + } + + private Long getLeaseValue(NodeStore store) { + NodeBuilder builder1 = store.getRoot().builder(); + NodeBuilder async1 = builder1.getChildNode(ASYNC); + return async1.getProperty(leasify("async")) == null ? null : async1.getProperty(leasify("async")).getValue(Type.LONG); + } +}