This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new d621b47  GEODE-4689: Colocation complete listeners added (#1565)
d621b47 is described below

commit d621b47d6d3bd9c6b285b37ca61a25e420b5fc83
Author: Nabarun Nag <nabarun...@users.noreply.github.com>
AuthorDate: Wed Mar 14 10:14:17 2018 -0700

    GEODE-4689: Colocation complete listeners added (#1565)
    
    * PartitionedRegion can now register a colocation listener on itself; the
      listener is triggered when colocation is completed.
    * The Lucene IndexRepositoryFactory adds a colocation-complete listener to
      the file region.
    * When colocation of the file region is completed, computeRepository is
      called for the bucket.
---
 .../geode/internal/cache/ColocationListener.java   | 27 +++++++++++++
 .../internal/cache/PartitionRegionConfig.java      |  5 +--
 .../geode/internal/cache/PartitionedRegion.java    | 23 +++++++++--
 .../cache/PartitionRegionConfigJUnitTest.java      | 35 ++++++++++++++++
 .../cache/PartitionedRegionCreationJUnitTest.java  |  1 +
 .../lucene/internal/IndexRepositoryFactory.java    | 25 ++++++++++--
 .../LuceneFileRegionColocationListener.java        | 47 ++++++++++++++++++++++
 .../cache/lucene/internal/LuceneServiceImpl.java   |  4 +-
 .../internal/PartitionedRepositoryManager.java     |  2 +-
 .../lucene/internal/RawIndexRepositoryFactory.java |  4 +-
 .../internal/RawLuceneRepositoryManager.java       |  2 +-
 .../cache/lucene/test/IndexRepositorySpy.java      |  8 ++--
 12 files changed, 163 insertions(+), 20 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationListener.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationListener.java
new file mode 100644
index 0000000..152c7f4
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+/*
+ * Callbacks that get executed when the region colocation is completed
+ */
+public interface ColocationListener {
+  /*
+   * execute the call back after the region colocation has completed
+   */
+  default void afterColocationCompleted() {
+
+  }
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionRegionConfig.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionRegionConfig.java
index 799d8d7..4669870 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionRegionConfig.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionRegionConfig.java
@@ -24,7 +24,6 @@ import java.util.*;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.ExpirationAttributes;
-import org.apache.geode.cache.FixedPartitionAttributes;
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.cache.partition.PartitionListener;
@@ -32,7 +31,6 @@ import 
org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.ExternalizableDSFID;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.PartitionAttributesImpl;
 import org.apache.geode.internal.util.Versionable;
 import org.apache.geode.internal.util.VersionedArrayList;
 
@@ -343,8 +341,9 @@ public class PartitionRegionConfig extends 
ExternalizableDSFID implements Versio
     isDestroying = true;
   }
 
-  void setColocationComplete() {
+  void setColocationComplete(PartitionedRegion partitionedRegion) {
     this.isColocationComplete = true;
+    partitionedRegion.executeColocationCallbacks();
   }
 
   public boolean isGreaterNodeListVersion(final PartitionRegionConfig other) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 4287f0e..a1cba79 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -14,7 +14,7 @@
  */
 package org.apache.geode.internal.cache;
 
-import static org.apache.geode.internal.lang.SystemUtils.*;
+import static org.apache.geode.internal.lang.SystemUtils.getLineSeparator;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -165,7 +165,6 @@ import 
org.apache.geode.internal.cache.control.InternalResourceManager.ResourceT
 import org.apache.geode.internal.cache.control.MemoryEvent;
 import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.cache.eviction.EvictionController;
-import org.apache.geode.internal.cache.eviction.EvictionCounters;
 import org.apache.geode.internal.cache.eviction.HeapEvictor;
 import org.apache.geode.internal.cache.execute.AbstractExecution;
 import org.apache.geode.internal.cache.execute.FunctionExecutionNodePruner;
@@ -239,6 +238,7 @@ import 
org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
 import org.apache.geode.internal.cache.wan.GatewaySenderException;
 import 
org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
 import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
+import org.apache.geode.internal.concurrent.ConcurrentHashSet;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingThreadGroup;
@@ -463,6 +463,19 @@ public class PartitionedRegion extends LocalRegion
 
   private final Object indexLock = new Object();
 
+  private final Set<ColocationListener> colocationListeners = new 
ConcurrentHashSet<>();
+
+  public void addColocationListener(ColocationListener colocationListener) {
+    if (colocationListener != null) {
+      colocationListeners.add(colocationListener);
+    }
+  }
+
+  public void removeColocationListener(ColocationListener colocationListener) {
+    colocationListeners.remove(colocationListener);
+  }
+
+
   static PRIdMap getPrIdToPR() {
     return prIdToPR;
   }
@@ -1528,7 +1541,7 @@ public class PartitionedRegion extends LocalRegion
             this.prRoot.get(colocatedRegion.getRegionIdentifier());
         if (parentConf.isColocationComplete() && 
parentConf.hasSameDataStoreMembers(prConfig)) {
           colocationComplete = true;
-          prConfig.setColocationComplete();
+          prConfig.setColocationComplete(this);
         }
       }
 
@@ -1546,6 +1559,10 @@ public class PartitionedRegion extends LocalRegion
     }
   }
 
+  void executeColocationCallbacks() {
+    
colocationListeners.stream().forEach(ColocationListener::afterColocationCompleted);
+  }
+
   /**
    * @param access true if caller wants last accessed time updated
    * @param allowTombstones - whether a tombstone can be returned
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionRegionConfigJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionRegionConfigJUnitTest.java
new file mode 100644
index 0000000..401fbc5
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionRegionConfigJUnitTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class PartitionRegionConfigJUnitTest {
+  @Test
+  public void 
whenSetColocationCompleteThenAllColocationListenersMustBeExecuted() {
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    PartitionRegionConfig partitionRegionConfig = new PartitionRegionConfig();
+    partitionRegionConfig.setColocationComplete(partitionedRegion);
+    verify(partitionedRegion, times(1)).executeColocationCallbacks();
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionCreationJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionCreationJUnitTest.java
index abdd811..39b0125 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionCreationJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionCreationJUnitTest.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
 import java.util.Iterator;
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
index f8e83be..618aa29 100644
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
@@ -50,8 +50,8 @@ public class IndexRepositoryFactory {
   public IndexRepositoryFactory() {}
 
   public IndexRepository computeIndexRepository(final Integer bucketId, 
LuceneSerializer serializer,
-      InternalLuceneIndex index, PartitionedRegion userRegion, final 
IndexRepository oldRepository)
-      throws IOException {
+      InternalLuceneIndex index, PartitionedRegion userRegion, final 
IndexRepository oldRepository,
+      PartitionedRepositoryManager partitionedRepositoryManager) throws 
IOException {
     LuceneIndexForPartitionedRegion indexForPR = 
(LuceneIndexForPartitionedRegion) index;
     final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion();
 
@@ -59,10 +59,27 @@ public class IndexRepositoryFactory {
     Region prRoot = PartitionedRegionHelper.getPRRoot(fileRegion.getCache());
     PartitionRegionConfig prConfig =
         (PartitionRegionConfig) prRoot.get(fileRegion.getRegionIdentifier());
-    while (!prConfig.isColocationComplete()) {
-      prConfig = (PartitionRegionConfig) 
prRoot.get(fileRegion.getRegionIdentifier());
+    LuceneFileRegionColocationListener 
luceneFileRegionColocationCompleteListener =
+        new LuceneFileRegionColocationListener(partitionedRepositoryManager, 
bucketId);
+    
fileRegion.addColocationListener(luceneFileRegionColocationCompleteListener);
+    IndexRepository repo = null;
+    if (prConfig.isColocationComplete()) {
+      repo = finishComputingRepository(bucketId, serializer, userRegion, 
oldRepository, index);
     }
+    return repo;
+  }
 
+  /*
+   * NOTE: The method finishComputingRepository must be called through 
computeIndexRepository.
+   * Executing finishComputingRepository outside of computeIndexRepository may 
result in race
+   * conditions.
+   * This is a util function just to not let computeIndexRepository be a huge 
chunk of code.
+   */
+  private IndexRepository finishComputingRepository(Integer bucketId, 
LuceneSerializer serializer,
+      PartitionedRegion userRegion, IndexRepository oldRepository, 
InternalLuceneIndex index)
+      throws IOException {
+    LuceneIndexForPartitionedRegion indexForPR = 
(LuceneIndexForPartitionedRegion) index;
+    final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion();
     BucketRegion fileAndChunkBucket = getMatchingBucket(fileRegion, bucketId);
     BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId);
     boolean success = false;
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneFileRegionColocationListener.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneFileRegionColocationListener.java
new file mode 100644
index 0000000..7298283
--- /dev/null
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneFileRegionColocationListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene.internal;
+
+import org.apache.geode.internal.cache.ColocationListener;
+
+public class LuceneFileRegionColocationListener implements ColocationListener {
+  private final PartitionedRepositoryManager partitionedRepositoryManager;
+  private final Integer bucketID;
+
+  public LuceneFileRegionColocationListener(
+      PartitionedRepositoryManager partitionedRepositoryManager, Integer 
bucketID) {
+    this.partitionedRepositoryManager = partitionedRepositoryManager;
+    this.bucketID = bucketID;
+  }
+
+
+  @Override
+  public void afterColocationCompleted() {
+    this.partitionedRepositoryManager.computeRepository(this.bucketID);
+  }
+
+  // Current implementation will allow only one 
LuceneFileRegionColocationListener to be
+  // added to the PartitionRegion colocationListener set.
+  @Override
+  public int hashCode() {
+    return bucketID.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return (obj instanceof LuceneFileRegionColocationListener
+        && ((LuceneFileRegionColocationListener) obj).bucketID == 
this.bucketID);
+  }
+}
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
index b2c2412..5d0ea48 100644
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
@@ -242,11 +242,11 @@ public class LuceneServiceImpl implements 
InternalLuceneService {
   protected boolean createLuceneIndexOnDataRegion(final PartitionedRegion 
userRegion,
       final InternalLuceneIndex luceneIndex) {
     try {
-      PartitionedRepositoryManager repositoryManager =
-          (PartitionedRepositoryManager) luceneIndex.getRepositoryManager();
       if (userRegion.getDataStore() == null) {
         return true;
       }
+      PartitionedRepositoryManager repositoryManager =
+          (PartitionedRepositoryManager) luceneIndex.getRepositoryManager();
       Set<Integer> primaryBucketIds = 
userRegion.getDataStore().getAllLocalPrimaryBucketIds();
       Iterator primaryBucketIterator = primaryBucketIds.iterator();
       while (primaryBucketIterator.hasNext()) {
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
index 9e90e95..f60f83b 100755
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
@@ -117,7 +117,7 @@ public class PartitionedRepositoryManager implements 
RepositoryManager {
       InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository 
oldRepository)
       throws IOException {
     return indexRepositoryFactory.computeIndexRepository(bucketId, serializer, 
index, userRegion,
-        oldRepository);
+        oldRepository, this);
   }
 
 
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java
index 8f8e09e..984d3eb 100755
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java
@@ -34,8 +34,8 @@ public class RawIndexRepositoryFactory extends 
IndexRepositoryFactory {
 
   @Override
   public IndexRepository computeIndexRepository(final Integer bucketId, 
LuceneSerializer serializer,
-      InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository 
oldRepository)
-      throws IOException {
+      InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository 
oldRepository,
+      PartitionedRepositoryManager partitionedRepositoryManager) throws 
IOException {
     final IndexRepository repo;
     if (oldRepository != null) {
       oldRepository.cleanup();
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
index f47e11e..0b38c45 100755
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
@@ -51,6 +51,6 @@ public class RawLuceneRepositoryManager extends 
PartitionedRepositoryManager {
       InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository 
oldRepository)
       throws IOException {
     return indexRepositoryFactory.computeIndexRepository(bucketId, serializer, 
index, userRegion,
-        oldRepository);
+        oldRepository, this);
   }
 }
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/IndexRepositorySpy.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/IndexRepositorySpy.java
index d80ca63..37a1e6d 100644
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/IndexRepositorySpy.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/IndexRepositorySpy.java
@@ -50,10 +50,10 @@ public class IndexRepositorySpy extends 
IndexRepositoryFactory {
 
   @Override
   public IndexRepository computeIndexRepository(final Integer bucketId, 
LuceneSerializer serializer,
-      InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository 
oldRepository)
-      throws IOException {
-    final IndexRepository indexRepo =
-        super.computeIndexRepository(bucketId, serializer, index, userRegion, 
oldRepository);
+      InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository 
oldRepository,
+      PartitionedRepositoryManager partitionedRepositoryManager) throws 
IOException {
+    final IndexRepository indexRepo = super.computeIndexRepository(bucketId, 
serializer, index,
+        userRegion, oldRepository, partitionedRepositoryManager);
     if (indexRepo == null) {
       return null;
     }

-- 
To stop receiving notification emails like this one, please contact
n...@apache.org.

Reply via email to