Author: mreutegg
Date: Wed Apr 30 16:17:09 2014
New Revision: 1591384

URL: http://svn.apache.org/r1591384
Log:
OAK-1784: Async index update persists conflict markers

Modified:
    jackrabbit/oak/branches/1.0/   (props changed)
    jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
    jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java

Propchange: jackrabbit/oak/branches/1.0/
------------------------------------------------------------------------------
  Merged /jackrabbit/oak/trunk:r1591381

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1591384&r1=1591383&r2=1591384&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Wed Apr 30 16:17:09 2014
@@ -41,10 +41,15 @@ import org.apache.jackrabbit.oak.api.Com
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
+import org.apache.jackrabbit.oak.plugins.commit.AnnotatingConflictHandler;
+import org.apache.jackrabbit.oak.plugins.commit.ConflictHook;
+import org.apache.jackrabbit.oak.plugins.commit.ConflictValidatorProvider;
 import org.apache.jackrabbit.oak.plugins.value.Conversions;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.CompositeHook;
 import org.apache.jackrabbit.oak.spi.commit.EditorDiff;
+import org.apache.jackrabbit.oak.spi.commit.EditorHook;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -228,7 +233,10 @@ public class AsyncIndexUpdate implements
 
     private static CommitHook newCommitHook(final String name,
             final PropertyState state) throws CommitFailedException {
-        return new CommitHook() {
+        return new CompositeHook(
+                new ConflictHook(new AnnotatingConflictHandler()),
+                new EditorHook(new ConflictValidatorProvider()),
+                new CommitHook() {
             @Override
             @Nonnull
             public NodeState processCommit(NodeState before, NodeState after,
@@ -243,7 +251,7 @@ public class AsyncIndexUpdate implements
                     throw CONCURRENT_UPDATE;
                 }
             }
-        };
+        });
     }
 
     private static void preAsyncRun(NodeStore store, String name) throws CommitFailedException {

Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1591384&r1=1591383&r2=1591384&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Wed Apr 30 16:17:09 2014
@@ -24,24 +24,32 @@ import static org.apache.jackrabbit.oak.
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Semaphore;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexLookup;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
 import org.apache.jackrabbit.oak.query.index.FilterImpl;
+import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.query.PropertyValues;
+import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
+import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
@@ -273,4 +281,76 @@ public class AsyncIndexUpdateTest {
 
         assertFalse(store.getRoot().hasChildNode("child"));
     }
+
+    // OAK-1784
+    @Test
+    public void failOnConflict() throws Exception {
+        final Map<Thread, Semaphore> locks = Maps.newIdentityHashMap();
+        NodeStore store = new MemoryNodeStore() {
+            @Nonnull
+            @Override
+            public NodeState merge(@Nonnull NodeBuilder builder,
+                                   @Nonnull CommitHook commitHook,
+                                   @Nullable CommitInfo info)
+                    throws CommitFailedException {
+                Semaphore s = locks.get(Thread.currentThread());
+                if (s != null) {
+                    s.acquireUninterruptibly();
+                }
+                return super.merge(builder, commitHook, info);
+            }
+        };
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+
+        NodeBuilder builder = store.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME), "foo",
+                false, ImmutableSet.of("foo"), null, TYPE,
+                Collections.singletonMap(ASYNC_PROPERTY_NAME, "async"));
+
+        builder.child("test").setProperty("foo", "a");
+
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
+        async.run();
+
+        builder = store.getRoot().builder();
+        builder.child("test").setProperty("foo", "b");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                async.run();
+            }
+        });
+        Semaphore s = new Semaphore(0);
+        locks.put(t, s);
+        t.start();
+
+        while (!s.hasQueuedThreads()) {
+            // busy wait
+        }
+
+        // introduce a conflict
+        builder = store.getRoot().builder();
+        builder.getChildNode(INDEX_DEFINITIONS_NAME).getChildNode("foo")
+                .getChildNode(":index").child("a").remove();
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        s.release(100);
+        t.join();
+
+        builder = store.getRoot().builder();
+        assertNoConflictMarker(builder);
+    }
+
+    private void assertNoConflictMarker(NodeBuilder builder) {
+        for (String name : builder.getChildNodeNames()) {
+            if (name.equals(ConflictAnnotatingRebaseDiff.CONFLICT)) {
+                fail("conflict marker detected");
+            }
+            assertNoConflictMarker(builder.getChildNode(name));
+        }
+    }
 }