This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 064dd5f8fcd MINOR: Require non-null arguments to
GroupCoordinatorService#onMetadataUpdate (#21052)
064dd5f8fcd is described below
commit 064dd5f8fcdf71681f5e54e5e0e5f7ff1f35b5e6
Author: David Jacot <[email protected]>
AuthorDate: Wed Dec 3 12:05:55 2025 +0100
MINOR: Require non-null arguments to
GroupCoordinatorService#onMetadataUpdate (#21052)
We had some tests passing null values to
`GroupCoordinatorService#onMetadataUpdate` even though it is not an
expected case. This patch fixes those tests and ensures that only
non-null values are accepted.
Reviewers: Lianet Magrans <[email protected]>, Sean Quah
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../runtime/KRaftCoordinatorMetadataDelta.java | 9 +++---
.../runtime/KRaftCoordinatorMetadataImage.java | 3 +-
.../runtime/KRaftCoordinatorMetadataDeltaTest.java | 10 ++----
.../coordinator/group/GroupCoordinatorService.java | 7 +++--
.../group/GroupCoordinatorServiceTest.java | 36 ++++++++++------------
5 files changed, 30 insertions(+), 35 deletions(-)
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDelta.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDelta.java
index 8e340d81c88..fe557399645 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDelta.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDelta.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.MetadataDelta;
import java.util.Collection;
+import java.util.Objects;
import java.util.Set;
/**
@@ -30,12 +31,12 @@ public class KRaftCoordinatorMetadataDelta implements
CoordinatorMetadataDelta {
final MetadataDelta metadataDelta;
public KRaftCoordinatorMetadataDelta(MetadataDelta metadataDelta) {
- this.metadataDelta = metadataDelta;
+ this.metadataDelta = Objects.requireNonNull(metadataDelta,
"metadataDelta must be provided");
}
@Override
public Collection<Uuid> createdTopicIds() {
- if (metadataDelta == null || metadataDelta.topicsDelta() == null) {
+ if (metadataDelta.topicsDelta() == null) {
return Set.of();
}
return metadataDelta.topicsDelta().createdTopicIds();
@@ -43,7 +44,7 @@ public class KRaftCoordinatorMetadataDelta implements
CoordinatorMetadataDelta {
@Override
public Collection<Uuid> changedTopicIds() {
- if (metadataDelta == null || metadataDelta.topicsDelta() == null) {
+ if (metadataDelta.topicsDelta() == null) {
return Set.of();
}
return metadataDelta.topicsDelta().changedTopics().keySet();
@@ -51,7 +52,7 @@ public class KRaftCoordinatorMetadataDelta implements
CoordinatorMetadataDelta {
@Override
public Set<Uuid> deletedTopicIds() {
- if (metadataDelta == null || metadataDelta.topicsDelta() == null) {
+ if (metadataDelta.topicsDelta() == null) {
return Set.of();
}
return metadataDelta.topicsDelta().deletedTopicIds();
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataImage.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataImage.java
index c0284a4aed6..8df13da1384 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataImage.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataImage.java
@@ -27,6 +27,7 @@ import org.apache.kafka.metadata.PartitionRegistration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -38,7 +39,7 @@ public class KRaftCoordinatorMetadataImage implements
CoordinatorMetadataImage {
private final MetadataImage metadataImage;
public KRaftCoordinatorMetadataImage(MetadataImage metadataImage) {
- this.metadataImage = metadataImage;
+ this.metadataImage = Objects.requireNonNull(metadataImage,
"metadataImage must be provided");
}
@Override
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDeltaTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDeltaTest.java
index f65103e87d8..e72170dc0f8 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDeltaTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDeltaTest.java
@@ -32,20 +32,14 @@ import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class KRaftCoordinatorMetadataDeltaTest {
@Test
public void testKRaftCoordinatorDeltaWithNulls() {
- assertTrue(new
KRaftCoordinatorMetadataDelta(null).changedTopicIds().isEmpty());
- assertTrue(new KRaftCoordinatorMetadataDelta(new
MetadataDelta(MetadataImage.EMPTY)).changedTopicIds().isEmpty());
-
- assertTrue(new
KRaftCoordinatorMetadataDelta(null).deletedTopicIds().isEmpty());
- assertTrue(new KRaftCoordinatorMetadataDelta(new
MetadataDelta(MetadataImage.EMPTY)).deletedTopicIds().isEmpty());
-
- assertTrue(new
KRaftCoordinatorMetadataDelta(null).createdTopicIds().isEmpty());
- assertTrue(new KRaftCoordinatorMetadataDelta(new
MetadataDelta(MetadataImage.EMPTY)).createdTopicIds().isEmpty());
+ assertThrows(NullPointerException.class, () -> new
KRaftCoordinatorMetadataDelta(null));
}
@Test
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 5738abc5205..9c29183ca38 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -129,6 +129,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
@@ -2311,8 +2312,10 @@ public class GroupCoordinatorService implements
GroupCoordinator {
MetadataImage newImage
) {
throwIfNotActive();
- var wrappedImage = newImage == null ? null : new
KRaftCoordinatorMetadataImage(newImage);
- var wrappedDelta = delta == null ? null : new
KRaftCoordinatorMetadataDelta(delta);
+ Objects.requireNonNull(delta, "delta must be provided");
+ Objects.requireNonNull(newImage, "newImage must be provided");
+ var wrappedImage = new KRaftCoordinatorMetadataImage(newImage);
+ var wrappedDelta = new KRaftCoordinatorMetadataDelta(delta);
metadataImage = wrappedImage;
runtime.onMetadataUpdate(wrappedDelta, wrappedImage);
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 515e55d5eee..df911666295 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -4226,10 +4226,9 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
- .build(true);
+ .build(false);
- // Forcing a null Metadata Image
- service.onMetadataUpdate(null, null);
+ service.startup(() -> 1);
int partition = 1;
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
requestData = new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
@@ -4273,7 +4272,7 @@ public class GroupCoordinatorServiceTest {
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
.build();
- service.onMetadataUpdate(null, image);
+ service.onMetadataUpdate(new MetadataDelta(image), image);
int partition = 1;
@@ -4344,7 +4343,7 @@ public class GroupCoordinatorServiceTest {
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
.build();
- service.onMetadataUpdate(null, image);
+ service.onMetadataUpdate(new MetadataDelta(image), image);
int partition = 1;
@@ -4380,7 +4379,7 @@ public class GroupCoordinatorServiceTest {
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
.build();
- service.onMetadataUpdate(null, image);
+ service.onMetadataUpdate(new MetadataDelta(image), image);
int partition = 1;
@@ -4417,7 +4416,7 @@ public class GroupCoordinatorServiceTest {
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
.build();
- service.onMetadataUpdate(null, image);
+ service.onMetadataUpdate(new MetadataDelta(image), image);
int partition = 1;
@@ -4504,10 +4503,9 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
- .build(true);
+ .build(false);
- // Forcing a null Metadata Image
- service.onMetadataUpdate(null, null);
+ service.startup(() -> 1);
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
requestData = new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("share-group-id")
@@ -4701,10 +4699,9 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
- .build(true);
+ .build(false);
- // Forcing a null Metadata Image
- service.onMetadataUpdate(null, null);
+ service.startup(() -> 1);
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
@@ -5581,7 +5578,7 @@ public class GroupCoordinatorServiceTest {
.addTopic(topicId, "topic-name", 3)
.build();
- service.onMetadataUpdate(null, image);
+ service.onMetadataUpdate(new MetadataDelta(image), image);
when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
new InitializeShareGroupStateResult.Builder()
@@ -5756,7 +5753,7 @@ public class GroupCoordinatorServiceTest {
.addTopic(topicId, "topic-name", 3)
.build();
- service.onMetadataUpdate(null, image);
+ service.onMetadataUpdate(new MetadataDelta(image), image);
when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
new InitializeShareGroupStateResult.Builder()
@@ -5815,10 +5812,9 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
- .build(true);
+ .build(false);
- // Forcing a null Metadata Image
- service.onMetadataUpdate(null, null);
+ service.startup(() -> 1);
String groupId = "share-group";
AlterShareGroupOffsetsRequestData request = new
AlterShareGroupOffsetsRequestData()
@@ -5974,7 +5970,7 @@ public class GroupCoordinatorServiceTest {
.addTopic(topicId, "topic-name", 1)
.build();
- service.onMetadataUpdate(null, image);
+ service.onMetadataUpdate(new MetadataDelta(image), image);
when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
new InitializeShareGroupStateResult.Builder()
@@ -6050,7 +6046,7 @@ public class GroupCoordinatorServiceTest {
if (serviceStartup) {
service.startup(() -> 1);
- service.onMetadataUpdate(null, metadataImage);
+ service.onMetadataUpdate(new MetadataDelta(metadataImage),
metadataImage);
}
return service;