This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f7ddf8  Relocation manager should relocate segments from any servers (irrespective of the tag) to completed servers (#3466)
2f7ddf8 is described below

commit 2f7ddf840492d4d28b04ddad604888e5d7449b47
Author: Neha Pawar <[email protected]>
AuthorDate: Tue Nov 13 10:40:00 2018 -0800

    Relocation manager should relocate segments from any servers (irrespective of the tag) to completed servers (#3466)
---
 .../core/relocation/RealtimeSegmentRelocator.java  | 50 +++++++++----------
 .../relocation/RealtimeSegmentRelocatorTest.java   | 58 +++++++++++-----------
 2 files changed, 52 insertions(+), 56 deletions(-)

diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index f71a314..0954f5b 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -42,12 +42,12 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Manager to relocate completed segments from consuming servers to completed 
servers
+ * Manager to relocate completed segments to completed servers
  * Segment relocation will be done by this manager, instead of directly moving 
segments to completed servers on completion,
  * so that we don't get segment downtime when a segment is in transition
  *
  * We only relocate segments for realtime tables, and only if tenant config 
indicates that relocation is required
- * A segment will be relocated, one replica at a time, once all of its 
replicas are in ONLINE state and on consuming servers
+ * A segment will be relocated, one replica at a time, once all of its 
replicas are in ONLINE state and all/some are on servers other than completed 
servers
  */
 public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeSegmentRelocator.class);
@@ -107,7 +107,7 @@ public class RealtimeSegmentRelocator extends 
ControllerPeriodicTask {
 
   /**
    * Given a realtime tag config and an ideal state, relocate the segments
-   * which are completed but still hanging around on consuming servers, one 
replica at a time
+   * which are completed but not yet moved to completed servers, one replica 
at a time
    * @param  realtimeTagConfig
    * @param idealState
    */
@@ -115,12 +115,7 @@ public class RealtimeSegmentRelocator extends 
ControllerPeriodicTask {
 
     final HelixManager helixManager = 
_pinotHelixResourceManager.getHelixZkManager();
 
-    List<String> consumingServers = getInstancesWithTag(helixManager, 
realtimeTagConfig.getConsumingServerTag());
-    if (consumingServers.isEmpty()) {
-      throw new IllegalStateException(
-          "Found no realtime consuming servers with tag " + 
realtimeTagConfig.getConsumingServerTag());
-    }
-    List<String> completedServers = getInstancesWithTag(helixManager, 
realtimeTagConfig.getCompletedServerTag());
+    final List<String> completedServers = getInstancesWithTag(helixManager, 
realtimeTagConfig.getCompletedServerTag());
     if (completedServers.isEmpty()) {
       throw new IllegalStateException(
           "Found no realtime completed servers with tag " + 
realtimeTagConfig.getCompletedServerTag());
@@ -152,7 +147,7 @@ public class RealtimeSegmentRelocator extends 
ControllerPeriodicTask {
     completedServersQueue.addAll(completedServerToNumSegments.entrySet());
 
     // get new mapping for segments that need relocation
-    createNewIdealState(realtimeTagConfig, idealState, consumingServers, 
completedServersQueue);
+    createNewIdealState(realtimeTagConfig, idealState, completedServers, 
completedServersQueue);
   }
 
   @VisibleForTesting
@@ -161,26 +156,26 @@ public class RealtimeSegmentRelocator extends 
ControllerPeriodicTask {
   }
 
   /**
-   * Given an ideal state, list of consuming serves and completed servers,
-   * create a mapping for those segments that should relocate a replica from 
consuming to completed server
+   * Given an ideal state find the segments that need to relocate a replica to 
completed servers,
+   * and create a new instance state map for those segments
    *
    * @param realtimeTagConfig
    * @param idealState
-   * @param consumingServers
+   * @param completedServers
    * @param completedServersQueue
    * @return
    */
-  private void createNewIdealState(RealtimeTagConfig realtimeTagConfig, 
IdealState idealState,
-      List<String> consumingServers, MinMaxPriorityQueue<Map.Entry<String, 
Integer>> completedServersQueue) {
+  private void createNewIdealState(final RealtimeTagConfig realtimeTagConfig, 
IdealState idealState,
+      final List<String> completedServers, 
MinMaxPriorityQueue<Map.Entry<String, Integer>> completedServersQueue) {
 
     // TODO: we are scanning the entire segments list every time. This is 
unnecessary because only the latest segments will need relocation
     // Can we do something to avoid this?
     // 1. Time boundary: scan only last day whereas runFrequency = hourly
     // 2. For each partition, scan in descending order, and stop when the 
first segment not needing relocation is found
     for (String segmentName : idealState.getPartitionSet()) {
-      Map<String, String> instanceStateMap = 
idealState.getInstanceStateMap(segmentName);
+      final Map<String, String> instanceStateMap = 
idealState.getInstanceStateMap(segmentName);
       Map<String, String> newInstanceStateMap =
-          createNewInstanceStateMap(realtimeTagConfig, segmentName, 
instanceStateMap, consumingServers,
+          createNewInstanceStateMap(realtimeTagConfig, segmentName, 
instanceStateMap, completedServers,
               completedServersQueue);
       if (MapUtils.isNotEmpty(newInstanceStateMap)) {
         idealState.setInstanceStateMap(segmentName, newInstanceStateMap);
@@ -189,30 +184,32 @@ public class RealtimeSegmentRelocator extends 
ControllerPeriodicTask {
   }
 
   /**
-   * Given the instanceStateMap and a list of consuming and completed servers 
for a realtime resource,
-   * creates a new instanceStateMap, where one replica's instance is replaced 
from a consuming server to a completed server
+   * Given the instance state map of a segment, relocate one replica to a 
completed server if needed
+   * Relocation should be done only if all replicas are ONLINE and at least 
one replica is not on the completed servers
    *
    * @param realtimeTagConfig
    * @param instanceStateMap
-   * @param consumingServers
+   * @param completedServers
    * @param completedServersQueue
    * @return
    */
-  private Map<String, String> createNewInstanceStateMap(RealtimeTagConfig 
realtimeTagConfig, String segmentName,
-      Map<String, String> instanceStateMap, List<String> consumingServers,
+  private Map<String, String> createNewInstanceStateMap(final 
RealtimeTagConfig realtimeTagConfig,
+      final String segmentName, final Map<String, String> instanceStateMap, 
final List<String> completedServers,
       MinMaxPriorityQueue<Map.Entry<String, Integer>> completedServersQueue) {
 
     Map<String, String> newInstanceStateMap = null;
 
-    // proceed only if all segments are ONLINE, and at least 1 server is from 
consuming list
+    // proceed only if all segments are ONLINE
     for (String state : instanceStateMap.values()) {
       if 
(!state.equals(PinotHelixSegmentOnlineOfflineStateModelGenerator.ONLINE_STATE)) 
{
         return newInstanceStateMap;
       }
     }
 
+    // if an instance is found in the instance state map which is not a 
completed server,
+    // replace the instance with one from the completed servers queue
     for (String instance : instanceStateMap.keySet()) {
-      if (consumingServers.contains(instance)) {
+      if (!completedServers.contains(instance)) {
         // Decide best strategy to pick completed server.
         // 1. pick random from list of completed servers
         // 2. pick completed server with minimum segments, based on ideal 
state of this resource
@@ -245,9 +242,8 @@ public class RealtimeSegmentRelocator extends 
ControllerPeriodicTask {
 
         chosenServer.setValue(chosenServer.getValue() + 1);
         completedServersQueue.add(chosenServer);
-        LOGGER.info("Relocating segment {} from consuming server {} (tag {}) 
to completed server {} (tag {})",
-            segmentName, instance, realtimeTagConfig.getConsumingServerTag(), 
chosenServer,
-            realtimeTagConfig.getCompletedServerTag());
+        LOGGER.info("Relocating segment {} from server {} to completed server 
{} (tag {})", segmentName, instance,
+            chosenServer, realtimeTagConfig.getCompletedServerTag());
         break;
       }
     }
diff --git 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
index 1b0ae09..ab79cf7 100644
--- 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
+++ 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
@@ -102,21 +102,11 @@ public class RealtimeSegmentRelocatorTest {
     Map<String, Integer> segmentNameToExpectedNumCompletedInstances = new 
HashMap<>(1);
     ZNRecordSerializer znRecordSerializer = new ZNRecordSerializer();
 
-    // no consuming instances found
-    _realtimeSegmentRelocator.setTagToInstance(serverTenantConsuming, new 
ArrayList<>());
-    _realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted, 
completedInstanceList);
     boolean exception = false;
-    try {
-      _realtimeSegmentRelocator.relocateSegments(realtimeTagConfig, 
idealState);
-    } catch (Exception e) {
-      exception = true;
-    }
-    Assert.assertTrue(exception);
-    exception = false;
 
     // no completed instances found
     _realtimeSegmentRelocator.setTagToInstance(serverTenantConsuming, 
consumingInstanceList);
-    _realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted, new 
ArrayList<String>());
+    _realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted, new 
ArrayList<>());
     try {
       _realtimeSegmentRelocator.relocateSegments(realtimeTagConfig, 
idealState);
     } catch (Exception e) {
@@ -164,8 +154,7 @@ public class RealtimeSegmentRelocatorTest {
     _realtimeSegmentRelocator.relocateSegments(realtimeTagConfig, idealState);
     Assert.assertNotSame(idealState, prevIdealState);
     segmentNameToExpectedNumCompletedInstances.put("segment0", 1);
-    verifySegmentAssignment(idealState, prevIdealState, completedInstanceList, 
consumingInstanceList,
-        nReplicas, segmentNameToExpectedNumCompletedInstances);
+    verifySegmentAssignment(idealState, prevIdealState, completedInstanceList, 
nReplicas, segmentNameToExpectedNumCompletedInstances);
 
     // 2 replicas ONLINE on consuming servers, and 1 already relocated. 
relocate 1 replica from consuming to completed
     prevIdealState = new IdealState(
@@ -173,8 +162,7 @@ public class RealtimeSegmentRelocatorTest {
     _realtimeSegmentRelocator.relocateSegments(realtimeTagConfig, idealState);
     Assert.assertNotSame(idealState, prevIdealState);
     segmentNameToExpectedNumCompletedInstances.put("segment0", 2);
-    verifySegmentAssignment(idealState, prevIdealState, completedInstanceList, 
consumingInstanceList,
-        nReplicas, segmentNameToExpectedNumCompletedInstances);
+    verifySegmentAssignment(idealState, prevIdealState, completedInstanceList, 
nReplicas, segmentNameToExpectedNumCompletedInstances);
 
     // 1 replica ONLINE on consuming, 2 already relocated. relocate the 3rd 
replica.
     // However, only 2 completed servers, which is less than num replicas
@@ -198,8 +186,7 @@ public class RealtimeSegmentRelocatorTest {
     _realtimeSegmentRelocator.relocateSegments(realtimeTagConfig, idealState);
     Assert.assertNotSame(idealState, prevIdealState);
     segmentNameToExpectedNumCompletedInstances.put("segment0", 3);
-    verifySegmentAssignment(idealState, prevIdealState, completedInstanceList, 
consumingInstanceList,
-        nReplicas, segmentNameToExpectedNumCompletedInstances);
+    verifySegmentAssignment(idealState, prevIdealState, completedInstanceList, 
nReplicas, segmentNameToExpectedNumCompletedInstances);
 
     // new segment, all CONSUMING, no move necessary
     Map<String, String> instanceStateMap1 = new HashMap<>(3);
@@ -227,29 +214,42 @@ public class RealtimeSegmentRelocatorTest {
     Assert.assertNotSame(idealState, prevIdealState);
     segmentNameToExpectedNumCompletedInstances.put("segment1", 1);
     segmentNameToExpectedNumCompletedInstances.put("segment2", 1);
-    verifySegmentAssignment(idealState, prevIdealState, completedInstanceList, 
consumingInstanceList,
-        nReplicas, segmentNameToExpectedNumCompletedInstances);
+    verifySegmentAssignment(idealState, prevIdealState, completedInstanceList, 
nReplicas, segmentNameToExpectedNumCompletedInstances);
+
+    // a segment with instances that are not consuming tagged instances. 
Relocate them as well
+    Map<String, String> instanceStateMap3 = new HashMap<>(3);
+    instanceStateMap3.put("notAConsumingServer_0", "ONLINE");
+    instanceStateMap3.put("notAConsumingServer_1", "ONLINE");
+    instanceStateMap3.put("notAConsumingServer_2", "ONLINE");
+    idealState.setInstanceStateMap("segment3", instanceStateMap3);
+    prevIdealState = new IdealState(
+        (ZNRecord) 
znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
+    _realtimeSegmentRelocator.relocateSegments(realtimeTagConfig, idealState);
+    Assert.assertNotSame(idealState, prevIdealState);
+    segmentNameToExpectedNumCompletedInstances.put("segment1", 2);
+    segmentNameToExpectedNumCompletedInstances.put("segment2", 2);
+    segmentNameToExpectedNumCompletedInstances.put("segment3", 1);
+    verifySegmentAssignment(idealState, prevIdealState, completedInstanceList, 
nReplicas, segmentNameToExpectedNumCompletedInstances);
   }
 
-  private void verifySegmentAssignment(IdealState updatedIdealState,
-      IdealState prevIdealState, List<String> completedInstanceList, 
List<String> consumingInstanceList, int nReplicas,
-      Map<String, Integer> segmentNameToNumCompletedInstances) {
+  private void verifySegmentAssignment(IdealState updatedIdealState, 
IdealState prevIdealState,
+      List<String> completedInstanceList, int nReplicas, Map<String, Integer> 
segmentNameToNumCompletedInstances) {
     Assert.assertEquals(updatedIdealState.getPartitionSet().size(), 
prevIdealState.getPartitionSet().size());
     
Assert.assertTrue(prevIdealState.getPartitionSet().containsAll(updatedIdealState.getPartitionSet()));
     for (String segmentName : updatedIdealState.getPartitionSet()) {
       Map<String, String> newInstanceStateMap = 
updatedIdealState.getInstanceStateMap(segmentName);
-      int completed = 0;
-      int consuming = 0;
+      int onCompleted = 0;
+      int notOnCompleted = 0;
       for (String instance : newInstanceStateMap.keySet()) {
         if (completedInstanceList.contains(instance)) {
-          completed++;
-        } else if (consumingInstanceList.contains(instance)) {
-          consuming++;
+          onCompleted++;
+        } else {
+          notOnCompleted++;
         }
       }
       int expectedOnCompletedServers = 
segmentNameToNumCompletedInstances.get(segmentName).intValue();
-      Assert.assertEquals(completed, expectedOnCompletedServers);
-      Assert.assertEquals(consuming, nReplicas - expectedOnCompletedServers);
+      Assert.assertEquals(onCompleted, expectedOnCompletedServers);
+      Assert.assertEquals(notOnCompleted, nReplicas - 
expectedOnCompletedServers);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to