This is an automated email from the ASF dual-hosted git repository. boglesby pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new b0e0070 GEODE-3026: Removed the AsyncEventQueue for the Lucene index if the region isn't created b0e0070 is described below commit b0e0070763f42eb1744643def8af913ae41055bc Author: Barry Oglesby <bogle...@users.noreply.github.com> AuthorDate: Wed Oct 18 17:51:27 2017 -0700 GEODE-3026: Removed the AsyncEventQueue for the Lucene index if the region isn't created * GEODE-3026: Removed the AsyncEventQueue for the Lucene index if region fails to be created * GEODE-3026: Renamed RegionService to RegionCacheService * GEODE-3026: Moved RegionCacheService API to RegionListener * GEODE-3026: Updated dunit test --- .../asyncqueue/internal/AsyncEventQueueImpl.java | 6 ++ .../geode/internal/cache/GemFireCacheImpl.java | 21 +++- .../apache/geode/internal/cache/InternalCache.java | 6 ++ .../apache/geode/internal/cache/LocalRegion.java | 11 +-- .../geode/internal/cache/PartitionedRegion.java | 2 +- .../geode/internal/cache/RegionListener.java | 23 ++++- .../apache/geode/internal/cache/RegionService.java | 30 ------ .../internal/cache/xmlcache/CacheCreation.java | 15 +++ .../internal/cache/RegionListenerDUnitTest.java | 110 +++++++++++++++++++++ .../internal/cache/RegionListenerJUnitTest.java | 8 ++ .../lucene/internal/InternalLuceneService.java | 4 +- .../cache/lucene/internal/LuceneIndexImpl.java | 21 ++++ .../lucene/internal/LuceneRegionListener.java | 30 +++++- .../cache/lucene/internal/LuceneServiceImpl.java | 55 ++++++++--- .../apache/geode/cache/lucene/LuceneDUnitTest.java | 37 ++++--- .../cache/lucene/LuceneIndexCreationDUnitTest.java | 45 ++++++++- 16 files changed, 346 insertions(+), 78 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java index bf7e874..d011e28 100644 --- 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java @@ -220,4 +220,10 @@ public class AsyncEventQueueImpl implements AsyncEventQueue { public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws InterruptedException { return ((AbstractGatewaySender) this.sender).waitUntilFlushed(timeout, unit); } + + @Override + public String toString() { + return new StringBuffer().append(getClass().getSimpleName()).append("{").append("id=" + getId()) + .append(",isRunning=" + this.sender.isRunning()).append("}").toString(); + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java index e2975ea..a399a7a 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java @@ -3175,6 +3175,20 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has } @Override + public void invokeBeforeDestroyed(LocalRegion region) { + for (RegionListener listener : this.regionListeners) { + listener.beforeDestroyed(region); + } + } + + @Override + public void invokeCleanupFailedInitialization(LocalRegion region) { + for (RegionListener listener : this.regionListeners) { + listener.cleanupFailedInitialization(region); + } + } + + @Override public Region getRegion(String path) { return getRegion(path, false); } @@ -3950,7 +3964,12 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has @Override public Set<AsyncEventQueue> getAsyncEventQueues() { - return this.allVisibleAsyncEventQueues; + return getAsyncEventQueues(true); + } + + @Override + public Set<AsyncEventQueue> getAsyncEventQueues(boolean visibleOnly) { + return visibleOnly ? 
this.allVisibleAsyncEventQueues : this.allAsyncEventQueues; } @Override diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java index 5ef8fcd..58ab77d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java @@ -131,6 +131,10 @@ public interface InternalCache extends Cache, Extensible<Cache>, CacheTime { void invokeRegionAfter(LocalRegion region); + void invokeBeforeDestroyed(LocalRegion region); + + void invokeCleanupFailedInitialization(LocalRegion region); + TXManagerImpl getTXMgr(); boolean forcedDisconnect(); @@ -323,4 +327,6 @@ public interface InternalCache extends Cache, Extensible<Cache>, CacheTime { List<InitialImageOperation.Entry> entriesToSynchronize); InternalQueryService getQueryService(); + + Set<AsyncEventQueue> getAsyncEventQueues(boolean visibleOnly); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 270c4b8..46c4e59 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -1118,6 +1118,7 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade @Override public void destroyRegion(Object aCallbackArgument) throws CacheWriterException, TimeoutException { + this.cache.invokeBeforeDestroyed(this); getDataView().checkSupportsRegionDestroy(); checkForLimitedOrNoAccess(); @@ -1126,14 +1127,6 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade basicDestroyRegion(event, true); } - protected void invokeBeforeRegionDestroyInServices() { - for (CacheService service : this.cache.getServices()) { - if (service instanceof RegionService) { - 
((RegionService) service).beforeRegionDestroyed(this); - } - } - } - public InternalDataView getDataView() { final TXStateInterface tx = getTXState(); if (tx == null) { @@ -7020,6 +7013,8 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade } } + // Clean up region in RegionListeners + this.cache.invokeCleanupFailedInitialization(this); } finally { // make sure any waiters on initializing Latch are released this.releaseLatches(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 0a0b802..2713e2b 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -7061,7 +7061,7 @@ public class PartitionedRegion extends LocalRegion @Override public void destroyRegion(Object aCallbackArgument) throws CacheWriterException, TimeoutException { - invokeBeforeRegionDestroyInServices(); + this.cache.invokeBeforeDestroyed(this); checkForColocatedChildren(); getDataView().checkSupportsRegionDestroy(); checkForLimitedOrNoAccess(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionListener.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionListener.java index 607e86a..f0b30b8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionListener.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionListener.java @@ -35,11 +35,28 @@ public interface RegionListener { * modifications. InternalRegionArguments *may* be modified, but only if you are sure the caller * is not going to reuse the InternalRegionArguments for something else. 
*/ - public RegionAttributes beforeCreate(Region parent, String regionName, RegionAttributes attrs, - InternalRegionArguments internalRegionArgs); + default RegionAttributes beforeCreate(Region parent, String regionName, RegionAttributes attrs, + InternalRegionArguments internalRegionArgs) { + return attrs; + }; /** * Invoked after a region is created. */ - public void afterCreate(Region region); + default void afterCreate(Region region) {}; + + /** + * Invoked before a region is destroyed. This callback is currently only invoked in the initiator + * of destroyRegion. + * + * @param region The region being destroyed + */ + default void beforeDestroyed(Region region) {}; + + /** + * Invoked when a region has failed initialization. + * + * @param region The region that has failed initialization + */ + default void cleanupFailedInitialization(Region region) {}; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionService.java deleted file mode 100644 index 3d6488d..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionService.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.geode.internal.cache; - -import org.apache.geode.cache.Region; - -/** - * Interface for a service that is linked to a region. - */ -public interface RegionService extends CacheService { - - /** - * Called before a region is destroyed. - * - * @param region The region being destroyed - */ - public void beforeRegionDestroyed(Region region); -} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java index cb19ee9..2aead11 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java @@ -1248,6 +1248,11 @@ public class CacheCreation implements InternalCache { } @Override + public Set<AsyncEventQueue> getAsyncEventQueues(boolean visibleOnly) { + return this.asyncEventQueues; + } + + @Override public AsyncEventQueue getAsyncEventQueue(String id) { for (AsyncEventQueue asyncEventQueue : this.asyncEventQueues) { if (asyncEventQueue.getId().equals(id)) { @@ -2082,6 +2087,16 @@ public class CacheCreation implements InternalCache { } @Override + public void invokeBeforeDestroyed(final LocalRegion region) { + throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString()); + } + + @Override + public void invokeCleanupFailedInitialization(final LocalRegion region) { + throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString()); + } + + @Override public TXManagerImpl getTXMgr() { throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString()); } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/RegionListenerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/RegionListenerDUnitTest.java new file mode 100644 index 0000000..1c1e821 --- /dev/null +++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/RegionListenerDUnitTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.fail; + +import java.io.Serializable; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.DistributedTestRule; +import org.apache.geode.test.junit.categories.DistributedTest; + +@Category(DistributedTest.class) +public class RegionListenerDUnitTest implements Serializable { + + @ClassRule + public static DistributedTestRule distributedTestRule = new DistributedTestRule(); + + @Rule + public CacheRule cacheRule = 
CacheRule.builder().createCacheInAll().disconnectAfter().build(); + + @Test + public void testCleanupFailedInitializationInvoked() { + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + + // Add RegionListener in both members + vm0.invoke(() -> installRegionListener()); + vm1.invoke(() -> installRegionListener()); + + // Create region in one member + String regionName = "testCleanupFailedInitializationInvoked"; + vm0.invoke(() -> createRegion(regionName, false)); + + // Attempt to create the region with incompatible configuration in another member. Verify that + // it throws an IllegalStateException. + vm1.invoke(() -> createRegion(regionName, IllegalStateException.class)); + + // Verify the RegionListener cleanupFailedInitialization callback was invoked + vm1.invoke(() -> verifyRegionListenerCleanupFailedInitializationInvoked()); + } + + private void installRegionListener() { + this.cacheRule.getCache().addRegionListener(new TestRegionListener()); + } + + private void createRegion(String regionName, boolean addAsyncEventQueueId) { + RegionFactory rf = this.cacheRule.getCache().createRegionFactory(RegionShortcut.REPLICATE); + if (addAsyncEventQueueId) { + rf.addAsyncEventQueueId("aeqId"); + } + rf.create(regionName); + } + + private void createRegion(String regionName, Class exception) { + RegionFactory rf = this.cacheRule.getCache().createRegionFactory(RegionShortcut.REPLICATE) + .addAsyncEventQueueId("aeqId"); + assertThatThrownBy(() -> rf.create(regionName)).isInstanceOf(exception); + } + + private void verifyRegionListenerCleanupFailedInitializationInvoked() { + Set<RegionListener> listeners = this.cacheRule.getCache().getRegionListeners(); + assertThat(listeners.size()).isEqualTo(1); + RegionListener listener = listeners.iterator().next(); + assertThat(listener).isInstanceOf(TestRegionListener.class); + TestRegionListener trl = (TestRegionListener) listener; + assertThat(trl.getCleanupFailedInitializationInvoked()).isTrue(); + } 
+ + private static class TestRegionListener implements RegionListener { + + final AtomicBoolean cleanupFailedInitializationInvoked = new AtomicBoolean(); + + @Override + public void cleanupFailedInitialization(Region region) { + cleanupFailedInitializationInvoked.set(true); + } + + public boolean getCleanupFailedInitializationInvoked() { + return this.cleanupFailedInitializationInvoked.get(); + } + } +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/RegionListenerJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/RegionListenerJUnitTest.java index 038628e..28c456f 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/RegionListenerJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/RegionListenerJUnitTest.java @@ -31,6 +31,7 @@ public class RegionListenerJUnitTest { @Test public void test() { final AtomicBoolean afterCreateInvoked = new AtomicBoolean(); + final AtomicBoolean beforeDestroyedInvoked = new AtomicBoolean(); RegionListener listener = new RegionListener() { @Override @@ -45,6 +46,11 @@ public class RegionListenerJUnitTest { public void afterCreate(Region region) { afterCreateInvoked.set(true); } + + @Override + public void beforeDestroyed(Region region) { + beforeDestroyedInvoked.set(true); + } }; GemFireCacheImpl cache = (GemFireCacheImpl) new CacheFactory().set(MCAST_PORT, "0").create(); @@ -52,6 +58,8 @@ public class RegionListenerJUnitTest { Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create("region"); assertEquals(DataPolicy.EMPTY, region.getAttributes().getDataPolicy()); assertTrue(afterCreateInvoked.get()); + region.destroyRegion(); + assertTrue(beforeDestroyedInvoked.get()); } } diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneService.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneService.java index 3c27d80..9e1e244 100644 --- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneService.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneService.java @@ -17,9 +17,9 @@ package org.apache.geode.cache.lucene.internal; import org.apache.geode.cache.Cache; import org.apache.geode.cache.lucene.LuceneService; -import org.apache.geode.internal.cache.RegionService; +import org.apache.geode.internal.cache.CacheService; import org.apache.geode.internal.cache.extension.Extension; -public interface InternalLuceneService extends LuceneService, Extension<Cache>, RegionService { +public interface InternalLuceneService extends LuceneService, Extension<Cache>, CacheService { } diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java index a8ba555..2110ca5 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java @@ -35,6 +35,7 @@ import org.apache.geode.cache.lucene.internal.xml.LuceneIndexCreation; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.InternalRegionArguments; import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.RegionListener; import org.apache.geode.internal.cache.extension.Extension; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; @@ -241,6 +242,26 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { // Close the repository manager repositoryManager.close(); + + RegionListener listenerToRemove = getRegionListener(); + if (listenerToRemove != null) { + cache.removeRegionListener(listenerToRemove); + } + + } + + private RegionListener getRegionListener() { + RegionListener rl = null; + for 
(RegionListener listener : cache.getRegionListeners()) { + if (listener instanceof LuceneRegionListener) { + LuceneRegionListener lrl = (LuceneRegionListener) listener; + if (lrl.getRegionPath().equals(regionPath) && lrl.getIndexName().equals(indexName)) { + rl = lrl; + break; + } + } + } + return rl; } protected <K, V> Region<K, V> createRegion(final String regionName, diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java index 70e7e0b..df9dca0 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java @@ -15,6 +15,7 @@ package org.apache.geode.cache.lucene.internal; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.analysis.Analyzer; @@ -49,6 +50,10 @@ public class LuceneRegionListener implements RegionListener { private LuceneIndexImpl luceneIndex; + private AtomicBoolean beforeCreateInvoked = new AtomicBoolean(); + + private AtomicBoolean afterCreateInvoked = new AtomicBoolean(); + public LuceneRegionListener(LuceneServiceImpl service, InternalCache cache, String indexName, String regionPath, String[] fields, Analyzer analyzer, Map<String, Analyzer> fieldAnalyzers, LuceneSerializer serializer) { @@ -76,7 +81,7 @@ public class LuceneRegionListener implements RegionListener { RegionAttributes updatedRA = attrs; String path = parent == null ? 
"/" + regionName : parent.getFullPath() + "/" + regionName; - if (path.equals(this.regionPath)) { + if (path.equals(this.regionPath) && this.beforeCreateInvoked.compareAndSet(false, true)) { if (!attrs.getDataPolicy().withPartitioning()) { // replicated region @@ -116,14 +121,33 @@ public class LuceneRegionListener implements RegionListener { @Override public void afterCreate(Region region) { - if (region.getFullPath().equals(this.regionPath)) { + if (region.getFullPath().equals(this.regionPath) + && this.afterCreateInvoked.compareAndSet(false, true)) { this.service.afterDataRegionCreated(this.luceneIndex); String aeqId = LuceneServiceImpl.getUniqueIndexName(this.indexName, this.regionPath); AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId); AbstractPartitionedRepositoryManager repositoryManager = (AbstractPartitionedRepositoryManager) luceneIndex.getRepositoryManager(); repositoryManager.allowRepositoryComputation(); - this.cache.removeRegionListener(this); + } + } + + @Override + public void beforeDestroyed(Region region) { + if (region.getFullPath().equals(this.regionPath)) { + this.service.beforeRegionDestroyed(region); + } + } + + @Override + public void cleanupFailedInitialization(Region region) { + // Reset the booleans + this.beforeCreateInvoked.set(false); + this.afterCreateInvoked.set(false); + + // Clean up the region in the LuceneService + if (region.getFullPath().equals(this.regionPath)) { + this.service.cleanupFailedInitialization(region); } } } diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java index 16bfafc..985e3ad 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java @@ -35,6 +35,7 @@ import 
org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.geode.cache.Cache; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.apache.geode.cache.execute.Execution; import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.cache.execute.ResultCollector; @@ -123,7 +124,6 @@ public class LuceneServiceImpl implements InternalLuceneService { return InternalLuceneService.class; } - @Override public void beforeRegionDestroyed(Region region) { List<LuceneIndex> indexes = getIndexes(region.getFullPath()); if (!indexes.isEmpty()) { @@ -134,6 +134,21 @@ public class LuceneServiceImpl implements InternalLuceneService { } } + public void cleanupFailedInitialization(Region region) { + List<LuceneIndexCreationProfile> definedIndexes = getDefinedIndexes(region.getFullPath()); + for (LuceneIndexCreationProfile definedIndex : definedIndexes) { + // Get the AsyncEventQueue + String aeqId = LuceneServiceImpl.getUniqueIndexName(definedIndex.getIndexName(), + definedIndex.getRegionPath()); + AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId); + // Stop and remove the AsyncEventQueue if it exists + if (aeq != null) { + aeq.stop(); + this.cache.removeAsyncEventQueue(aeq); + } + } + } + public static String getUniqueIndexName(String indexName, String regionPath) { if (!regionPath.startsWith("/")) { regionPath = "/" + regionPath; @@ -285,6 +300,16 @@ public class LuceneServiceImpl implements InternalLuceneService { return Collections.unmodifiableList(indexes); } + public List<LuceneIndexCreationProfile> getDefinedIndexes(String regionPath) { + List<LuceneIndexCreationProfile> profiles = new ArrayList(); + for (LuceneIndexCreationProfile profile : getAllDefinedIndexes()) { + if (profile.getRegionPath().equals(regionPath)) { + profiles.add(profile); + } + } + return Collections.unmodifiableList(profiles); + } + 
@Override public void destroyIndex(String indexName, String regionPath) { destroyIndex(indexName, regionPath, true); @@ -312,16 +337,7 @@ public class LuceneServiceImpl implements InternalLuceneService { String uniqueIndexName = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath); if (definedIndexMap.containsKey(uniqueIndexName)) { definedIndexMap.remove(uniqueIndexName); - RegionListener listenerToRemove = null; - for (RegionListener listener : cache.getRegionListeners()) { - if (listener instanceof LuceneRegionListener) { - LuceneRegionListener lrl = (LuceneRegionListener) listener; - if (lrl.getRegionPath().equals(regionPath) && lrl.getIndexName().equals(indexName)) { - listenerToRemove = lrl; - break; - } - } - } + RegionListener listenerToRemove = getRegionListener(indexName, regionPath); if (listenerToRemove != null) { cache.removeRegionListener(listenerToRemove); } @@ -334,6 +350,23 @@ public class LuceneServiceImpl implements InternalLuceneService { } } + protected RegionListener getRegionListener(String indexName, String regionPath) { + if (!regionPath.startsWith("/")) { + regionPath = "/" + regionPath; + } + RegionListener rl = null; + for (RegionListener listener : cache.getRegionListeners()) { + if (listener instanceof LuceneRegionListener) { + LuceneRegionListener lrl = (LuceneRegionListener) listener; + if (lrl.getRegionPath().equals(regionPath) && lrl.getIndexName().equals(indexName)) { + rl = lrl; + break; + } + } + } + return rl; + } + @Override public void destroyIndexes(String regionPath) { destroyIndexes(regionPath, true); diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java index aa1b084..3d5905e 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java @@ -108,12 +108,15 @@ public abstract class 
LuceneDUnitTest extends JUnit4CacheTestCase { EXPIRATION_TIMEOUT_SEC, ExpirationAction.DESTROY), PARTITION_REDUNDANT_PERSISTENT_WITH_EXPIRATION_DESTROY(RegionShortcut.PARTITION_PROXY_REDUNDANT, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT, EXPIRATION_TIMEOUT_SEC, - ExpirationAction.DESTROY); + ExpirationAction.DESTROY), + PARTITION_WITH_DOUBLE_BUCKETS(RegionShortcut.PARTITION_PROXY, RegionShortcut.PARTITION, null, + null, NUM_BUCKETS * 2); ExpirationAttributes expirationAttributes = null; EvictionAttributes evictionAttributes = null; private RegionShortcut serverRegionShortcut; private RegionShortcut clientRegionShortcut; + private int numBuckets; RegionTestableType(RegionShortcut clientRegionShortcut, RegionShortcut serverRegionShortcut) { this(clientRegionShortcut, serverRegionShortcut, null); @@ -121,18 +124,23 @@ public abstract class LuceneDUnitTest extends JUnit4CacheTestCase { RegionTestableType(RegionShortcut clientRegionShortcut, RegionShortcut serverRegionShortcut, EvictionAttributes evictionAttributes) { - this.clientRegionShortcut = clientRegionShortcut; - this.serverRegionShortcut = serverRegionShortcut; - this.evictionAttributes = evictionAttributes; - this.expirationAttributes = null; + this(clientRegionShortcut, serverRegionShortcut, evictionAttributes, null, NUM_BUCKETS); } RegionTestableType(RegionShortcut clientRegionShortcut, RegionShortcut serverRegionShortcut, int timeout, ExpirationAction expirationAction) { + this(clientRegionShortcut, serverRegionShortcut, null, + new ExpirationAttributes(timeout, expirationAction), NUM_BUCKETS); + } + + RegionTestableType(RegionShortcut clientRegionShortcut, RegionShortcut serverRegionShortcut, + EvictionAttributes evictionAttributes, ExpirationAttributes expirationAttributes, + int numBuckets) { this.clientRegionShortcut = clientRegionShortcut; this.serverRegionShortcut = serverRegionShortcut; - this.evictionAttributes = null; - this.expirationAttributes = new ExpirationAttributes(timeout, 
expirationAction); + this.evictionAttributes = evictionAttributes; + this.expirationAttributes = expirationAttributes; + this.numBuckets = numBuckets; } public Region createDataStore(Cache cache, String regionName) { @@ -147,13 +155,13 @@ public abstract class LuceneDUnitTest extends JUnit4CacheTestCase { if (expirationAttributes != null) { return cache.createRegionFactory(serverRegionShortcut) .setEntryTimeToLive(expirationAttributes) - .setPartitionAttributes(getPartitionAttributes(false)).create(regionName); + .setPartitionAttributes(getPartitionAttributes(false, numBuckets)).create(regionName); } else if (evictionAttributes == null) { return cache.createRegionFactory(serverRegionShortcut) - .setPartitionAttributes(getPartitionAttributes(false)).create(regionName); + .setPartitionAttributes(getPartitionAttributes(false, numBuckets)).create(regionName); } else { return cache.createRegionFactory(serverRegionShortcut) - .setPartitionAttributes(getPartitionAttributes(false)) + .setPartitionAttributes(getPartitionAttributes(false, numBuckets)) .setEvictionAttributes(evictionAttributes).create(regionName); } } @@ -167,23 +175,24 @@ public abstract class LuceneDUnitTest extends JUnit4CacheTestCase { } if (evictionAttributes == null) { return cache.createRegionFactory(clientRegionShortcut) - .setPartitionAttributes(getPartitionAttributes(true)).create(regionName); + .setPartitionAttributes(getPartitionAttributes(true, numBuckets)).create(regionName); } else { return cache.createRegionFactory(clientRegionShortcut) - .setPartitionAttributes(getPartitionAttributes(true)) + .setPartitionAttributes(getPartitionAttributes(true, numBuckets)) .setEvictionAttributes(evictionAttributes).create(regionName); } } } - protected static PartitionAttributes getPartitionAttributes(final boolean isAccessor) { + protected static PartitionAttributes getPartitionAttributes(final boolean isAccessor, + final int numBuckets) { PartitionAttributesFactory factory = new 
PartitionAttributesFactory(); if (isAccessor) { factory.setLocalMaxMemory(0); } else { factory.setLocalMaxMemory(100); } - factory.setTotalNumBuckets(NUM_BUCKETS); + factory.setTotalNumBuckets(numBuckets); return factory.create(); } diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationDUnitTest.java index afd8c96..948ba14 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationDUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationDUnitTest.java @@ -16,6 +16,7 @@ package org.apache.geode.cache.lucene; import org.apache.geode.cache.lucene.internal.repository.serializer.HeterogeneousLuceneSerializer; import org.apache.geode.cache.lucene.test.LuceneTestUtilities; +import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.test.dunit.SerializableRunnableIF; import org.apache.geode.test.junit.categories.DistributedTest; import org.apache.geode.util.test.TestUtil; @@ -325,7 +326,6 @@ public class LuceneIndexCreationDUnitTest extends LuceneDUnitTest { dataStore2.invoke(() -> initDataStore(createIndex2, regionType)); } - @Test @Parameters("PARTITION") public void verifyDifferentSerializerShouldFail(RegionTestableType regionType) { SerializableRunnableIF createIndex1 = getIndexWithDefaultSerializer(); @@ -347,6 +347,33 @@ public class LuceneIndexCreationDUnitTest extends LuceneDUnitTest { CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_SERIALIZER)); } + @Test + public void verifyAsyncEventQueueIsCleanedUpIfRegionCreationFails() { + SerializableRunnableIF createIndex = get2FieldsIndexes(); + + // Create a Cache in dataStore1 with a PR defining 10 buckets + dataStore1.invoke(() -> initDataStore(createIndex, RegionTestableType.PARTITION)); + + // Attempt to create a Cache in dataStore2 with a PR defining 20 buckets. This will fail. 
+ String exceptionMessage = + LocalizedStrings.PartitionedRegion_THE_TOTAL_NUMBER_OF_BUCKETS_FOUND_IN_PARTITIONATTRIBUTES_0_IS_INCOMPATIBLE_WITH_THE_TOTAL_NUMBER_OF_BUCKETS_USED_BY_OTHER_DISTRIBUTED_MEMBERS_SET_THE_NUMBER_OF_BUCKETS_TO_1 + .toLocalizedString(new Object[] {NUM_BUCKETS * 2, NUM_BUCKETS}); + dataStore2.invoke(() -> initDataStore(createIndex, + RegionTestableType.PARTITION_WITH_DOUBLE_BUCKETS, exceptionMessage)); + + // Verify dataStore1 has two AsyncEventQueues + dataStore1.invoke(() -> verifyAsyncEventQueues(2)); + + // Verify dataStore2 has no AsyncEventQueues + dataStore2.invoke(() -> verifyAsyncEventQueues(0)); + + // Create a Cache in dataStore2 with a PR defining 10 buckets. This will succeed. + dataStore2.invoke(() -> initDataStore(RegionTestableType.PARTITION)); + + // Verify dataStore2 has two AsyncEventQueues + dataStore2.invoke(() -> verifyAsyncEventQueues(2)); + } + protected String getXmlFileForTest(String testName) { return TestUtil.getResourcePath(getClass(), getClassSimpleName() + "." 
+ testName + ".cache.xml"); @@ -361,7 +388,7 @@ public class LuceneIndexCreationDUnitTest extends LuceneDUnitTest { createIndex.run(); try { regionType.createDataStore(getCache(), REGION_NAME); - fail("Should not have been able to create index"); + fail("Should not have been able to create region"); } catch (IllegalStateException e) { assertEquals(message, e.getMessage()); } @@ -416,14 +443,14 @@ public class LuceneIndexCreationDUnitTest extends LuceneDUnitTest { protected void verifyIndexList(final int expectedSize) { LuceneService luceneService = LuceneServiceProvider.get(getCache()); Collection<LuceneIndex> indexList = luceneService.getAllIndexes(); - assertEquals(indexList.size(), expectedSize); + assertEquals(expectedSize, indexList.size()); } protected void verifyIndexes(final int numberOfIndexes) { LuceneService luceneService = LuceneServiceProvider.get(getCache()); for (int count = 1; count <= numberOfIndexes; count++) { - assertEquals(luceneService.getIndex(INDEX_NAME + "_" + count, REGION_NAME).getName(), - INDEX_NAME + "_" + count); + assertEquals(INDEX_NAME + "_" + count, + luceneService.getIndex(INDEX_NAME + "_" + count, REGION_NAME).getName()); } } @@ -509,4 +536,12 @@ public class LuceneIndexCreationDUnitTest extends LuceneDUnitTest { .setLuceneSerializer(new HeterogeneousLuceneSerializer()).create(INDEX_NAME, REGION_NAME); }; } + + protected void initDataStore(RegionTestableType regionTestType) throws Exception { + regionTestType.createDataStore(getCache(), REGION_NAME); + } + + protected void verifyAsyncEventQueues(final int expectedSize) { + assertEquals(expectedSize, getCache().getAsyncEventQueues(false).size()); + } } -- To stop receiving notification emails like this one, please contact "commits@geode.apache.org" <commits@geode.apache.org>.