This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new d621b47 GEODE-4689: Colocation complete listeners added (#1565) d621b47 is described below commit d621b47d6d3bd9c6b285b37ca61a25e420b5fc83 Author: Nabarun Nag <nabarun...@users.noreply.github.com> AuthorDate: Wed Mar 14 10:14:17 2018 -0700 GEODE-4689: Colocation complete listeners added (#1565) * PartitionRegion can now add a colocation listener to itself which is triggered when the colocation is completed. * Lucene indexRepositoryFactory adds colocation complete listener to the file region * when the colocation of the file region is completed the compute repository is called --- .../geode/internal/cache/ColocationListener.java | 27 +++++++++++++ .../internal/cache/PartitionRegionConfig.java | 5 +-- .../geode/internal/cache/PartitionedRegion.java | 23 +++++++++-- .../cache/PartitionRegionConfigJUnitTest.java | 35 ++++++++++++++++ .../cache/PartitionedRegionCreationJUnitTest.java | 1 + .../lucene/internal/IndexRepositoryFactory.java | 25 ++++++++++-- .../LuceneFileRegionColocationListener.java | 47 ++++++++++++++++++++++ .../cache/lucene/internal/LuceneServiceImpl.java | 4 +- .../internal/PartitionedRepositoryManager.java | 2 +- .../lucene/internal/RawIndexRepositoryFactory.java | 4 +- .../internal/RawLuceneRepositoryManager.java | 2 +- .../cache/lucene/test/IndexRepositorySpy.java | 8 ++-- 12 files changed, 163 insertions(+), 20 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationListener.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationListener.java new file mode 100644 index 0000000..152c7f4 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +/* + * Callbacks that get executed when the region colocation is completed + */ +public interface ColocationListener { + /* + * execute the call back after the region colocation has completed + */ + default void afterColocationCompleted() { + + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionRegionConfig.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionRegionConfig.java index 799d8d7..4669870 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionRegionConfig.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionRegionConfig.java @@ -24,7 +24,6 @@ import java.util.*; import org.apache.geode.DataSerializer; import org.apache.geode.cache.EvictionAttributes; import org.apache.geode.cache.ExpirationAttributes; -import org.apache.geode.cache.FixedPartitionAttributes; import org.apache.geode.cache.PartitionAttributes; import org.apache.geode.cache.Scope; import org.apache.geode.cache.partition.PartitionListener; @@ -32,7 +31,6 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe import org.apache.geode.internal.ExternalizableDSFID; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.Version; -import org.apache.geode.internal.cache.PartitionAttributesImpl; import org.apache.geode.internal.util.Versionable; import org.apache.geode.internal.util.VersionedArrayList; @@ -343,8 +341,9 @@ public class PartitionRegionConfig extends ExternalizableDSFID implements Versio isDestroying = true; } - void setColocationComplete() { + void setColocationComplete(PartitionedRegion partitionedRegion) { this.isColocationComplete = true; + partitionedRegion.executeColocationCallbacks(); } public boolean isGreaterNodeListVersion(final PartitionRegionConfig other) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 4287f0e..a1cba79 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -14,7 +14,7 @@ */ package org.apache.geode.internal.cache; -import static org.apache.geode.internal.lang.SystemUtils.*; +import static org.apache.geode.internal.lang.SystemUtils.getLineSeparator; import java.io.IOException; import java.io.InputStream; @@ -165,7 +165,6 @@ import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceT import org.apache.geode.internal.cache.control.MemoryEvent; import org.apache.geode.internal.cache.control.MemoryThresholds; import org.apache.geode.internal.cache.eviction.EvictionController; -import org.apache.geode.internal.cache.eviction.EvictionCounters; import org.apache.geode.internal.cache.eviction.HeapEvictor; import org.apache.geode.internal.cache.execute.AbstractExecution; import org.apache.geode.internal.cache.execute.FunctionExecutionNodePruner; @@ -239,6 +238,7 @@ import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException; import org.apache.geode.internal.cache.wan.GatewaySenderException; import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue; +import org.apache.geode.internal.concurrent.ConcurrentHashSet; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.LoggingThreadGroup; @@ -463,6 +463,19 @@ public class PartitionedRegion extends LocalRegion private final Object indexLock = new Object(); + private final Set<ColocationListener> colocationListeners = new ConcurrentHashSet<>(); + + public void addColocationListener(ColocationListener colocationListener) { + if (colocationListener != null) { + colocationListeners.add(colocationListener); + } + } + + public void removeColocationListener(ColocationListener colocationListener) { + colocationListeners.remove(colocationListener); + } + + static PRIdMap getPrIdToPR() { return prIdToPR; } @@ -1528,7 +1541,7 @@ public class PartitionedRegion extends LocalRegion this.prRoot.get(colocatedRegion.getRegionIdentifier()); if (parentConf.isColocationComplete() && parentConf.hasSameDataStoreMembers(prConfig)) { colocationComplete = true; - prConfig.setColocationComplete(); + prConfig.setColocationComplete(this); } } @@ -1546,6 +1559,10 @@ public class PartitionedRegion extends LocalRegion } } + void executeColocationCallbacks() { + colocationListeners.stream().forEach(ColocationListener::afterColocationCompleted); + } + /** * @param access true if caller wants last accessed time updated * @param allowTombstones - whether a tombstone can be returned diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionRegionConfigJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionRegionConfigJUnitTest.java new file mode 100644 index 0000000..401fbc5 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionRegionConfigJUnitTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class PartitionRegionConfigJUnitTest { + @Test + public void whenSetColocationCompleteThenAllColocationListenersMustBeExecuted() { + PartitionedRegion partitionedRegion = mock(PartitionedRegion.class); + PartitionRegionConfig partitionRegionConfig = new PartitionRegionConfig(); + partitionRegionConfig.setColocationComplete(partitionedRegion); + verify(partitionedRegion, times(1)).executeColocationCallbacks(); + } +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionCreationJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionCreationJUnitTest.java index abdd811..39b0125 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionCreationJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionCreationJUnitTest.java @@ -17,6 +17,7 @@ package org.apache.geode.internal.cache; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; import java.util.ArrayList; import java.util.Iterator; diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java index f8e83be..618aa29 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java @@ -50,8 +50,8 @@ public class IndexRepositoryFactory { public IndexRepositoryFactory() {} public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSerializer serializer, - InternalLuceneIndex index, PartitionedRegion userRegion, final IndexRepository oldRepository) - throws IOException { + InternalLuceneIndex index, PartitionedRegion userRegion, final IndexRepository oldRepository, + PartitionedRepositoryManager partitionedRepositoryManager) throws IOException { LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index; final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion(); @@ -59,10 +59,27 @@ public class IndexRepositoryFactory { Region prRoot = PartitionedRegionHelper.getPRRoot(fileRegion.getCache()); PartitionRegionConfig prConfig = (PartitionRegionConfig) prRoot.get(fileRegion.getRegionIdentifier()); - while (!prConfig.isColocationComplete()) { - prConfig = (PartitionRegionConfig) prRoot.get(fileRegion.getRegionIdentifier()); + LuceneFileRegionColocationListener luceneFileRegionColocationCompleteListener = + new LuceneFileRegionColocationListener(partitionedRepositoryManager, bucketId); + fileRegion.addColocationListener(luceneFileRegionColocationCompleteListener); + IndexRepository repo = null; + if (prConfig.isColocationComplete()) { + repo = finishComputingRepository(bucketId, serializer, userRegion, oldRepository, index); } + return repo; + } + /* + * NOTE: The method finishComputingRepository must be called through computeIndexRepository. + * Executing finishComputingRepository outside of computeIndexRepository may result in race + * conditions. + * This is a util function just to not let computeIndexRepository be a huge chunk of code. + */ + private IndexRepository finishComputingRepository(Integer bucketId, LuceneSerializer serializer, + PartitionedRegion userRegion, IndexRepository oldRepository, InternalLuceneIndex index) + throws IOException { + LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index; + final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion(); BucketRegion fileAndChunkBucket = getMatchingBucket(fileRegion, bucketId); BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId); boolean success = false; diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneFileRegionColocationListener.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneFileRegionColocationListener.java new file mode 100644 index 0000000..7298283 --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneFileRegionColocationListener.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.lucene.internal; + +import org.apache.geode.internal.cache.ColocationListener; + +public class LuceneFileRegionColocationListener implements ColocationListener { + private final PartitionedRepositoryManager partitionedRepositoryManager; + private final Integer bucketID; + + public LuceneFileRegionColocationListener( + PartitionedRepositoryManager partitionedRepositoryManager, Integer bucketID) { + this.partitionedRepositoryManager = partitionedRepositoryManager; + this.bucketID = bucketID; + } + + + @Override + public void afterColocationCompleted() { + this.partitionedRepositoryManager.computeRepository(this.bucketID); + } + + // Current implementation will allow only one LuceneFileRegionColocationListener to be + // added to the PartitionRegion colocationListener set. + @Override + public int hashCode() { + return bucketID.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return (obj instanceof LuceneFileRegionColocationListener + && ((LuceneFileRegionColocationListener) obj).bucketID == this.bucketID); + } +} diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java index b2c2412..5d0ea48 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java @@ -242,11 +242,11 @@ public class LuceneServiceImpl implements InternalLuceneService { protected boolean createLuceneIndexOnDataRegion(final PartitionedRegion userRegion, final InternalLuceneIndex luceneIndex) { try { - PartitionedRepositoryManager repositoryManager = - (PartitionedRepositoryManager) luceneIndex.getRepositoryManager(); if (userRegion.getDataStore() == null) { return true; } + PartitionedRepositoryManager repositoryManager = + (PartitionedRepositoryManager) luceneIndex.getRepositoryManager(); Set<Integer> primaryBucketIds = userRegion.getDataStore().getAllLocalPrimaryBucketIds(); Iterator primaryBucketIterator = primaryBucketIds.iterator(); while (primaryBucketIterator.hasNext()) { diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java index 9e90e95..f60f83b 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java @@ -117,7 +117,7 @@ public class PartitionedRepositoryManager implements RepositoryManager { InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository oldRepository) throws IOException { return indexRepositoryFactory.computeIndexRepository(bucketId, serializer, index, userRegion, - oldRepository); + oldRepository, this); } diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java index 8f8e09e..984d3eb 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java @@ -34,8 +34,8 @@ public class RawIndexRepositoryFactory extends IndexRepositoryFactory { @Override public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSerializer serializer, - InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository oldRepository) - throws IOException { + InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository oldRepository, + PartitionedRepositoryManager partitionedRepositoryManager) throws IOException { final IndexRepository repo; if (oldRepository != null) { oldRepository.cleanup(); diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java index f47e11e..0b38c45 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java @@ -51,6 +51,6 @@ public class RawLuceneRepositoryManager extends PartitionedRepositoryManager { InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository oldRepository) throws IOException { return indexRepositoryFactory.computeIndexRepository(bucketId, serializer, index, userRegion, - oldRepository); + oldRepository, this); } } diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/IndexRepositorySpy.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/IndexRepositorySpy.java index d80ca63..37a1e6d 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/IndexRepositorySpy.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/IndexRepositorySpy.java @@ -50,10 +50,10 @@ public class IndexRepositorySpy extends IndexRepositoryFactory { @Override public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSerializer serializer, - InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository oldRepository) - throws IOException { - final IndexRepository indexRepo = - super.computeIndexRepository(bucketId, serializer, index, userRegion, oldRepository); + InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository oldRepository, + PartitionedRepositoryManager partitionedRepositoryManager) throws IOException { + final IndexRepository indexRepo = super.computeIndexRepository(bucketId, serializer, index, + userRegion, oldRepository, partitionedRepositoryManager); if (indexRepo == null) { return null; } -- To stop receiving notification emails like this one, please contact n...@apache.org.