jihoonson commented on a change in pull request #8222: Fix bugs in 
overshadowableManager and add unit tests
URL: https://github.com/apache/incubator-druid/pull/8222#discussion_r310785630
 
 

 ##########
 File path: 
core/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java
 ##########
 @@ -189,89 +247,359 @@ private void 
transitAtomicUpdateGroupState(AtomicUpdateGroup<T> atomicUpdateGrou
   }
 
   /**
-   * Find all atomicUpdateGroups of the given state overshadowed by the given 
rootPartitionRange and minorVersion.
+   * Find all atomicUpdateGroups of the given state overshadowed by the 
minorVersion in the given rootPartitionRange.
    * The atomicUpdateGroup of a higher minorVersion can have a wider 
RootPartitionRange.
    * To find all atomicUpdateGroups overshadowed by the given 
rootPartitionRange and minorVersion,
    * we first need to find the first key contained by the given 
rootPartitionRange.
    * Once we find such key, then we go through the entire map until we see an 
atomicUpdateGroup of which
    * rootRangePartition is not contained by the given rootPartitionRange.
+   *
+   * @param rangeOfAug   the partition range to search for overshadowed groups.
+   * @param minorVersion the minor version to check overshadow relation. The 
found groups will have lower minor versions
+   *                     than this.
+   * @param fromState    the state to search for overshadowed groups.
+   *
+   * @return a list of found atomicUpdateGroups. It could be empty if no 
groups are found.
+   */
+  @VisibleForTesting
+  List<AtomicUpdateGroup<T>> findOvershadowedBy(RootPartitionRange rangeOfAug, 
short minorVersion, State fromState)
+  {
+    final TreeMap<RootPartitionRange, 
Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
+    Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> 
current = findLowestOverlappingEntry(
+        rangeOfAug,
+        stateMap,
+        true
+    );
+
+    if (current == null) {
+      return Collections.emptyList();
+    }
+
+    // Going through the map to find all entries of the RootPartitionRange 
contained by the given rangeOfAug.
+    // Note that RootPartitionRange of entries are always consecutive.
+    final List<AtomicUpdateGroup<T>> found = new ArrayList<>();
+    while (current != null && rangeOfAug.overlaps(current.getKey())) {
+      if (rangeOfAug.contains(current.getKey())) {
+        // versionToGroup is sorted by minorVersion.
+        // versionToGroup.headMap(minorVersion) below returns a map containing 
all entries of lower minorVersions
+        // than the given minorVersion.
+        final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = 
current.getValue();
+        // Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation, 
especially size(), is not optimized.
+        // Note that size() is indirectly called in ArrayList.addAll() when 
ObjectSortedSet.toArray() is called.
+        // See AbstractObjectCollection.toArray().
+        // If you see performance degradation here, probably we need to 
improve the below line.
+        if (versionToGroup.firstShortKey() < minorVersion) {
+          found.addAll(versionToGroup.headMap(minorVersion).values());
+        }
+      }
+      current = stateMap.higherEntry(current.getKey());
+    }
+    return found;
+  }
+
+  private List<AtomicUpdateGroup<T>> findOvershadows(AtomicUpdateGroup<T> aug, 
State fromState)
+  {
+    return findOvershadows(RootPartitionRange.of(aug), aug.getMinorVersion(), 
fromState);
+  }
+
+  /**
+   * Find all atomicUpdateGroups which overshadow others of the given 
minorVersion in the given rootPartitionRange.
+   * Similar to {@link #findOvershadowedBy}.
+   *
+   * Note that one atommicUpdateGroup can overshadow multiple other groups. If 
you're finding overshadowing
+   * atomicUpdateGroups by calling this method in a loop, the results of this 
method can contain duplicate groups.
+   *
+   * @param rangeOfAug   the partition range to search for overshadowing 
groups.
+   * @param minorVersion the minor version to check overshadow relation. The 
found groups will have higher minor
+   *                     versions than this.
+   * @param fromState    the state to search for overshadowed groups.
+   *
+   * @return a list of found atomicUpdateGroups. It could be empty if no 
groups are found.
    */
-  private List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> findOvershadowedBy(
+  @VisibleForTesting
+  List<AtomicUpdateGroup<T>> findOvershadows(RootPartitionRange rangeOfAug, 
short minorVersion, State fromState)
+  {
+    final TreeMap<RootPartitionRange, 
Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
+    Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> 
current = findLowestOverlappingEntry(
+        rangeOfAug,
+        stateMap,
+        false
+    );
+
+    if (current == null) {
+      return Collections.emptyList();
+    }
+
+    // Going through the map to find all entries of the RootPartitionRange 
contains the given rangeOfAug.
+    // Note that RootPartitionRange of entries are always consecutive.
+    final List<AtomicUpdateGroup<T>> found = new ArrayList<>();
+    while (current != null && current.getKey().overlaps(rangeOfAug)) {
+      if (current.getKey().contains(rangeOfAug)) {
+        // versionToGroup is sorted by minorVersion.
+        // versionToGroup.tailMap(minorVersion) below returns a map containing 
all entries of equal to or higher
+        // minorVersions than the given minorVersion.
+        final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = 
current.getValue();
+        // Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation, 
especially size(), is not optimized.
+        // Note that size() is indirectly called in ArrayList.addAll() when 
ObjectSortedSet.toArray() is called.
+        // See AbstractObjectCollection.toArray().
+        // If you see performance degradation here, probably we need to 
improve the below line.
+        if (versionToGroup.lastShortKey() > minorVersion) {
+          found.addAll(versionToGroup.tailMap(minorVersion).values());
+        }
+      }
+      current = stateMap.higherEntry(current.getKey());
+    }
+    return found;
+  }
+
+  private Entry<RootPartitionRange, 
Short2ObjectSortedMap<AtomicUpdateGroup<T>>> findLowestOverlappingEntry(
       RootPartitionRange rangeOfAug,
-      short minorVersion,
-      State fromState
+      TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> 
stateMap,
+      boolean strictSameStartId
   )
   {
-    final TreeMap<RootPartitionRange, 
Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
     Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> 
current = stateMap.floorEntry(rangeOfAug);
 
     if (current == null) {
-      return Collections.emptyList();
+      current = stateMap.ceilingEntry(rangeOfAug);
+    }
+
+    if (current == null) {
+      return null;
+    }
+
+    // floorEntry() can return the greatest key less than rangeOfAug. We need 
to skip non-overlapping keys.
+    while (current != null && !current.getKey().overlaps(rangeOfAug)) {
+      current = stateMap.higherEntry(current.getKey());
+    }
+
+    final BiPredicate<RootPartitionRange, RootPartitionRange> predicate;
+    if (strictSameStartId) {
+      predicate = (entryRange, groupRange) -> entryRange.startPartitionId == 
groupRange.startPartitionId;
+    } else {
+      predicate = RootPartitionRange::overlaps;
     }
 
-    // Find the first key for searching for overshadowed atomicUpdateGroup
-    while (true) {
+    // There could be multiple entries of the same startPartitionId but 
different endPartitionId.
+    // Find the first key of the same startPartitionId which has the lowest 
endPartitionId.
+    while (current != null) {
       final Entry<RootPartitionRange, 
Short2ObjectSortedMap<AtomicUpdateGroup<T>>> lowerEntry = stateMap.lowerEntry(
           current.getKey()
       );
-      if (lowerEntry != null && lowerEntry.getKey().startPartitionId == 
rangeOfAug.startPartitionId) {
+      if (lowerEntry != null && predicate.test(lowerEntry.getKey(), 
rangeOfAug)) {
         current = lowerEntry;
       } else {
         break;
       }
     }
 
-    // Going through the map to find all entries of the RootPartitionRange 
contained by the given rangeOfAug.
-    // Note that RootPartitionRange of entries are always consecutive.
-    final List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> found = new 
ArrayList<>();
-    while (current != null && rangeOfAug.contains(current.getKey())) {
-      // versionToGroup is sorted by minorVersion.
-      // versionToGroup.subMap(firstKey, minorVersion) below returns a map 
containing all entries of lower minorVersions
-      // than the given minorVersion.
-      final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = 
current.getValue();
-      // Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation, 
especially size(), is not optimized.
-      // Note that size() is indirectly called in ArrayList.addAll() when 
ObjectSortedSet.toArray() is called.
-      // See AbstractObjectCollection.toArray().
-      // If you see performance degradation here, probably we need to improve 
the below line.
-      found.addAll(versionToGroup.subMap(versionToGroup.firstShortKey(), 
minorVersion).short2ObjectEntrySet());
-      current = stateMap.higherEntry(current.getKey());
-    }
-    return found;
+    return current;
   }
 
   /**
-   * Handles addition of the atomicUpdateGroup to the given state
+   * Determine the visible group after a new chunk is added.
    */
-  private void transitionStandbyGroupIfFull(AtomicUpdateGroup<T> aug, State 
stateOfAug)
+  private void determineVisibleGroupAfterAdd(AtomicUpdateGroup<T> aug, State 
stateOfAug)
   {
     if (stateOfAug == State.STANDBY) {
       // A standby atomicUpdateGroup becomes visible when its all segments are 
available.
       if (aug.isFull()) {
         // A visible atomicUpdateGroup becomes overshadowed when a fully 
available standby atomicUpdateGroup becomes
         // visible which overshadows the current visible one.
-        findOvershadowedBy(aug, State.VISIBLE)
-            .forEach(entry -> transitAtomicUpdateGroupState(entry.getValue(), 
State.VISIBLE, State.OVERSHADOWED));
+        replaceVisibleWith(
+            findOvershadowedBy(aug, State.VISIBLE),
+            State.OVERSHADOWED,
+            Collections.singletonList(aug),
+            State.STANDBY
+        );
         findOvershadowedBy(aug, State.STANDBY)
-            .forEach(entry -> transitAtomicUpdateGroupState(entry.getValue(), 
State.STANDBY, State.OVERSHADOWED));
-        transitAtomicUpdateGroupState(aug, State.STANDBY, State.VISIBLE);
+            .forEach(entry -> transitAtomicUpdateGroupState(entry, 
State.STANDBY, State.OVERSHADOWED));
+      } else {
+        // The given atomicUpdateGroup is in the standby state which means 
it's not overshadowed by the visible group.
+        // If the visible group is not fully available, then the new standby 
group should be visible since it has a
+        // higher minor version.
+        checkVisibleIsFullyAvailableAndMoveNewStandbyToVisible(aug, 
stateOfAug);
       }
+    } else if (stateOfAug == State.OVERSHADOWED) {
+      checkVisibleIsFullyAvailableAndTryToMoveOvershadowedToVisible(aug, 
stateOfAug);
     }
   }
 
-  private void addAtomicUpdateGroupWithState(AtomicUpdateGroup<T> aug, State 
state)
+  /**
+   * This method is called in {@link #determineVisibleGroupAfterAdd}. It 
checks the current visible group is
+   * fully available, and if not, moves the given group to the visible state.
+   */
+  private void checkVisibleIsFullyAvailableAndMoveNewStandbyToVisible(
 
 Review comment:
   Good point. Moved the full standby handling into this method and updated its 
javadoc.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to