This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ff5025a21c4 KAFKA-19695: Fix bug in redundant offset calculation.
(#20516)
ff5025a21c4 is described below
commit ff5025a21c4e2b4d437f85aa75e1c3f7d4f09ef9
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed Sep 10 22:08:34 2025 +0530
KAFKA-19695: Fix bug in redundant offset calculation. (#20516)
* The `ShareCoordinatorShard` maintains the record offset
information for `SharePartitionKey`s in the
`ShareCoordinatorOffsetsManager` class.
* Replays of `ShareSnapshot`s in the shards are reflected in the offsets
manager, including records created due to delete state.
* However, if the share partition delete is due to a topic delete, no
record will ever be written for the same `SharePartitionKey` after the
delete tombstone (as the topic id will not repeat).
As a result, the offset manager keeps the deleted share partition's
offset in its calculation forever, so the last redundant offset can
never advance past it.
* The fix is to make the offset manager aware of the tombstone records
and drop the corresponding `SharePartitionKey` from the redundant offset
calculation.
* Unit tests have been updated accordingly.
Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal
<[email protected]>
---
.../share/ShareCoordinatorOffsetsManager.java | 9 +-
.../coordinator/share/ShareCoordinatorShard.java | 2 +-
.../share/ShareCoordinatorOffsetsManagerTest.java | 96 ++++++++++++++++++----
3 files changed, 90 insertions(+), 17 deletions(-)
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java
index 69070f65e93..0b3e5a5ff08 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java
@@ -60,13 +60,20 @@ public class ShareCoordinatorOffsetsManager {
*
* @param key - represents {@link SharePartitionKey} whose offset needs
updating
* @param offset - represents the latest partition offset for provided key
+ * @param isDelete - true if the offset is for a tombstone record
*/
- public void updateState(SharePartitionKey key, long offset) {
+ public void updateState(SharePartitionKey key, long offset, boolean
isDelete) {
lastRedundantOffset.set(Math.min(lastRedundantOffset.get(), offset));
offsets.put(key, offset);
Optional<Long> redundantOffset = findRedundantOffset();
redundantOffset.ifPresent(lastRedundantOffset::set);
+
+ // If the share partition is deleted, we should not hold onto its
offset in our calculations
+ // as there is nothing beyond deletion which is going to update its
state.
+ if (isDelete) {
+ offsets.remove(key);
+ }
}
private Optional<Long> findRedundantOffset() {
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index 76a654de4c1..9d52780faa5 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -266,7 +266,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
}
}
- offsetsManager.updateState(mapKey, offset);
+ offsetsManager.updateState(mapKey, offset, value == null);
}
private void handleShareUpdate(ShareUpdateKey key, ShareUpdateValue value)
{
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java
index 262f166be19..73d72bde9f0 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java
@@ -32,6 +32,7 @@ import java.util.Optional;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
public class ShareCoordinatorOffsetsManagerTest {
@@ -48,16 +49,19 @@ public class ShareCoordinatorOffsetsManagerTest {
@Test
public void testUpdateStateAddsToInternalState() {
- manager.updateState(KEY1, 0L);
+ manager.updateState(KEY1, 0L, false);
assertEquals(Optional.empty(), manager.lastRedundantOffset());
- manager.updateState(KEY1, 10L);
+ manager.updateState(KEY1, 10L, false);
assertEquals(Optional.of(10L), manager.lastRedundantOffset()); //
[0-9] offsets are redundant.
- manager.updateState(KEY2, 15L);
+ manager.updateState(KEY2, 15L, false);
assertEquals(Optional.of(10L), manager.lastRedundantOffset()); // No
update to last redundant after adding 15L so, still 10L.
- assertEquals(10L, manager.curState().get(KEY1));
+ manager.updateState(KEY1, 25L, true);
+ assertEquals(Optional.of(15L), manager.lastRedundantOffset()); //
KEY1 deleted, no longer part of calculation.
+
+ assertNull(manager.curState().get(KEY1));
assertEquals(15L, manager.curState().get(KEY2));
}
@@ -66,15 +70,21 @@ public class ShareCoordinatorOffsetsManagerTest {
final SharePartitionKey key;
final long offset;
final Optional<Long> expectedOffset;
+ final boolean isDelete;
- private TestTuple(SharePartitionKey key, long offset,
Optional<Long> expectedOffset) {
+ private TestTuple(SharePartitionKey key, long offset,
Optional<Long> expectedOffset, boolean isDelete) {
this.key = key;
this.offset = offset;
this.expectedOffset = expectedOffset;
+ this.isDelete = isDelete;
}
static TestTuple instance(SharePartitionKey key, long offset,
Optional<Long> expectedOffset) {
- return new TestTuple(key, offset, expectedOffset);
+ return new TestTuple(key, offset, expectedOffset, false);
+ }
+
+ static TestTuple instance(SharePartitionKey key, long offset,
Optional<Long> expectedOffset, boolean isDelete) {
+ return new TestTuple(key, offset, expectedOffset, isDelete);
}
}
@@ -96,19 +106,35 @@ public class ShareCoordinatorOffsetsManagerTest {
static Stream<ShareOffsetTestHolder> generateNoRedundantStateCases() {
return Stream.of(
new ShareOffsetTestHolder(
- "no redundant state single key",
+ "no redundant state single key.",
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L,
Optional.of(10L))
)
),
new ShareOffsetTestHolder(
- "no redundant state multiple keys",
+ "no redundant state single key with delete.",
+ List.of(
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L,
Optional.of(10L), true)
+ )
+ ),
+
+ new ShareOffsetTestHolder(
+ "no redundant state multiple keys.",
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L,
Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L,
Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L,
Optional.of(10L))
)
+ ),
+
+ new ShareOffsetTestHolder(
+ "no redundant state multiple keys with delete.",
+ List.of(
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L,
Optional.of(10L), true),
+ ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L,
Optional.of(11L), true),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L,
Optional.of(13L), true)
+ )
)
);
}
@@ -116,7 +142,7 @@ public class ShareCoordinatorOffsetsManagerTest {
static Stream<ShareOffsetTestHolder> generateRedundantStateCases() {
return Stream.of(
new ShareOffsetTestHolder(
- "redundant state single key",
+ "redundant state single key.",
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L,
Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY1, 11L,
Optional.of(11L)),
@@ -125,7 +151,7 @@ public class ShareCoordinatorOffsetsManagerTest {
),
new ShareOffsetTestHolder(
- "redundant state multiple keys",
+ "redundant state multiple keys.",
// KEY1: 10 17
// KEY2: 11 16
// KEY3: 15
@@ -136,6 +162,20 @@ public class ShareCoordinatorOffsetsManagerTest {
ShareOffsetTestHolder.TestTuple.instance(KEY2, 16L,
Optional.of(10L)), // KEY2 11 redundant but should not be returned
ShareOffsetTestHolder.TestTuple.instance(KEY1, 17L,
Optional.of(15L))
)
+ ),
+
+ new ShareOffsetTestHolder(
+ "redundant state multiple keys with delete.",
+ // KEY1: 10 17
+ // KEY2: 11 16
+ // KEY3: 15
+ List.of(
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L,
Optional.of(10L), true),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 16L,
Optional.of(10L)), // KEY2 11 redundant but should not be returned
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 17L,
Optional.of(16L)) // Because we have removed KEY3 from calculation
+ )
)
);
@@ -144,7 +184,7 @@ public class ShareCoordinatorOffsetsManagerTest {
static Stream<ShareOffsetTestHolder> generateComplexCases() {
return Stream.of(
new ShareOffsetTestHolder(
- "redundant state reverse key order",
+ "redundant state reverse key order.",
// Requests come in order KEY1, KEY2, KEY3, KEY3, KEY2, KEY1.
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L,
Optional.of(10L)),
@@ -156,6 +196,18 @@ public class ShareCoordinatorOffsetsManagerTest {
)
),
+ new ShareOffsetTestHolder(
+ "redundant state reverse key order with delete.",
+ List.of(
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 18L,
Optional.of(10L), true),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 20L,
Optional.of(10L), true),
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 25L,
Optional.of(25L)) // Because KEY2 and KEY3 are gone.
+ )
+ ),
+
new ShareOffsetTestHolder(
"redundant state infrequently written partition.",
List.of(
@@ -170,6 +222,20 @@ public class ShareCoordinatorOffsetsManagerTest {
ShareOffsetTestHolder.TestTuple.instance(KEY3, 28L,
Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY1, 30L,
Optional.of(27L))
)
+ ),
+
+ new ShareOffsetTestHolder(
+ "redundant state infrequently written partition with delete.",
+ List.of(
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 18L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY3, 20L,
Optional.of(10L), true), //KEY3 no longer party to calculation
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 22L,
Optional.of(10L)),
+ ShareOffsetTestHolder.TestTuple.instance(KEY2, 27L,
Optional.of(10L), true), //KEY2 no longer party to calculation
+ ShareOffsetTestHolder.TestTuple.instance(KEY1, 30L,
Optional.of(30L))
+ )
)
);
}
@@ -179,7 +245,7 @@ public class ShareCoordinatorOffsetsManagerTest {
public void testUpdateStateNoRedundantState(ShareOffsetTestHolder holder) {
if (holder.shouldRun) {
holder.tuples.forEach(tuple -> {
- manager.updateState(tuple.key, tuple.offset);
+ manager.updateState(tuple.key, tuple.offset, tuple.isDelete);
assertEquals(tuple.expectedOffset,
manager.lastRedundantOffset(), holder.testName);
});
}
@@ -190,7 +256,7 @@ public class ShareCoordinatorOffsetsManagerTest {
public void testUpdateStateRedundantState(ShareOffsetTestHolder holder) {
if (holder.shouldRun) {
holder.tuples.forEach(tuple -> {
- manager.updateState(tuple.key, tuple.offset);
+ manager.updateState(tuple.key, tuple.offset, tuple.isDelete);
assertEquals(tuple.expectedOffset,
manager.lastRedundantOffset(), holder.testName);
});
}
@@ -201,9 +267,9 @@ public class ShareCoordinatorOffsetsManagerTest {
public void testUpdateStateComplexCases(ShareOffsetTestHolder holder) {
if (holder.shouldRun) {
holder.tuples.forEach(tuple -> {
- manager.updateState(tuple.key, tuple.offset);
+ manager.updateState(tuple.key, tuple.offset, tuple.isDelete);
assertEquals(tuple.expectedOffset,
manager.lastRedundantOffset(), holder.testName);
});
}
}
-}
+}
\ No newline at end of file