Repository: geode Updated Branches: refs/heads/develop cfaa0e795 -> 38cf13ffb
GEODE-2553: Closed IndexRepositories when deleting an index Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/38cf13ff Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/38cf13ff Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/38cf13ff Branch: refs/heads/develop Commit: 38cf13ffbbc912f78bdacb1caf835508abe1b5e3 Parents: cfaa0e7 Author: Barry Oglesby <[email protected]> Authored: Wed Mar 1 10:41:03 2017 -0800 Committer: Barry Oglesby <[email protected]> Committed: Wed Mar 8 14:30:14 2017 -0800 ---------------------------------------------------------------------- .../AbstractPartitionedRepositoryManager.java | 16 +++++- .../lucene/internal/LuceneBucketListener.java | 7 +-- .../LuceneIndexForPartitionedRegion.java | 31 ++--------- .../cache/lucene/internal/LuceneIndexImpl.java | 36 ++++++++++--- .../internal/RawLuceneRepositoryManager.java | 6 --- .../internal/repository/RepositoryManager.java | 5 ++ .../lucene/LuceneIndexDestroyDUnitTest.java | 56 +++++++++++++++++++- 7 files changed, 107 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/38cf13ff/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java index 97acea1..26179c7 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java @@ -49,11 +49,13 @@ public abstract class AbstractPartitionedRepositoryManager implements Repository protected final PartitionedRegion userRegion; protected final LuceneSerializer serializer; protected final LuceneIndexImpl index; + protected volatile boolean closed; public AbstractPartitionedRepositoryManager(LuceneIndexImpl index, LuceneSerializer serializer) { this.index = index; this.userRegion = (PartitionedRegion) index.getCache().getRegion(index.getRegionPath()); this.serializer = serializer; + this.closed = false; } @Override @@ -91,9 +93,13 @@ public abstract class AbstractPartitionedRepositoryManager implements Repository LuceneSerializer serializer, LuceneIndexImpl index, PartitionedRegion userRegion, IndexRepository oldRepository) throws IOException; - protected IndexRepository computeRepository(Integer bucketId) throws BucketNotFoundException { + protected IndexRepository computeRepository(Integer bucketId) { IndexRepository repo = indexRepositories.compute(bucketId, (key, oldRepository) -> { try { + if (closed && oldRepository != null) { + oldRepository.cleanup(); + return null; + } return computeRepository(bucketId, serializer, index, userRegion, oldRepository); } catch (IOException e) { throw new InternalGemFireError("Unable to create index repository", e); @@ -119,4 +125,12 @@ public abstract class AbstractPartitionedRepositoryManager implements Repository } return repo; } + + @Override + public void close() { + this.closed = true; + for (Integer bucketId : indexRepositories.keySet()) { + computeRepository(bucketId); + } + } } http://git-wip-us.apache.org/repos/asf/geode/blob/38cf13ff/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java index 9532249..32fb3fc 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java @@ -40,11 +40,6 @@ public class LuceneBucketListener extends PartitionListenerAdapter { lucenePartitionRepositoryManager.computeRepository(bucketId); } catch (PrimaryBucketException e) { logger.info("Index repository could not be created because we are no longer primary?", e); - } catch (BucketNotFoundException e) { - logger.info( - "Index repository could not be created when index chunk region bucket became primary. " - + "Deferring index repository to be created lazily during lucene query execution." - + e); } }); } @@ -57,7 +52,7 @@ public class LuceneBucketListener extends PartitionListenerAdapter { dm.getWaitingThreadPool().execute(() -> { try { lucenePartitionRepositoryManager.computeRepository(bucketId); - } catch (PrimaryBucketException | BucketNotFoundException | AlreadyClosedException e) { + } catch (PrimaryBucketException | AlreadyClosedException e) { logger.debug("Exception while cleaning up Lucene Index Repository", e); } }); http://git-wip-us.apache.org/repos/asf/geode/blob/38cf13ff/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java index 4aa24b5..f24c6d6 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java @@ -217,10 +217,6 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { // Invoke super destroy to remove the extension super.destroy(initiator); - // Destroy the AsyncEventQueue - PartitionedRegion pr = (PartitionedRegion) getDataRegion(); - destroyAsyncEventQueue(pr); - // Destroy the chunk region (colocated with the file region) // localDestroyRegion can't be used because locally destroying regions is not supported on // colocated regions @@ -243,7 +239,7 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { // Destroy index on remote members if necessary if (initiator) { - destroyOnRemoteMembers(pr); + destroyOnRemoteMembers(); } if (logger.isDebugEnabled()) { @@ -252,29 +248,8 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { } } - private void destroyAsyncEventQueue(PartitionedRegion pr) { - String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath); - - // Get the AsyncEventQueue - AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId); - - // Stop the AsyncEventQueue (this stops the AsyncEventQueue's underlying GatewaySender) - aeq.stop(); - - // Remove the id from the dataRegion's AsyncEventQueue ids - // Note: The region may already have been destroyed by a remote member - if (!pr.isDestroyed()) { - pr.getAttributesMutator().removeAsyncEventQueueId(aeqId); - } - - // Destroy the aeq (this also removes it from the GemFireCacheImpl) - aeq.destroy(); - if (logger.isDebugEnabled()) { - logger.debug("Destroyed aeqId=" + aeqId); - } - } - - private void destroyOnRemoteMembers(PartitionedRegion pr) { + private void destroyOnRemoteMembers() { + PartitionedRegion pr = (PartitionedRegion) getDataRegion(); DM dm = pr.getDistributionManager(); Set<InternalDistributedMember> recipients = pr.getRegionAdvisor().adviseDataStore(); if (!recipients.isEmpty()) { http://git-wip-us.apache.org/repos/asf/geode/blob/38cf13ff/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java index cf519be..b5b13c1 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java @@ -16,9 +16,7 @@ package org.apache.geode.cache.lucene.internal; import java.util.Collections; -import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import org.apache.geode.internal.cache.extension.Extension; import org.apache.logging.log4j.Logger; @@ -33,11 +31,6 @@ import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.asyncqueue.AsyncEventQueue; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; -import org.apache.geode.cache.execute.Execution; -import org.apache.geode.cache.execute.FunctionService; -import org.apache.geode.cache.execute.ResultCollector; -import org.apache.geode.cache.lucene.internal.distributed.WaitUntilFlushedFunction; -import org.apache.geode.cache.lucene.internal.distributed.WaitUntilFlushedFunctionContext; import org.apache.geode.cache.lucene.internal.repository.RepositoryManager; import org.apache.geode.cache.lucene.internal.xml.LuceneIndexCreation; import org.apache.geode.internal.cache.GemFireCacheImpl; @@ -217,6 +210,12 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { if (extensionToDelete != null) { getDataRegion().getExtensionPoint().removeExtension(extensionToDelete); } + + // Destroy the async event queue + destroyAsyncEventQueue(); + + // Close the repository manager + repositoryManager.close(); } protected <K, V> Region<K, V> createRegion(final String regionName, @@ -237,4 +236,27 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { throw ige; } } + + private void destroyAsyncEventQueue() { + String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath); + + // Get the AsyncEventQueue + AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId); + + // Stop the AsyncEventQueue (this stops the AsyncEventQueue's underlying GatewaySender) + aeq.stop(); + + // Remove the id from the dataRegion's AsyncEventQueue ids + // Note: The region may already have been destroyed by a remote member + Region region = getDataRegion(); + if (!region.isDestroyed()) { + region.getAttributesMutator().removeAsyncEventQueueId(aeqId); + } + + // Destroy the aeq (this also removes it from the GemFireCacheImpl) + aeq.destroy(); + if (logger.isDebugEnabled()) { + logger.debug("Destroyed aeqId=" + aeqId); + } + } } http://git-wip-us.apache.org/repos/asf/geode/blob/38cf13ff/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java index c31f19c..b9f4de8 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java @@ -28,12 +28,6 @@ public class RawLuceneRepositoryManager extends AbstractPartitionedRepositoryMan super(index, serializer); } - public void close() { - for (IndexRepository repo : indexRepositories.values()) { - repo.cleanup(); - } - } - @Override protected IndexRepository getRepository(Integer bucketId) throws BucketNotFoundException { IndexRepository repo = indexRepositories.get(bucketId); http://git-wip-us.apache.org/repos/asf/geode/blob/38cf13ff/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java index b569d70..7f4a6c6 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java @@ -39,4 +39,9 @@ public interface RepositoryManager { */ Collection<IndexRepository> getRepositories(RegionFunctionContext context) throws BucketNotFoundException; + + /** + * Closes this {@link RepositoryManager} + */ + void close(); } http://git-wip-us.apache.org/repos/asf/geode/blob/38cf13ff/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java index 6260075..1afde6a 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java @@ -23,6 +23,7 @@ import org.apache.geode.cache.lucene.test.TestObject; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.SerializableRunnableIF; +import org.apache.geode.test.dunit.ThreadUtils; import org.apache.geode.test.junit.categories.DistributedTest; import org.awaitility.Awaitility; import org.junit.Ignore; @@ -30,6 +31,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.io.InterruptedIOException; import java.util.concurrent.TimeUnit; import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.INDEX_NAME; @@ -38,6 +40,7 @@ import static org.apache.geode.internal.Assert.fail; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; @Category(DistributedTest.class) @RunWith(JUnitParamsRunner.class) @@ -124,7 +127,7 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest { dataStore2.invoke(() -> verifyIndexCreated()); // Start puts - AsyncInvocation putter = dataStore1.invokeAsync(() -> doPuts()); + AsyncInvocation putter = dataStore1.invokeAsync(() -> doPutsUntilStopped()); // Wait until puts have started dataStore1.invoke(() -> waitUntilPutsHaveStarted()); @@ -141,6 +144,42 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest { putter.join(); } + @Test + @Parameters(method = "getListOfRegionTestTypes") + public void verifyDestroyRecreateSingleIndex(RegionTestableType regionType) { + // Create index and region + dataStore1.invoke(() -> initDataStore(createIndex(), regionType)); + dataStore2.invoke(() -> initDataStore(createIndex(), regionType)); + + // Verify index created + dataStore1.invoke(() -> verifyIndexCreated()); + dataStore2.invoke(() -> verifyIndexCreated()); + + // Do puts to cause IndexRepositories to be created + dataStore1.invoke(() -> doPuts(10)); + + // Destroy indexes (only needs to be done on one member) + dataStore1.invoke(() -> destroyIndexes()); + + // Verify indexes destroyed + dataStore1.invoke(() -> verifyIndexesDestroyed()); + dataStore2.invoke(() -> verifyIndexesDestroyed()); + + // Destroy data region + dataStore1.invoke(() -> destroyDataRegion(true)); + + // Recreate index and region + dataStore1.invoke(() -> initDataStore(createIndex(), regionType)); + dataStore2.invoke(() -> initDataStore(createIndex(), regionType)); + + // Do puts to cause IndexRepositories to be recreated + dataStore1.invoke(() -> doPuts(10)); + + // Wait until queue is flushed + // This verifies there are no deadlocks + dataStore1.invoke(() -> waitUntilFlushed(INDEX_NAME)); + dataStore2.invoke(() -> waitUntilFlushed(INDEX_NAME)); + } private SerializableRunnableIF createIndex() { return () -> { @@ -168,7 +207,20 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest { assertNotNull(luceneService.getIndex(INDEX_NAME + "1", REGION_NAME)); } - private void doPuts() throws Exception { + private void waitUntilFlushed(String indexName) throws Exception { + LuceneService luceneService = LuceneServiceProvider.get(getCache()); + assertTrue( + luceneService.waitUntilFlushed(indexName, REGION_NAME, 30000, TimeUnit.MILLISECONDS)); + } + + private void doPuts(int numPuts) throws Exception { + Region region = getCache().getRegion(REGION_NAME); + for (int i = 0; i < numPuts; i++) { + region.put(i, new TestObject()); + } + } + + private void doPutsUntilStopped() throws Exception { Region region = getCache().getRegion(REGION_NAME); int i = 0; while (!STOP_PUTS) {
