Author: tomekr
Date: Mon Sep 10 13:30:15 2018
New Revision: 1840465

URL: http://svn.apache.org/viewvc?rev=1840465&view=rev
Log:
OAK-7710: CompositeNodeStore does not dispatch external events to observers
Added: jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/composite/CompositeNodeStoreClusterObservationTest.java Modified: jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/CompositeNodeStore.java Added: jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/composite/CompositeNodeStoreClusterObservationTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/composite/CompositeNodeStoreClusterObservationTest.java?rev=1840465&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/composite/CompositeNodeStoreClusterObservationTest.java (added) +++ jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/composite/CompositeNodeStoreClusterObservationTest.java Mon Sep 10 13:30:15 2018 @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.jackrabbit.oak.composite; + +import com.google.common.collect.Lists; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.plugins.document.DocumentMKBuilderProvider; +import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; +import org.apache.jackrabbit.oak.plugins.document.DocumentStore; +import org.apache.jackrabbit.oak.plugins.document.TestNodeObserver; +import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider; +import org.apache.jackrabbit.oak.spi.mount.Mounts; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertTrue; + +public class CompositeNodeStoreClusterObservationTest { + + private MemoryDocumentStore ds; + private MemoryBlobStore bs; + + @Rule + public DocumentMKBuilderProvider builderProvider = new DocumentMKBuilderProvider(); + + private CompositeNodeStore store; + private DocumentNodeStore remote; + private DocumentNodeStore globalStore; + + private TestNodeObserver observer; + + @Before + public void initStore() { + + remote = createNodeStore(1); + globalStore = createNodeStore(2); + + MountInfoProvider mip = Mounts.newBuilder().build(); + + List<MountedNodeStore> nonDefaultStores = Lists.newArrayList(); + store = new CompositeNodeStore(mip, globalStore, nonDefaultStores); + + observer = new TestNodeObserver("/test"); + } + + @Test + public void localObserver() throws CommitFailedException { + store.addObserver(observer); + + NodeBuilder builder = 
store.getRoot().builder(); + builder.child("test").setProperty("foo", "bar"); + merge(store, builder); + + assertTrue("Node added event not observed for local change", observer.added.containsKey("/test")); + } + + @Test + public void remoteObserver() throws CommitFailedException { + store.addObserver(observer); + + NodeBuilder builder = remote.getRoot().builder(); + builder.child("test").setProperty("foo", "bar"); + merge(remote, builder); + + remote.runBackgroundOperations(); + globalStore.runBackgroundOperations(); + + assertTrue("Node added event not observed for local change", observer.added.containsKey("/test")); + } + + private static void merge(NodeStore store, NodeBuilder builder) + throws CommitFailedException { + store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + } + + private DocumentNodeStore createNodeStore(int clusterId) { + if (ds == null) { + ds = new MemoryDocumentStore(); + } + if (bs == null) { + bs = new MemoryBlobStore(); + } + return createNodeStore(clusterId, ds, bs); + } + + private DocumentNodeStore createNodeStore(int clusterId, + DocumentStore ds, BlobStore bs) { + return builderProvider.newBuilder().setDocumentStore(ds) + .setBlobStore(bs).setClusterId(clusterId) + .setAsyncDelay(0).build(); + } +} Modified: jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/CompositeNodeStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/CompositeNodeStore.java?rev=1840465&r1=1840464&r2=1840465&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/CompositeNodeStore.java (original) +++ jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/CompositeNodeStore.java Mon Sep 10 13:30:15 2018 @@ -26,6 +26,7 @@ import org.apache.jackrabbit.oak.api.Com import 
org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.commons.PathUtils; import org.apache.jackrabbit.oak.composite.checks.NodeStoreChecks; +import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher; import org.apache.jackrabbit.oak.spi.commit.CommitHook; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; @@ -36,6 +37,7 @@ import org.apache.jackrabbit.oak.spi.mou import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +50,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -101,7 +102,7 @@ public class CompositeNodeStore implemen final CompositionContext ctx; - private final List<Observer> observers = new CopyOnWriteArrayList<>(); + private final ChangeDispatcher dispatcher; private final Lock mergeLock; @@ -114,6 +115,14 @@ public class CompositeNodeStore implemen this.ctx = new CompositionContext(mip, globalStore, nonDefaultStore, nodeStateMonitor, nodeBuilderMonitor); this.ignoreReadOnlyWritePaths = new TreeSet<>(ignoreReadOnlyWritePaths); this.mergeLock = new ReentrantLock(); + this.dispatcher = new ChangeDispatcher(getRoot()); + + // setup observation proxy mechanism for underlying store for events not dispatched from within our + // merge + if (globalStore instanceof Observable) { + Observable globalStoreObservable = (Observable) globalStore; + globalStoreObservable.addObserver(new MountedNodeStoreObserver()); + } } @Override @@ -167,9 +176,6 @@ public class CompositeNodeStore implemen } CompositeNodeState newRoot = ctx.createRootNodeState(resultStates); - for (Observer 
observer : observers) { - observer.contentChanged(newRoot, info); - } return newRoot; } finally { mergeLock.unlock(); @@ -440,14 +446,7 @@ public class CompositeNodeStore implemen @Override public Closeable addObserver(final Observer observer) { - observer.contentChanged(getRoot(), CommitInfo.EMPTY_EXTERNAL); - observers.add(observer); - return new Closeable() { - @Override - public void close() throws IOException { - observers.remove(observer); - } - }; + return dispatcher.addObserver(observer); } private Set<String> getIgnoredPaths(Set<String> paths) { @@ -543,4 +542,16 @@ public class CompositeNodeStore implemen buildMountCount, mipMountCount); } } + + private class MountedNodeStoreObserver implements Observer { + @Override + public void contentChanged(@NotNull NodeState root, @NotNull CommitInfo info) { + Map<MountedNodeStore, NodeState> nodeStates = newHashMap(); + for (MountedNodeStore nodeStore : ctx.getNonDefaultStores()) { + nodeStates.put(nodeStore, nodeStore.getNodeStore().getRoot()); + } + nodeStates.put(ctx.getGlobalStore(), root); + dispatcher.contentChanged(ctx.createRootNodeState(nodeStates), info); + } + } }