lhotari commented on code in PR #24730:
URL: https://github.com/apache/pulsar/pull/24730#discussion_r2343603001
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java:
##########
@@ -143,34 +153,38 @@ private synchronized CompletableFuture<Void>
validateKeySharedMeta(Consumer cons
}
}
- private synchronized Consumer findConflictingConsumer(List<IntRange>
ranges) {
- for (IntRange intRange : ranges) {
- Map.Entry<Integer, Consumer> ceilingEntry =
rangeMap.ceilingEntry(intRange.getStart());
- Map.Entry<Integer, Consumer> floorEntry =
rangeMap.floorEntry(intRange.getEnd());
-
- if (floorEntry != null && floorEntry.getKey() >=
intRange.getStart()) {
- return floorEntry.getValue();
- }
-
- if (ceilingEntry != null && ceilingEntry.getKey() <=
intRange.getEnd()) {
- return ceilingEntry.getValue();
+ private synchronized Consumer findConflictingConsumer(List<IntRange>
newConsumerRanges) {
+ for (IntRange newRange : newConsumerRanges) {
+ // 1. Check for potential conflicts with existing ranges that
start before newRange's start.
+ Map.Entry<Integer, Pair<Range, Consumer>> conflictBeforeStart =
rangeMap.floorEntry(newRange.getStart());
+ if (conflictBeforeStart != null) {
+ Range existingRange = conflictBeforeStart.getValue().getLeft();
+ if (checkRangesOverlap(newRange, existingRange)) {
+ return conflictBeforeStart.getValue().getRight();
+ }
}
-
- if (ceilingEntry != null && floorEntry != null &&
ceilingEntry.getValue().equals(floorEntry.getValue())) {
- KeySharedMeta keySharedMeta =
ceilingEntry.getValue().getKeySharedMeta();
- for (IntRange range : keySharedMeta.getHashRangesList()) {
- int start = Math.max(intRange.getStart(),
range.getStart());
- int end = Math.min(intRange.getEnd(), range.getEnd());
- if (end >= start) {
- return ceilingEntry.getValue();
- }
+ // 2. Check for potential conflicts with existing ranges that
start after newRange's start.
+ Map.Entry<Integer, Pair<Range, Consumer>> conflictAfterStart =
rangeMap.ceilingEntry(newRange.getStart());
+ if (conflictAfterStart != null) {
+ Range existingRange = conflictAfterStart.getValue().getLeft();
+ if (checkRangesOverlap(newRange, existingRange)) {
+ return conflictAfterStart.getValue().getRight();
}
}
}
return null;
}
- Map<Integer, Consumer> getRangeConsumer() {
+
+ private boolean checkRangesOverlap(IntRange range1, Range range2) {
+ return Math.max(range1.getStart(), range2.getStart()) <=
Math.min(range1.getEnd(), range2.getEnd());
Review Comment:
Yes, that's clear now.
What I meant is to have a unit test written directly for the method. Making the
method static with default (package-private) visibility allows it to be tested in
isolation, so that all cases can be covered.
For example:
```java
@Test
public void testCheckRangesOverlap() {
// Overlap in the middle
Assert.assertTrue(HashRangeExclusiveStickyKeyConsumerSelector.checkRangesOverlap(
new IntRange().setStart(1).setEnd(5),
new IntRange().setStart(3).setEnd(4)));
// One contains the other
Assert.assertTrue(HashRangeExclusiveStickyKeyConsumerSelector.checkRangesOverlap(
new IntRange().setStart(0).setEnd(10),
new IntRange().setStart(2).setEnd(8)));
// Touching at boundary (shared endpoint counts as overlap)
Assert.assertTrue(HashRangeExclusiveStickyKeyConsumerSelector.checkRangesOverlap(
new IntRange().setStart(1).setEnd(2),
new IntRange().setStart(2).setEnd(4)));
// Disjoint - gap between ranges
Assert.assertFalse(HashRangeExclusiveStickyKeyConsumerSelector.checkRangesOverlap(
new IntRange().setStart(1).setEnd(2),
new IntRange().setStart(3).setEnd(4)));
// Disjoint - larger gap between ranges
Assert.assertFalse(HashRangeExclusiveStickyKeyConsumerSelector.checkRangesOverlap(
new IntRange().setStart(1).setEnd(2),
new IntRange().setStart(4).setEnd(5)));
// Disjoint - first after second
Assert.assertFalse(HashRangeExclusiveStickyKeyConsumerSelector.checkRangesOverlap(
new IntRange().setStart(8).setEnd(10),
new IntRange().setStart(1).setEnd(7)));
// Touching at upper/lower boundary in reversed order
Assert.assertTrue(HashRangeExclusiveStickyKeyConsumerSelector.checkRangesOverlap(
new IntRange().setStart(8).setEnd(10),
new IntRange().setStart(10).setEnd(12)));
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]