This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 97c8c6b595b KAFKA-19733 Fix arguments to assertEquals() in clients
module (#20586)
97c8c6b595b is described below
commit 97c8c6b595bbff31b06e457aa08131965c845040
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Thu Sep 25 21:07:02 2025 +0530
KAFKA-19733 Fix arguments to assertEquals() in clients module (#20586)
The given PR mostly fixes the order of arguments in `assertEquals()` for
the Clients module. Some minor cleanups were included with the same too.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../clients/producer/ProducerIdExpirationTest.java | 12 +--
.../producer/ProducerSendWhileDeletionTest.java | 5 +-
.../kafka/clients/ClusterConnectionStatesTest.java | 4 +-
.../org/apache/kafka/clients/MetadataTest.java | 74 ++++++++---------
.../org/apache/kafka/clients/admin/ConfigTest.java | 2 +-
.../consumer/CooperativeStickyAssignorTest.java | 4 +-
.../kafka/clients/consumer/MockConsumerTest.java | 4 +-
.../internals/AbstractStickyAssignorTest.java | 4 +-
.../internals/ConsumerCoordinatorTest.java | 46 +++++------
.../internals/FetchRequestManagerTest.java | 16 ++--
.../clients/consumer/internals/FetcherTest.java | 20 ++---
.../internals/KafkaConsumerMetricsTest.java | 4 +-
.../consumer/internals/OffsetFetcherTest.java | 6 +-
.../internals/OffsetForLeaderEpochClientTest.java | 6 +-
.../StreamsGroupHeartbeatRequestManagerTest.java | 95 ++++++----------------
.../internals/StreamsRebalanceDataTest.java | 2 +-
.../metrics/AsyncConsumerMetricsTest.java | 16 ++--
.../kafka/clients/producer/MockProducerTest.java | 8 +-
.../clients/producer/internals/BufferPoolTest.java | 4 +-
.../internals/KafkaProducerMetricsTest.java | 4 +-
.../producer/internals/RecordAccumulatorTest.java | 4 +-
.../clients/producer/internals/SenderTest.java | 6 +-
.../producer/internals/TransactionManagerTest.java | 26 +++---
.../common/feature/SupportedVersionRangeTest.java | 2 +-
.../apache/kafka/common/metrics/MetricsTest.java | 4 +-
.../apache/kafka/common/metrics/SensorTest.java | 12 +--
.../common/requests/UpdateFeaturesRequestTest.java | 12 +--
.../authenticator/SaslServerAuthenticatorTest.java | 2 +-
.../common/security/kerberos/KerberosRuleTest.java | 12 +--
...uthBearerUnsecuredLoginCallbackHandlerTest.java | 2 +-
.../internals/ClientTelemetryReporterTest.java | 2 +-
.../kafka/server/policy/AlterConfigPolicyTest.java | 4 +-
.../java/org/apache/kafka/test/TestSslUtils.java | 42 ++++++----
33 files changed, 215 insertions(+), 251 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java
index f79b3786253..a9489c88327 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java
@@ -204,10 +204,10 @@ public class ProducerIdExpirationTest {
// Update the producer ID expiration ms to a very high value.
admin.incrementalAlterConfigs(producerIdExpirationConfig("100000"));
- cluster.brokers().values().forEach(broker -> {
+ cluster.brokers().values().forEach(broker ->
TestUtils.waitUntilTrue(() ->
broker.logManager().producerStateManagerConfig().producerIdExpirationMs() ==
100000,
- () -> "Configuration was not updated.",
DEFAULT_MAX_WAIT_MS, 100);
- });
+ () -> "Configuration was not updated.",
DEFAULT_MAX_WAIT_MS, 100)
+ );
// Send more records to send producer ID back to brokers.
producer.send(new ProducerRecord<>(topic1, 0, null,
"key".getBytes(), "value".getBytes()));
producer.flush();
@@ -226,10 +226,10 @@ public class ProducerIdExpirationTest {
kafkaBroker.awaitShutdown();
kafkaBroker.startup();
cluster.waitForReadyBrokers();
- cluster.brokers().values().forEach(broker -> {
+ cluster.brokers().values().forEach(broker ->
TestUtils.waitUntilTrue(() ->
broker.logManager().producerStateManagerConfig().producerIdExpirationMs() ==
100,
- () -> "Configuration was not updated.",
DEFAULT_MAX_WAIT_MS, 100);
- });
+ () -> "Configuration was not updated.",
DEFAULT_MAX_WAIT_MS, 100)
+ );
// Ensure producer ID expires quickly again.
waitProducerIdExpire(admin);
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
index 4301eecbb9c..aa93431cf63 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
@@ -184,9 +184,8 @@ public class ProducerSendWhileDeletionTest {
try (var producer = createProducer()) {
for (int i = 1; i <= numRecords; i++) {
producer.send(new ProducerRecord<>(topic, null, ("value" +
i).getBytes()),
- (metadata, exception) -> {
- numAcks.incrementAndGet();
- });
+ (metadata, exception) -> numAcks.incrementAndGet()
+ );
}
producer.flush();
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index f647d95445f..9812f490ddd 100644
---
a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -186,7 +186,7 @@ public class ClusterConnectionStatesTest {
connectionStates.authenticationFailed(nodeId1, time.milliseconds(),
new AuthenticationException("No path to CA for certificate!"));
time.sleep(1000);
- assertEquals(connectionStates.connectionState(nodeId1),
ConnectionState.AUTHENTICATION_FAILED);
+ assertEquals(ConnectionState.AUTHENTICATION_FAILED,
connectionStates.connectionState(nodeId1));
assertNotNull(connectionStates.authenticationException(nodeId1));
assertFalse(connectionStates.hasReadyNodes(time.milliseconds()));
assertFalse(connectionStates.canConnect(nodeId1, time.milliseconds()));
@@ -210,7 +210,7 @@ public class ClusterConnectionStatesTest {
connectionStates.remove(nodeId1);
assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
assertFalse(connectionStates.isBlackedOut(nodeId1,
time.milliseconds()));
- assertEquals(connectionStates.connectionDelay(nodeId1,
time.milliseconds()), 0L);
+ assertEquals(0L, connectionStates.connectionDelay(nodeId1,
time.milliseconds()));
}
@Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 13c378d3983..9ac75191004 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -361,28 +361,28 @@ public class MetadataTest {
// Metadata with newer epoch is handled
metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1,
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
- assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch ->
assertEquals(leaderAndEpoch.intValue(), 10));
+ assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch ->
assertEquals(10, leaderAndEpoch.intValue()));
// Don't update to an older one
assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 1));
- assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch ->
assertEquals(leaderAndEpoch.intValue(), 10));
+ assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch ->
assertEquals(10, leaderAndEpoch.intValue()));
// Don't cause update if it's the same one
assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 10));
- assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch ->
assertEquals(leaderAndEpoch.intValue(), 10));
+ assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch ->
assertEquals(10, leaderAndEpoch.intValue()));
// Update if we see newer epoch
assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 12));
- assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch ->
assertEquals(leaderAndEpoch.intValue(), 12));
+ assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch ->
assertEquals(12, leaderAndEpoch.intValue()));
metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1,
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 12);
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
- assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch ->
assertEquals(leaderAndEpoch.intValue(), 12));
+ assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch ->
assertEquals(12, leaderAndEpoch.intValue()));
// Don't overwrite metadata with older epoch
metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1,
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 11);
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 3L);
- assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch ->
assertEquals(leaderAndEpoch.intValue(), 12));
+ assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch ->
assertEquals(12, leaderAndEpoch.intValue()));
}
@Test
@@ -465,7 +465,7 @@ public class MetadataTest {
metadata.updateWithCurrentRequestVersion(metadataResponse, false,
10L);
assertNotNull(metadata.fetch().partition(tp));
assertTrue(metadata.lastSeenLeaderEpoch(tp).isPresent());
- assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(),
100);
+ assertEquals(100,
metadata.lastSeenLeaderEpoch(tp).get().longValue());
}
// Fake an empty ISR, but with an older epoch, should reject it
@@ -475,8 +475,8 @@ public class MetadataTest {
new MetadataResponse.PartitionMetadata(error,
partition, leader,
leaderEpoch, replicas, Collections.emptyList(),
offlineReplicas), ApiKeys.METADATA.latestVersion(), Collections.emptyMap());
metadata.updateWithCurrentRequestVersion(metadataResponse, false,
20L);
-
assertEquals(metadata.fetch().partition(tp).inSyncReplicas().length, 1);
- assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(),
100);
+ assertEquals(1,
metadata.fetch().partition(tp).inSyncReplicas().length);
+ assertEquals(100,
metadata.lastSeenLeaderEpoch(tp).get().longValue());
}
// Fake an empty ISR, with same epoch, accept it
@@ -486,8 +486,8 @@ public class MetadataTest {
new MetadataResponse.PartitionMetadata(error,
partition, leader,
leaderEpoch, replicas, Collections.emptyList(),
offlineReplicas), ApiKeys.METADATA.latestVersion(), Collections.emptyMap());
metadata.updateWithCurrentRequestVersion(metadataResponse, false,
20L);
-
assertEquals(metadata.fetch().partition(tp).inSyncReplicas().length, 0);
- assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(),
100);
+ assertEquals(0,
metadata.fetch().partition(tp).inSyncReplicas().length);
+ assertEquals(100,
metadata.lastSeenLeaderEpoch(tp).get().longValue());
}
// Empty metadata response, should not keep old partition but should
keep the last-seen epoch
@@ -495,7 +495,7 @@ public class MetadataTest {
MetadataResponse metadataResponse =
RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(),
Collections.emptyMap());
metadata.updateWithCurrentRequestVersion(metadataResponse, false,
20L);
assertNull(metadata.fetch().partition(tp));
- assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(),
100);
+ assertEquals(100,
metadata.lastSeenLeaderEpoch(tp).get().longValue());
}
// Back in the metadata, with old epoch, should not get added
@@ -503,7 +503,7 @@ public class MetadataTest {
MetadataResponse metadataResponse =
RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(),
partitionCounts, _tp -> 99);
metadata.updateWithCurrentRequestVersion(metadataResponse, false,
10L);
assertNull(metadata.fetch().partition(tp));
- assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(),
100);
+ assertEquals(100,
metadata.lastSeenLeaderEpoch(tp).get().longValue());
}
}
@@ -522,31 +522,31 @@ public class MetadataTest {
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 10L);
assertNotNull(metadata.fetch().partition(tp));
assertTrue(metadata.lastSeenLeaderEpoch(tp).isPresent());
- assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
+ assertEquals(100, metadata.lastSeenLeaderEpoch(tp).get().longValue());
// Simulate a leader epoch from another response, like a fetch
response or list offsets
assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 101));
// Cache of partition stays, but current partition info is not
available since it's stale
assertNotNull(metadata.fetch().partition(tp));
-
assertEquals(Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue(),
5);
+ assertEquals(5,
Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue());
assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
- assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
+ assertEquals(101, metadata.lastSeenLeaderEpoch(tp).get().longValue());
// Metadata with older epoch is rejected, metadata state is unchanged
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 20L);
assertNotNull(metadata.fetch().partition(tp));
-
assertEquals(Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue(),
5);
+ assertEquals(5,
Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue());
assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
- assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
+ assertEquals(101, metadata.lastSeenLeaderEpoch(tp).get().longValue());
// Metadata with equal or newer epoch is accepted
metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1,
Collections.emptyMap(), partitionCounts, _tp -> 101);
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 30L);
assertNotNull(metadata.fetch().partition(tp));
-
assertEquals(Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue(),
5);
+ assertEquals(5,
Objects.requireNonNull(metadata.fetch().partitionCountForTopic("topic-1")).longValue());
assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
- assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
+ assertEquals(101, metadata.lastSeenLeaderEpoch(tp).get().longValue());
}
@Test
@@ -585,18 +585,18 @@ public class MetadataTest {
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
Cluster cluster = metadata.fetch();
- assertEquals(cluster.clusterResource().clusterId(), "dummy");
- assertEquals(cluster.nodes().size(), 4);
+ assertEquals("dummy", cluster.clusterResource().clusterId());
+ assertEquals(4, cluster.nodes().size());
// topic counts
assertEquals(cluster.invalidTopics(), Collections.singleton("topic3"));
assertEquals(cluster.unauthorizedTopics(),
Collections.singleton("topic4"));
- assertEquals(cluster.topics().size(), 3);
+ assertEquals(3, cluster.topics().size());
assertEquals(cluster.internalTopics(),
Collections.singleton(Topic.GROUP_METADATA_TOPIC_NAME));
// partition counts
- assertEquals(cluster.partitionsForTopic("topic1").size(), 2);
- assertEquals(cluster.partitionsForTopic("topic2").size(), 3);
+ assertEquals(2, cluster.partitionsForTopic("topic1").size());
+ assertEquals(3, cluster.partitionsForTopic("topic2").size());
// Sentinel instances
InetSocketAddress address =
InetSocketAddress.createUnresolved("localhost", 0);
@@ -798,10 +798,10 @@ public class MetadataTest {
TopicPartition tp = new TopicPartition("topic-1", 0);
- assertOptional(metadata.fetch().nodeIfOnline(tp, 0), node ->
assertEquals(node.id(), 0));
+ assertOptional(metadata.fetch().nodeIfOnline(tp, 0), node ->
assertEquals(0, node.id()));
assertFalse(metadata.fetch().nodeIfOnline(tp, 1).isPresent());
- assertEquals(metadata.fetch().nodeById(0).id(), 0);
- assertEquals(metadata.fetch().nodeById(1).id(), 1);
+ assertEquals(0, metadata.fetch().nodeById(0).id());
+ assertEquals(1, metadata.fetch().nodeById(1).id());
}
@Test
@@ -831,7 +831,7 @@ public class MetadataTest {
TopicPartition tp = new TopicPartition("topic-1", 0);
- assertEquals(metadata.fetch().nodeById(0).id(), 0);
+ assertEquals(0, metadata.fetch().nodeById(0).id());
assertNull(metadata.fetch().partition(tp));
assertEquals(metadata.fetch().nodeIfOnline(tp, 0), Optional.empty());
}
@@ -955,13 +955,13 @@ public class MetadataTest {
// Update the metadata to add a new topic variant, "new", which will
be retained with "keep". Note this
// means that all of the "old" topics should be dropped.
Cluster cluster = metadata.fetch();
- assertEquals(cluster.clusterResource().clusterId(), oldClusterId);
- assertEquals(cluster.nodes().size(), oldNodes);
+ assertEquals(oldClusterId, cluster.clusterResource().clusterId());
+ assertEquals(oldNodes, cluster.nodes().size());
assertEquals(cluster.invalidTopics(), Set.of("oldInvalidTopic",
"keepInvalidTopic"));
assertEquals(cluster.unauthorizedTopics(),
Set.of("oldUnauthorizedTopic", "keepUnauthorizedTopic"));
assertEquals(cluster.topics(), Set.of("oldValidTopic",
"keepValidTopic"));
- assertEquals(cluster.partitionsForTopic("oldValidTopic").size(), 2);
- assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 3);
+ assertEquals(2, cluster.partitionsForTopic("oldValidTopic").size());
+ assertEquals(3, cluster.partitionsForTopic("keepValidTopic").size());
assertEquals(new HashSet<>(cluster.topicIds()), new
HashSet<>(topicIds.values()));
String newClusterId = "newClusterId";
@@ -990,13 +990,13 @@ public class MetadataTest {
assertNull(metadataTopicIds2.get("oldValidTopic"));
cluster = metadata.fetch();
- assertEquals(cluster.clusterResource().clusterId(), newClusterId);
+ assertEquals(newClusterId, cluster.clusterResource().clusterId());
assertEquals(cluster.nodes().size(), newNodes);
assertEquals(cluster.invalidTopics(), Set.of("keepInvalidTopic",
"newInvalidTopic"));
assertEquals(cluster.unauthorizedTopics(),
Set.of("keepUnauthorizedTopic", "newUnauthorizedTopic"));
assertEquals(cluster.topics(), Set.of("keepValidTopic",
"newValidTopic"));
- assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
- assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+ assertEquals(2, cluster.partitionsForTopic("keepValidTopic").size());
+ assertEquals(4, cluster.partitionsForTopic("newValidTopic").size());
assertEquals(new HashSet<>(cluster.topicIds()), new
HashSet<>(topicIds.values()));
// Perform another metadata update, but this time all topic metadata
should be cleared.
@@ -1008,7 +1008,7 @@ public class MetadataTest {
topicIds.forEach((topicName, topicId) ->
assertNull(metadataTopicIds3.get(topicName)));
cluster = metadata.fetch();
- assertEquals(cluster.clusterResource().clusterId(), newClusterId);
+ assertEquals(newClusterId, cluster.clusterResource().clusterId());
assertEquals(cluster.nodes().size(), newNodes);
assertEquals(cluster.invalidTopics(), Collections.emptySet());
assertEquals(cluster.unauthorizedTopics(), Collections.emptySet());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java
index d09cca7ad66..f3b1e73d72e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java
@@ -64,7 +64,7 @@ public class ConfigTest {
assertEquals(config, config);
assertEquals(config, new Config(config.entries()));
assertNotEquals(new Config(Collections.singletonList(E1)), config);
- assertNotEquals(config, "this");
+ assertNotEquals("this", config);
}
@Test
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
index b85d000e167..6a6aa919be1 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
@@ -81,7 +81,7 @@ public class CooperativeStickyAssignorTest extends
AbstractStickyAssignorTest {
Optional<Integer> encodedGeneration = ((CooperativeStickyAssignor)
assignor).memberData(subscription).generation;
assertTrue(encodedGeneration.isPresent());
- assertEquals(encodedGeneration.get(), DEFAULT_GENERATION);
+ assertEquals(DEFAULT_GENERATION, encodedGeneration.get());
int generation = 10;
assignor.onAssignment(null, new
ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id",
Optional.empty()));
@@ -90,7 +90,7 @@ public class CooperativeStickyAssignorTest extends
AbstractStickyAssignorTest {
encodedGeneration = ((CooperativeStickyAssignor)
assignor).memberData(subscription).generation;
assertTrue(encodedGeneration.isPresent());
- assertEquals(encodedGeneration.get(), generation);
+ assertEquals(generation, encodedGeneration.get());
}
@Test
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 647976b1d1d..6968b45a57b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -209,9 +209,7 @@ public class MockConsumerTest {
consumer.assign(Collections.singleton(partition));
consumer.updateBeginningOffsets(Collections.singletonMap(partition,
0L));
- IntStream.range(0, 10).forEach(offset -> {
- consumer.addRecord(new ConsumerRecord<>("test", 0, offset, null,
null));
- });
+ IntStream.range(0, 10).forEach(offset -> consumer.addRecord(new
ConsumerRecord<>("test", 0, offset, null, null)));
consumer.setMaxPollRecords(2L);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
index fe6b4d100ff..4e9525264a0 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
@@ -1025,7 +1025,7 @@ public abstract class AbstractStickyAssignorTest {
Map<String, List<TopicPartition>> assignment =
assignor.assignPartitions(partitionsPerTopic, subscriptions);
assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
- assertEquals(assignment.values().stream().mapToInt(List::size).sum(),
1 + 100);
+ assertEquals(1 + 100,
assignment.values().stream().mapToInt(List::size).sum());
assertEquals(Collections.singleton(consumerId), assignment.keySet());
assertTrue(isFullyBalanced(assignment));
}
@@ -1043,7 +1043,7 @@ public abstract class AbstractStickyAssignorTest {
assignment = assignor.assign(Collections.emptyMap(), subscriptions);
assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
- assertEquals(assignment.size(), 1);
+ assertEquals(1, assignment.size());
assertTrue(assignment.get(consumerId).isEmpty());
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index fcbbfeb8eeb..623fd765f39 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -568,13 +568,13 @@ public abstract class ConsumerCoordinatorTest {
assertFalse(client.hasInFlightRequests());
// should try to find coordinator since we are commit async
- coordinator.commitOffsetsAsync(singletonMap(t1p, new
OffsetAndMetadata(100L)), (offsets, exception) -> {
- fail("Commit should not get responses, but got offsets:" + offsets
+ ", and exception:" + exception);
- });
+ coordinator.commitOffsetsAsync(singletonMap(t1p, new
OffsetAndMetadata(100L)), (offsets, exception) ->
+ fail("Commit should not get responses, but got offsets:" + offsets
+ ", and exception:" + exception)
+ );
coordinator.poll(time.timer(0));
assertTrue(coordinator.coordinatorUnknown());
assertTrue(client.hasInFlightRequests());
- assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+ assertEquals(0, coordinator.inFlightAsyncCommits.get());
client.respond(groupCoordinatorResponse(node, Errors.NONE));
coordinator.poll(time.timer(0));
@@ -582,7 +582,7 @@ public abstract class ConsumerCoordinatorTest {
// after we've discovered the coordinator we should send
// out the commit request immediately
assertTrue(client.hasInFlightRequests());
- assertEquals(coordinator.inFlightAsyncCommits.get(), 1);
+ assertEquals(1, coordinator.inFlightAsyncCommits.get());
}
@Test
@@ -619,13 +619,13 @@ public abstract class ConsumerCoordinatorTest {
assertFalse(coordinator.commitOffsetsSync(Collections.emptyMap(),
time.timer(100L)), "expected sync commit to fail");
assertFalse(committed.get());
- assertEquals(coordinator.inFlightAsyncCommits.get(), 1);
+ assertEquals(1, coordinator.inFlightAsyncCommits.get());
prepareOffsetCommitRequest(singletonMap(tp, 123L), Errors.NONE);
assertTrue(coordinator.commitOffsetsSync(Collections.emptyMap(),
time.timer(Long.MAX_VALUE)), "expected sync commit to succeed");
assertTrue(committed.get(), "expected commit callback to be invoked");
- assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+ assertEquals(0, coordinator.inFlightAsyncCommits.get());
}
@Test
@@ -646,13 +646,13 @@ public abstract class ConsumerCoordinatorTest {
"Unexpected exception cause type: " + (cause == null ?
null : cause.getClass()));
});
}
- assertEquals(coordinator.inFlightAsyncCommits.get(), numRequests);
+ assertEquals(numRequests, coordinator.inFlightAsyncCommits.get());
coordinator.markCoordinatorUnknown("test cause");
consumerClient.pollNoWakeup();
coordinator.invokeCompletedOffsetCommitCallbacks();
assertEquals(numRequests, responses.get());
- assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+ assertEquals(0, coordinator.inFlightAsyncCommits.get());
}
@Test
@@ -697,7 +697,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator.markCoordinatorUnknown("test cause");
consumerClient.pollNoWakeup();
assertTrue(asyncCallbackInvoked.get());
- assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+ assertEquals(0, coordinator.inFlightAsyncCommits.get());
}
@Test
@@ -2350,7 +2350,7 @@ public abstract class ConsumerCoordinatorTest {
MockCommitCallback secondCommitCallback = new MockCommitCallback();
coordinator.commitOffsetsAsync(singletonMap(t1p, new
OffsetAndMetadata(100L)), firstCommitCallback);
coordinator.commitOffsetsAsync(singletonMap(t1p, new
OffsetAndMetadata(100L)), secondCommitCallback);
- assertEquals(coordinator.inFlightAsyncCommits.get(), 2);
+ assertEquals(2, coordinator.inFlightAsyncCommits.get());
respondToOffsetCommitRequest(singletonMap(t1p, 100L), error);
consumerClient.pollNoWakeup();
@@ -2360,7 +2360,7 @@ public abstract class ConsumerCoordinatorTest {
assertTrue(coordinator.coordinatorUnknown());
assertInstanceOf(RetriableCommitFailedException.class,
firstCommitCallback.exception);
assertInstanceOf(RetriableCommitFailedException.class,
secondCommitCallback.exception);
- assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+ assertEquals(0, coordinator.inFlightAsyncCommits.get());
}
@Test
@@ -2549,7 +2549,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
coordinator.commitOffsetsAsync(singletonMap(t1p, new
OffsetAndMetadata(100L)), mockOffsetCommitCallback);
- assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+ assertEquals(0, coordinator.inFlightAsyncCommits.get());
coordinator.invokeCompletedOffsetCommitCallbacks();
assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
assertNull(mockOffsetCommitCallback.exception);
@@ -2580,7 +2580,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator.commitOffsetsAsync(singletonMap(t1p, new
OffsetAndMetadata(100L)), callback(success));
coordinator.invokeCompletedOffsetCommitCallbacks();
assertTrue(success.get());
- assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+ assertEquals(0, coordinator.inFlightAsyncCommits.get());
}
@Test
@@ -2590,7 +2590,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
prepareOffsetCommitRequest(singletonMap(t1p, 100L),
Errors.COORDINATOR_NOT_AVAILABLE);
coordinator.commitOffsetsAsync(singletonMap(t1p, new
OffsetAndMetadata(100L)), mockOffsetCommitCallback);
- assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+ assertEquals(0, coordinator.inFlightAsyncCommits.get());
coordinator.invokeCompletedOffsetCommitCallbacks();
assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
assertInstanceOf(RetriableCommitFailedException.class,
mockOffsetCommitCallback.exception);
@@ -2605,7 +2605,7 @@ public abstract class ConsumerCoordinatorTest {
MockCommitCallback cb = new MockCommitCallback();
prepareOffsetCommitRequest(singletonMap(t1p, 100L),
Errors.COORDINATOR_NOT_AVAILABLE);
coordinator.commitOffsetsAsync(singletonMap(t1p, new
OffsetAndMetadata(100L)), cb);
- assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+ assertEquals(0, coordinator.inFlightAsyncCommits.get());
coordinator.invokeCompletedOffsetCommitCallbacks();
assertTrue(coordinator.coordinatorUnknown());
@@ -2622,7 +2622,7 @@ public abstract class ConsumerCoordinatorTest {
MockCommitCallback cb = new MockCommitCallback();
prepareOffsetCommitRequest(singletonMap(t1p, 100L),
Errors.NOT_COORDINATOR);
coordinator.commitOffsetsAsync(singletonMap(t1p, new
OffsetAndMetadata(100L)), cb);
- assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+ assertEquals(0, coordinator.inFlightAsyncCommits.get());
coordinator.invokeCompletedOffsetCommitCallbacks();
assertTrue(coordinator.coordinatorUnknown());
@@ -2639,7 +2639,7 @@ public abstract class ConsumerCoordinatorTest {
MockCommitCallback cb = new MockCommitCallback();
prepareOffsetCommitRequestDisconnect(singletonMap(t1p, 100L));
coordinator.commitOffsetsAsync(singletonMap(t1p, new
OffsetAndMetadata(100L)), cb);
- assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+ assertEquals(0, coordinator.inFlightAsyncCommits.get());
coordinator.invokeCompletedOffsetCommitCallbacks();
assertTrue(coordinator.coordinatorUnknown());
@@ -2703,7 +2703,7 @@ public abstract class ConsumerCoordinatorTest {
}
};
- assertEquals(coordinator.inFlightAsyncCommits.get(), 1);
+ assertEquals(1, coordinator.inFlightAsyncCommits.get());
thread.start();
client.waitForRequests(2, 5000);
@@ -2711,7 +2711,7 @@ public abstract class ConsumerCoordinatorTest {
respondToOffsetCommitRequest(singletonMap(t1p, secondOffset.offset()),
Errors.NONE);
thread.join();
- assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+ assertEquals(0, coordinator.inFlightAsyncCommits.get());
assertEquals(Arrays.asList(firstOffset, secondOffset),
committedOffsets);
}
@@ -3100,7 +3100,7 @@ public abstract class ConsumerCoordinatorTest {
assertEquals(Collections.emptySet(),
subscriptions.initializingPartitions());
assertFalse(subscriptions.hasAllFetchPositions());
assertTrue(subscriptions.awaitingValidation(t1p));
- assertEquals(subscriptions.position(t1p).offset, 100L);
+ assertEquals(100L, subscriptions.position(t1p).offset);
assertNull(subscriptions.validPosition(t1p));
}
@@ -3470,7 +3470,7 @@ public abstract class ConsumerCoordinatorTest {
assertThrows(FencedInstanceIdException.class,
this::receiveFencedInstanceIdException);
assertThrows(FencedInstanceIdException.class, () ->
coordinator.commitOffsetsAsync(singletonMap(t1p, new
OffsetAndMetadata(100L)), new MockCommitCallback()));
- assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+ assertEquals(0, coordinator.inFlightAsyncCommits.get());
assertThrows(FencedInstanceIdException.class, () ->
coordinator.commitOffsetsSync(singletonMap(t1p, new
OffsetAndMetadata(100L)), time.timer(Long.MAX_VALUE)));
}
@@ -3739,7 +3739,7 @@ public abstract class ConsumerCoordinatorTest {
prepareOffsetCommitRequest(singletonMap(t1p, 100L),
Errors.FENCED_INSTANCE_ID);
coordinator.commitOffsetsAsync(singletonMap(t1p, new
OffsetAndMetadata(100L)), new MockCommitCallback());
- assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+ assertEquals(0, coordinator.inFlightAsyncCommits.get());
coordinator.invokeCompletedOffsetCommitCallbacks();
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index f806ab65b6b..1378e4b53a1 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -1815,7 +1815,7 @@ public class FetchRequestManagerTest {
assertEquals(1, oorExceptions.size());
OffsetOutOfRangeException oor = oorExceptions.get(0);
assertTrue(oor.offsetOutOfRangePartitions().containsKey(tp0));
- assertEquals(oor.offsetOutOfRangePartitions().size(), 1);
+ assertEquals(1, oor.offsetOutOfRangePartitions().size());
fetchRecordsInto(fetchedRecords);
@@ -2359,7 +2359,7 @@ public class FetchRequestManagerTest {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
fetchedRecords = fetchRecords();
assertTrue(fetchedRecords.containsKey(tp0));
- assertEquals(fetchedRecords.get(tp0).size(), 2);
+ assertEquals(2, fetchedRecords.get(tp0).size());
}
@Test
@@ -2477,7 +2477,7 @@ public class FetchRequestManagerTest {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
fetchedRecords = fetchRecords();
assertTrue(fetchedRecords.containsKey(tp0));
- assertEquals(fetchedRecords.get(tp0).size(), 2);
+ assertEquals(2, fetchedRecords.get(tp0).size());
List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords =
fetchedRecords.get(tp0);
Set<String> expectedCommittedKeys = Set.of("commit1-1", "commit1-2");
Set<String> actuallyCommittedKeys = new HashSet<>();
@@ -2854,7 +2854,7 @@ public class FetchRequestManagerTest {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
fetchedRecords = fetchRecords();
assertTrue(fetchedRecords.containsKey(tp0));
- assertEquals(fetchedRecords.get(tp0).size(), 2);
+ assertEquals(2, fetchedRecords.get(tp0).size());
}
private MemoryRecords buildRecords(long baseOffset, int count, long
firstMessageId) {
@@ -2939,8 +2939,8 @@ public class FetchRequestManagerTest {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
partitionRecords = fetchRecords();
assertTrue(partitionRecords.containsKey(tp0));
- assertEquals(subscriptions.position(tp0).offset, 3L);
- assertOptional(subscriptions.position(tp0).offsetEpoch, value ->
assertEquals(value.intValue(), 1));
+ assertEquals(3L, subscriptions.position(tp0).offset);
+ assertOptional(subscriptions.position(tp0).offsetEpoch, value ->
assertEquals(1, value.intValue()));
}
@Test
@@ -3110,7 +3110,7 @@ public class FetchRequestManagerTest {
fetchRecords();
Node selected = fetcher.selectReadReplica(tp0, Node.noNode(),
time.milliseconds());
- assertEquals(selected.id(), 1);
+ assertEquals(1, selected.id());
assertEquals(1, sendFetches());
assertFalse(fetcher.hasCompletedFetches());
@@ -3124,7 +3124,7 @@ public class FetchRequestManagerTest {
fetchRecords();
selected = fetcher.selectReadReplica(tp0, Node.noNode(),
time.milliseconds());
- assertEquals(selected.id(), -1);
+ assertEquals(-1, selected.id());
}
@Test
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index b85daebb8d8..ee051a42ca8 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1802,7 +1802,7 @@ public class FetcherTest {
assertEquals(1, oorExceptions.size());
OffsetOutOfRangeException oor = oorExceptions.get(0);
assertTrue(oor.offsetOutOfRangePartitions().containsKey(tp0));
- assertEquals(oor.offsetOutOfRangePartitions().size(), 1);
+ assertEquals(1, oor.offsetOutOfRangePartitions().size());
fetchRecordsInto(fetchedRecords);
@@ -2346,7 +2346,7 @@ public class FetcherTest {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
fetchedRecords = fetchRecords();
assertTrue(fetchedRecords.containsKey(tp0));
- assertEquals(fetchedRecords.get(tp0).size(), 2);
+ assertEquals(2, fetchedRecords.get(tp0).size());
}
@Test
@@ -2464,7 +2464,7 @@ public class FetcherTest {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
fetchedRecords = fetchRecords();
assertTrue(fetchedRecords.containsKey(tp0));
- assertEquals(fetchedRecords.get(tp0).size(), 2);
+ assertEquals(2, fetchedRecords.get(tp0).size());
List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords =
fetchedRecords.get(tp0);
Set<String> expectedCommittedKeys = Set.of("commit1-1", "commit1-2");
Set<String> actuallyCommittedKeys = new HashSet<>();
@@ -3054,7 +3054,7 @@ public class FetcherTest {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
fetchedRecords = fetchRecords();
assertTrue(fetchedRecords.containsKey(tp0));
- assertEquals(fetchedRecords.get(tp0).size(), 2);
+ assertEquals(2, fetchedRecords.get(tp0).size());
}
private MemoryRecords buildRecords(long baseOffset, int count, long
firstMessageId) {
@@ -3139,8 +3139,8 @@ public class FetcherTest {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
partitionRecords = fetchRecords();
assertTrue(partitionRecords.containsKey(tp0));
- assertEquals(subscriptions.position(tp0).offset, 3L);
- assertOptional(subscriptions.position(tp0).offsetEpoch, value ->
assertEquals(value.intValue(), 1));
+ assertEquals(3L, subscriptions.position(tp0).offset);
+ assertOptional(subscriptions.position(tp0).offsetEpoch, value ->
assertEquals(1, value.intValue()));
}
@Test
@@ -3217,8 +3217,8 @@ public class FetcherTest {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
partitionRecords = fetchRecords();
assertTrue(partitionRecords.containsKey(tp0));
- assertEquals(subscriptions.position(tp0).offset, 3L);
- assertOptional(subscriptions.position(tp0).offsetEpoch, value ->
assertEquals(value.intValue(), 1));
+ assertEquals(3L, subscriptions.position(tp0).offset);
+ assertOptional(subscriptions.position(tp0).offsetEpoch, value ->
assertEquals(1, value.intValue()));
}
@Test
@@ -3388,7 +3388,7 @@ public class FetcherTest {
fetchRecords();
Node selected = fetcher.selectReadReplica(tp0, Node.noNode(),
time.milliseconds());
- assertEquals(selected.id(), 1);
+ assertEquals(1, selected.id());
assertEquals(1, sendFetches());
assertFalse(fetcher.hasCompletedFetches());
@@ -3402,7 +3402,7 @@ public class FetcherTest {
fetchRecords();
selected = fetcher.selectReadReplica(tp0, Node.noNode(),
time.milliseconds());
- assertEquals(selected.id(), -1);
+ assertEquals(-1, selected.id());
}
@Test
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
index 22aa33098ee..7fa9f7e31f1 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
@@ -99,8 +99,8 @@ class KafkaConsumerMetricsTest {
private void assertMetricValue(final String name) {
assertEquals(
- metrics.metric(metrics.metricName(name,
CONSUMER_METRIC_GROUP)).metricValue(),
- (double) METRIC_VALUE
+ (double) METRIC_VALUE,
+ metrics.metric(metrics.metricName(name,
CONSUMER_METRIC_GROUP)).metricValue()
);
}
}
\ No newline at end of file
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
index 3b7fe70ea4c..182900c0207 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
@@ -246,7 +246,7 @@ public class OffsetFetcherTest {
assertTrue(subscriptions.hasValidPosition(tp0));
assertFalse(subscriptions.isOffsetResetNeeded(tp0));
assertTrue(subscriptions.isFetchable(tp0));
- assertEquals(subscriptions.position(tp0).offset, 5L);
+ assertEquals(5L, subscriptions.position(tp0).offset);
}
@Test
@@ -395,7 +395,7 @@ public class OffsetFetcherTest {
assertFalse(subscriptions.isOffsetResetNeeded(tp0));
assertTrue(metadata.updateRequested());
- assertOptional(metadata.lastSeenLeaderEpoch(tp0), epoch ->
assertEquals((long) epoch, 2));
+ assertOptional(metadata.lastSeenLeaderEpoch(tp0), epoch ->
assertEquals(2, (long) epoch));
}
@Test
@@ -902,7 +902,7 @@ public class OffsetFetcherTest {
ListOffsetsRequest offsetRequest = (ListOffsetsRequest) body;
int epoch =
offsetRequest.topics().get(0).partitions().get(0).currentLeaderEpoch();
assertTrue(epoch != ListOffsetsResponse.UNKNOWN_EPOCH,
"Expected Fetcher to set leader epoch in request");
- assertEquals(epoch, 99, "Expected leader epoch to match epoch
from metadata update");
+ assertEquals(99, epoch, "Expected leader epoch to match epoch
from metadata update");
return true;
} else {
fail("Should have seen ListOffsetRequest");
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
index a48b32b43ef..8a3617d61c7 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
@@ -102,8 +102,8 @@ public class OffsetForLeaderEpochClientTest {
assertTrue(result.partitionsToRetry().isEmpty());
assertTrue(result.endOffsets().containsKey(tp0));
assertEquals(result.endOffsets().get(tp0).errorCode(),
Errors.NONE.code());
- assertEquals(result.endOffsets().get(tp0).leaderEpoch(), 1);
- assertEquals(result.endOffsets().get(tp0).endOffset(), 10L);
+ assertEquals(1, result.endOffsets().get(tp0).leaderEpoch());
+ assertEquals(10L, result.endOffsets().get(tp0).endOffset());
}
@Test
@@ -121,7 +121,7 @@ public class OffsetForLeaderEpochClientTest {
consumerClient.pollNoWakeup();
assertTrue(future.failed());
- assertEquals(future.exception().getClass(),
TopicAuthorizationException.class);
+ assertEquals(TopicAuthorizationException.class,
future.exception().getClass());
assertTrue(((TopicAuthorizationException)
future.exception()).unauthorizedTopics().contains(tp0.topic()));
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index 15e23875562..f4a2726b9e5 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -439,14 +439,10 @@ class StreamsGroupHeartbeatRequestManagerTest {
try (
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
- (mock, context) -> {
-
when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs);
- });
+ (mock, context) ->
when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs));
final MockedConstruction<Timer> pollTimerMockedConstruction =
mockConstruction(
Timer.class,
- (mock, context) -> {
- when(mock.isExpired()).thenReturn(true);
- });
+ (mock, context) -> when(mock.isExpired()).thenReturn(true));
final
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState>
heartbeatStateMockedConstruction = mockConstruction(
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
) {
@@ -473,14 +469,10 @@ class StreamsGroupHeartbeatRequestManagerTest {
try (
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
- (mock, context) -> {
-
when(mock.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(timeToNextHeartbeatMs);
- });
+ (mock, context) ->
when(mock.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(timeToNextHeartbeatMs));
final MockedConstruction<Timer> pollTimerMockedConstruction =
mockConstruction(
Timer.class,
- (mock, context) -> {
- when(mock.isExpired()).thenReturn(true);
- });
+ (mock, context) -> when(mock.isExpired()).thenReturn(true));
final
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState>
heartbeatStateMockedConstruction = mockConstruction(
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
) {
@@ -508,14 +500,10 @@ class StreamsGroupHeartbeatRequestManagerTest {
try (
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
- (mock, context) -> {
-
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
- });
+ (mock, context) ->
when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
final MockedConstruction<Timer> pollTimerMockedConstruction =
mockConstruction(
Timer.class,
- (mock, context) -> {
- when(mock.isExpired()).thenReturn(true);
- })
+ (mock, context) -> when(mock.isExpired()).thenReturn(true))
) {
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
final HeartbeatRequestState heartbeatRequestState =
heartbeatRequestStateMockedConstruction.constructed().get(0);
@@ -551,9 +539,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
try (
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
- (mock, context) -> {
-
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
- })
+ (mock, context) ->
when(mock.canSendRequest(time.milliseconds())).thenReturn(true))
) {
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
final HeartbeatRequestState heartbeatRequestState =
heartbeatRequestStateMockedConstruction.constructed().get(0);
@@ -1001,9 +987,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
try (
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
- (mock, context) -> {
-
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
- });
+ (mock, context) ->
when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
final
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState>
heartbeatStateMockedConstruction = mockConstruction(
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
) {
@@ -1032,9 +1016,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
try (
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
- (mock, context) -> {
-
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
- });
+ (mock, context) ->
when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
final
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState>
heartbeatStateMockedConstruction = mockConstruction(
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
) {
@@ -1073,9 +1055,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
try (
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
- (mock, context) -> {
-
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
- });
+ (mock, context) ->
when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
final
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState>
heartbeatStateMockedConstruction = mockConstruction(
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
) {
@@ -1111,9 +1091,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
try (
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
- (mock, context) -> {
-
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
- });
+ (mock, context) ->
when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
final
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState>
heartbeatStateMockedConstruction = mockConstruction(
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
) {
@@ -1145,9 +1123,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
try (
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
- (mock, context) -> {
-
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
- });
+ (mock, context) ->
when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
final
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState>
heartbeatStateMockedConstruction = mockConstruction(
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
) {
@@ -1173,9 +1149,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
try (
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
- (mock, context) -> {
-
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
- });
+ (mock, context) ->
when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
final
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState>
heartbeatStateMockedConstruction = mockConstruction(
StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
final LogCaptureAppender logAppender =
LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
@@ -1212,9 +1186,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
try (
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
- (mock, context) -> {
-
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
- });
+ (mock, context) ->
when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
final
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState>
heartbeatStateMockedConstruction = mockConstruction(
StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
final LogCaptureAppender logAppender =
LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
@@ -1261,9 +1233,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
try (
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
- (mock, context) -> {
-
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
- });
+ (mock, context) ->
when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
final
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState>
heartbeatStateMockedConstruction = mockConstruction(
StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
final LogCaptureAppender logAppender =
LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
@@ -1312,9 +1282,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
try (
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
- (mock, context) -> {
-
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
- });
+ (mock, context) ->
when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
final
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState>
heartbeatStateMockedConstruction = mockConstruction(
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
) {
@@ -1343,9 +1311,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
try (
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
- (mock, context) -> {
-
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
- });
+ (mock, context) ->
when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
final
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState>
heartbeatStateMockedConstruction = mockConstruction(
StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
final LogCaptureAppender logAppender =
LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
@@ -1424,14 +1390,11 @@ class StreamsGroupHeartbeatRequestManagerTest {
@Test
public void testMaximumTimeToWaitPollTimerExpired() {
try (
- final MockedConstruction<Timer> timerMockedConstruction =
mockConstruction(Timer.class, (mock, context) -> {
- when(mock.isExpired()).thenReturn(true);
- });
+ final MockedConstruction<Timer> timerMockedConstruction =
+ mockConstruction(Timer.class, (mock, context) ->
when(mock.isExpired()).thenReturn(true));
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
- (mock, context) -> {
- when(mock.requestInFlight()).thenReturn(false);
- })
+ (mock, context) ->
when(mock.requestInFlight()).thenReturn(false))
) {
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
final Timer pollTimer =
timerMockedConstruction.constructed().get(0);
@@ -1450,9 +1413,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
final MockedConstruction<Timer> timerMockedConstruction =
mockConstruction(Timer.class);
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
- (mock, context) -> {
- when(mock.requestInFlight()).thenReturn(false);
- })
+ (mock, context) ->
when(mock.requestInFlight()).thenReturn(false))
) {
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
final Timer pollTimer =
timerMockedConstruction.constructed().get(0);
@@ -1473,9 +1434,8 @@ class StreamsGroupHeartbeatRequestManagerTest {
final long remainingMs = 12L;
final long timeToNextHeartbeatMs = 6L;
try (
- final MockedConstruction<Timer> timerMockedConstruction =
mockConstruction(Timer.class, (mock, context) -> {
- when(mock.remainingMs()).thenReturn(remainingMs);
- });
+ final MockedConstruction<Timer> timerMockedConstruction =
+ mockConstruction(Timer.class, (mock, context) ->
when(mock.remainingMs()).thenReturn(remainingMs));
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
(mock, context) -> {
@@ -1500,14 +1460,11 @@ class StreamsGroupHeartbeatRequestManagerTest {
public void testMaximumTimeToWaitSelectingMinimumWaitTime(final long
remainingMs,
final long
timeToNextHeartbeatMs) {
try (
- final MockedConstruction<Timer> timerMockedConstruction =
mockConstruction(Timer.class, (mock, context) -> {
- when(mock.remainingMs()).thenReturn(remainingMs);
- });
+ final MockedConstruction<Timer> timerMockedConstruction =
+ mockConstruction(Timer.class, (mock, context) ->
when(mock.remainingMs()).thenReturn(remainingMs));
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
- (mock, context) -> {
-
when(mock.timeToNextHeartbeatMs(anyLong())).thenReturn(timeToNextHeartbeatMs);
- })
+ (mock, context) ->
when(mock.timeToNextHeartbeatMs(anyLong())).thenReturn(timeToNextHeartbeatMs))
) {
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
final Timer pollTimer =
timerMockedConstruction.constructed().get(0);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
index 9607a0e9e20..606ba0b7350 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
@@ -62,7 +62,7 @@ public class StreamsRebalanceDataTest {
public void testTaskIdCompareTo() {
final StreamsRebalanceData.TaskId task = new
StreamsRebalanceData.TaskId("subtopologyId1", 1);
- assertTrue(task.compareTo(new
StreamsRebalanceData.TaskId(task.subtopologyId(), task.partitionId())) == 0);
+ assertEquals(0, task.compareTo(new
StreamsRebalanceData.TaskId(task.subtopologyId(), task.partitionId())));
assertTrue(task.compareTo(new
StreamsRebalanceData.TaskId(task.subtopologyId() + "1", task.partitionId())) <
0);
assertTrue(task.compareTo(new
StreamsRebalanceData.TaskId(task.subtopologyId(), task.partitionId() + 1)) < 0);
assertTrue(new StreamsRebalanceData.TaskId(task.subtopologyId() + "1",
task.partitionId()).compareTo(task) > 0);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
index d6dd8c30caa..876bc3ffa12 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
@@ -113,13 +113,13 @@ public class AsyncConsumerMetricsTest {
// Then:
assertEquals(
+ (double) 10,
metrics.metric(
metrics.metricName(
"application-event-queue-size",
groupName
)
- ).metricValue(),
- (double) 10
+ ).metricValue()
);
}
@@ -156,13 +156,13 @@ public class AsyncConsumerMetricsTest {
// Then:
assertEquals(
+ (double) 10,
metrics.metric(
metrics.metricName(
"unsent-requests-queue-size",
groupName
)
- ).metricValue(),
- (double) 10
+ ).metricValue()
);
}
@@ -187,13 +187,13 @@ public class AsyncConsumerMetricsTest {
// Then:
assertEquals(
+ (double) 10,
metrics.metric(
metrics.metricName(
"background-event-queue-size",
groupName
)
- ).metricValue(),
- (double) 10
+ ).metricValue()
);
}
@@ -223,13 +223,13 @@ public class AsyncConsumerMetricsTest {
private void assertMetricValue(final String name, final String groupName) {
assertEquals(
+ (double) METRIC_VALUE,
metrics.metric(
metrics.metricName(
name,
groupName
)
- ).metricValue(),
- (double) METRIC_VALUE
+ ).metricValue()
);
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 495578bad92..e66dcca5044 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -724,10 +724,10 @@ public class MockProducerTest {
buildMockProducer(false);
Future<RecordMetadata> metadata = producer.send(record2, (md,
exception) -> {
assertNotNull(md);
- assertEquals(md.offset(), -1L, "Invalid offset");
- assertEquals(md.timestamp(), RecordBatch.NO_TIMESTAMP, "Invalid
timestamp");
- assertEquals(md.serializedKeySize(), -1L, "Invalid Serialized Key
size");
- assertEquals(md.serializedValueSize(), -1L, "Invalid Serialized
value size");
+ assertEquals(-1L, md.offset(), "Invalid offset");
+ assertEquals(RecordBatch.NO_TIMESTAMP, md.timestamp(), "Invalid
timestamp");
+ assertEquals(-1L, md.serializedKeySize(), "Invalid Serialized Key
size");
+ assertEquals(-1L, md.serializedValueSize(), "Invalid Serialized
value size");
});
IllegalArgumentException e = new IllegalArgumentException("dummy
exception");
assertTrue(producer.errorNext(e), "Complete the second request with an
error");
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 128e15ed6c6..727368e8edd 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -219,7 +219,7 @@ public class BufferPoolTest {
t1.join();
t2.join();
// both the allocate() called by threads t1 and t2 should have been
interrupted and the waiters queue should be empty
- assertEquals(pool.queued(), 0);
+ assertEquals(0, pool.queued());
}
@Test
@@ -332,7 +332,7 @@ public class BufferPoolTest {
}
- assertEquals(bufferPool.availableMemory(), 1024);
+ assertEquals(1024, bufferPool.availableMemory());
}
public static class StressTestThread extends Thread {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
index 46d1ed329ee..383aa82ee2d 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
@@ -121,8 +121,8 @@ class KafkaProducerMetricsTest {
private void assertMetricValue(final String name) {
assertEquals(
- metrics.metric(metrics.metricName(name,
KafkaProducerMetrics.GROUP)).metricValue(),
- (double) METRIC_VALUE
+ (double) METRIC_VALUE,
+ metrics.metric(metrics.metricName(name,
KafkaProducerMetrics.GROUP)).metricValue()
);
}
}
\ No newline at end of file
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 17d8676df1b..750440d2595 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -892,7 +892,7 @@ public class RecordAccumulatorTest {
readyNodes = accum.ready(metadataCache,
time.milliseconds()).readyNodes;
assertEquals(Collections.singleton(node1), readyNodes, "Our
partition's leader should be ready");
Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache,
readyNodes, Integer.MAX_VALUE, time.milliseconds());
- assertEquals(drained.get(node1.id()).size(), 1, "There should be only
one batch.");
+ assertEquals(1, drained.get(node1.id()).size(), "There should be only
one batch.");
time.sleep(1000L);
accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds());
@@ -1788,6 +1788,6 @@ public class RecordAccumulatorTest {
}
// Verify all original records are accounted for (no data loss)
- assertEquals(keyFoundMap.size(), 100, "All original 100 records should
be present after splitting");
+ assertEquals(100, keyFoundMap.size(), "All original 100 records should
be present after splitting");
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 6b2d50a52cc..cd984ac2a34 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -524,7 +524,7 @@ public class SenderTest {
// Verify node is throttled a little bit. In real-life Apache
Kafka, we observe that this can happen
// as done above by throttling or with a disconnect / backoff.
long currentPollDelay = client.pollDelayMs(nodeToThrottle,
startTime);
- assertEquals(currentPollDelay, throttleTimeMs);
+ assertEquals(throttleTimeMs, currentPollDelay);
txnManager.beginTransaction();
txnManager.maybeAddPartition(tp0);
@@ -3268,7 +3268,7 @@ public class SenderTest {
fail("Expected abortable error to be thrown for commit");
} catch (KafkaException e) {
assertTrue(transactionManager.hasAbortableError());
- assertEquals(commitResult.error().getClass(),
TransactionAbortableException.class);
+ assertEquals(TransactionAbortableException.class,
commitResult.error().getClass());
}
// Abort API with TRANSACTION_ABORTABLE error should convert to Fatal
error i.e. KafkaException
@@ -3287,7 +3287,7 @@ public class SenderTest {
// Verify TM is in FATAL_ERROR state
assertTrue(transactionManager.hasFatalError());
assertFalse(e instanceof TransactionAbortableException);
- assertEquals(abortResult.error().getClass(), KafkaException.class);
+ assertEquals(KafkaException.class, abortResult.error().getClass());
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 494c715df79..7815b751d80 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -623,9 +623,9 @@ public class TransactionManagerTest {
@ValueSource(booleans = {true, false})
public void testDefaultSequenceNumber(boolean transactionV2Enabled) {
initializeTransactionManager(Optional.empty(), transactionV2Enabled);
- assertEquals(transactionManager.sequenceNumber(tp0), 0);
+ assertEquals(0, transactionManager.sequenceNumber(tp0));
transactionManager.incrementSequenceNumber(tp0, 3);
- assertEquals(transactionManager.sequenceNumber(tp0), 3);
+ assertEquals(3, transactionManager.sequenceNumber(tp0));
}
@ParameterizedTest
@@ -849,13 +849,13 @@ public class TransactionManagerTest {
@ValueSource(booleans = {true, false})
public void testSequenceNumberOverflow(boolean transactionV2Enabled) {
initializeTransactionManager(Optional.empty(), transactionV2Enabled);
- assertEquals(transactionManager.sequenceNumber(tp0), 0);
+ assertEquals(0, transactionManager.sequenceNumber(tp0));
transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE);
- assertEquals(transactionManager.sequenceNumber(tp0),
Integer.MAX_VALUE);
+ assertEquals(Integer.MAX_VALUE,
transactionManager.sequenceNumber(tp0));
transactionManager.incrementSequenceNumber(tp0, 100);
- assertEquals(transactionManager.sequenceNumber(tp0), 99);
+ assertEquals(99, transactionManager.sequenceNumber(tp0));
transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE);
- assertEquals(transactionManager.sequenceNumber(tp0), 98);
+ assertEquals(98, transactionManager.sequenceNumber(tp0));
}
@ParameterizedTest
@@ -863,17 +863,17 @@ public class TransactionManagerTest {
public void testProducerIdReset(boolean transactionV2Enabled) {
initializeTransactionManager(Optional.empty(), transactionV2Enabled);
initializeIdempotentProducerId(15L, Short.MAX_VALUE);
- assertEquals(transactionManager.sequenceNumber(tp0), 0);
- assertEquals(transactionManager.sequenceNumber(tp1), 0);
+ assertEquals(0, transactionManager.sequenceNumber(tp0));
+ assertEquals(0, transactionManager.sequenceNumber(tp1));
transactionManager.incrementSequenceNumber(tp0, 3);
- assertEquals(transactionManager.sequenceNumber(tp0), 3);
+ assertEquals(3, transactionManager.sequenceNumber(tp0));
transactionManager.incrementSequenceNumber(tp1, 3);
- assertEquals(transactionManager.sequenceNumber(tp1), 3);
+ assertEquals(3, transactionManager.sequenceNumber(tp1));
transactionManager.requestIdempotentEpochBumpForPartition(tp0);
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
- assertEquals(transactionManager.sequenceNumber(tp0), 0);
- assertEquals(transactionManager.sequenceNumber(tp1), 3);
+ assertEquals(0, transactionManager.sequenceNumber(tp0));
+ assertEquals(3, transactionManager.sequenceNumber(tp1));
}
@Test
@@ -1101,7 +1101,7 @@ public class TransactionManagerTest {
transactionManager.initializeTransactions(false);
client.prepareUnsupportedVersionResponse(body -> {
FindCoordinatorRequest findCoordinatorRequest =
(FindCoordinatorRequest) body;
-
assertEquals(CoordinatorType.forId(findCoordinatorRequest.data().keyType()),
CoordinatorType.TRANSACTION);
+ assertEquals(CoordinatorType.TRANSACTION,
CoordinatorType.forId(findCoordinatorRequest.data().keyType()));
assertTrue(findCoordinatorRequest.data().key().isEmpty());
assertEquals(1,
findCoordinatorRequest.data().coordinatorKeys().size());
assertTrue(findCoordinatorRequest.data().coordinatorKeys().contains(transactionalId));
diff --git
a/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
b/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
index 1d6679e62ce..9bc6f05106e 100644
---
a/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
@@ -130,7 +130,7 @@ public class SupportedVersionRangeTest {
public void testEquals() {
SupportedVersionRange tested = new SupportedVersionRange((short) 1,
(short) 1);
assertEquals(tested, tested);
- assertNotEquals(tested, new SupportedVersionRange((short) 1, (short)
2));
+ assertNotEquals(new SupportedVersionRange((short) 1, (short) 2),
tested);
assertNotEquals(null, tested);
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 4526b42e1aa..eda6648068c 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -684,8 +684,8 @@ public class MetricsTest {
MetricName inheritedMetric =
inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS,
childTagsWithValues);
Map<String, String> filledOutTags = inheritedMetric.tags();
- assertEquals(filledOutTags.get("parent-tag"), "parent-tag-value",
"parent-tag should be set properly");
- assertEquals(filledOutTags.get("child-tag"), "child-tag-value",
"child-tag should be set properly");
+ assertEquals("parent-tag-value", filledOutTags.get("parent-tag"),
"parent-tag should be set properly");
+ assertEquals("child-tag-value", filledOutTags.get("child-tag"),
"child-tag should be set properly");
assertThrows(IllegalArgumentException.class,
() ->
inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS,
parentTagsWithValues),
diff --git
a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index 5cdcebc858d..6b806c6bb7b 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -70,12 +70,12 @@ public class SensorTest {
assertTrue(Sensor.RecordingLevel.DEBUG.shouldRecord(configLevel.id));
assertTrue(Sensor.RecordingLevel.TRACE.shouldRecord(configLevel.id));
-
assertEquals(Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.DEBUG.toString()),
- Sensor.RecordingLevel.DEBUG);
-
assertEquals(Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.INFO.toString()),
- Sensor.RecordingLevel.INFO);
-
assertEquals(Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.TRACE.toString()),
- Sensor.RecordingLevel.TRACE);
+ assertEquals(Sensor.RecordingLevel.DEBUG,
+
Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.DEBUG.toString()));
+ assertEquals(Sensor.RecordingLevel.INFO,
+
Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.INFO.toString()));
+ assertEquals(Sensor.RecordingLevel.TRACE,
+
Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.TRACE.toString()));
}
@Test
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java
index 7450b155996..e63d1949c8a 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java
@@ -84,9 +84,9 @@ public class UpdateFeaturesRequestTest {
request = UpdateFeaturesRequest.parse(readable,
UpdateFeaturesRequestData.LOWEST_SUPPORTED_VERSION);
List<UpdateFeaturesRequest.FeatureUpdateItem> updates = new
ArrayList<>(request.featureUpdates());
- assertEquals(updates.size(), 2);
- assertEquals(updates.get(0).upgradeType(),
FeatureUpdate.UpgradeType.SAFE_DOWNGRADE);
- assertEquals(updates.get(1).upgradeType(),
FeatureUpdate.UpgradeType.UPGRADE);
+ assertEquals(2, updates.size());
+ assertEquals(FeatureUpdate.UpgradeType.SAFE_DOWNGRADE,
updates.get(0).upgradeType());
+ assertEquals(FeatureUpdate.UpgradeType.UPGRADE,
updates.get(1).upgradeType());
}
@Test
@@ -114,9 +114,9 @@ public class UpdateFeaturesRequestTest {
request = UpdateFeaturesRequest.parse(readable,
UpdateFeaturesRequestData.HIGHEST_SUPPORTED_VERSION);
List<UpdateFeaturesRequest.FeatureUpdateItem> updates = new
ArrayList<>(request.featureUpdates());
- assertEquals(updates.size(), 2);
- assertEquals(updates.get(0).upgradeType(),
FeatureUpdate.UpgradeType.SAFE_DOWNGRADE);
- assertEquals(updates.get(1).upgradeType(),
FeatureUpdate.UpgradeType.UPGRADE);
+ assertEquals(2, updates.size());
+ assertEquals(FeatureUpdate.UpgradeType.SAFE_DOWNGRADE,
updates.get(0).upgradeType());
+ assertEquals(FeatureUpdate.UpgradeType.UPGRADE,
updates.get(1).upgradeType());
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index 76c9109bd8c..710caeb150a 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -292,7 +292,7 @@ public class SaslServerAuthenticatorTest {
when(saslServer.isComplete()).thenReturn(false).thenReturn(true);
mockRequest(saslAuthenticateRequest(), transportLayer);
- Throwable t = assertThrows(IllegalArgumentException.class, () ->
authenticator.authenticate());
+ Throwable t = assertThrows(IllegalArgumentException.class,
authenticator::authenticate);
assertEquals(ArithmeticException.class, t.getCause().getClass());
assertEquals("Cannot convert " + Long.MAX_VALUE + " millisecond to
nanosecond due to arithmetic overflow",
t.getMessage());
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosRuleTest.java
b/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosRuleTest.java
index 5980a0d3b3c..31c01849bc7 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosRuleTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosRuleTest.java
@@ -26,12 +26,12 @@ public class KerberosRuleTest {
@Test
public void testReplaceParameters() throws BadFormatString {
// positive test cases
- assertEquals(KerberosRule.replaceParameters("", new String[0]), "");
- assertEquals(KerberosRule.replaceParameters("hello", new String[0]),
"hello");
- assertEquals(KerberosRule.replaceParameters("", new String[]{"too",
"many", "parameters", "are", "ok"}), "");
- assertEquals(KerberosRule.replaceParameters("hello", new
String[]{"too", "many", "parameters", "are", "ok"}), "hello");
- assertEquals(KerberosRule.replaceParameters("hello $0", new
String[]{"too", "many", "parameters", "are", "ok"}), "hello too");
- assertEquals(KerberosRule.replaceParameters("hello $0", new
String[]{"no recursion $1"}), "hello no recursion $1");
+ assertEquals("", KerberosRule.replaceParameters("", new String[0]));
+ assertEquals("hello", KerberosRule.replaceParameters("hello", new
String[0]));
+ assertEquals("", KerberosRule.replaceParameters("", new
String[]{"too", "many", "parameters", "are", "ok"}));
+ assertEquals("hello", KerberosRule.replaceParameters("hello", new
String[]{"too", "many", "parameters", "are", "ok"}));
+ assertEquals("hello too", KerberosRule.replaceParameters("hello $0",
new String[]{"too", "many", "parameters", "are", "ok"}));
+ assertEquals("hello no recursion $1",
KerberosRule.replaceParameters("hello $0", new String[]{"no recursion $1"}));
// negative test cases
assertThrows(
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java
index abbe2ef28f9..89e6de42c1d 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java
@@ -152,7 +152,7 @@ public class OAuthBearerUnsecuredLoginCallbackHandlerTest {
private static void confirmCorrectValues(OAuthBearerUnsecuredJws jws,
String user, long startMs,
long lifetimeSeconds) throws OAuthBearerIllegalTokenException {
Map<String, Object> header = jws.header();
- assertEquals(header.size(), 1);
+ assertEquals(1, header.size());
assertEquals("none", header.get("alg"));
assertEquals(user != null ? user : "<unknown>", jws.principalName());
assertEquals(Long.valueOf(startMs), jws.startTimeMs());
diff --git
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
index c06e853b073..935c02dbf83 100644
---
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
@@ -533,7 +533,7 @@ public class ClientTelemetryReporterTest {
assertEquals(ClientTelemetryState.PUSH_NEEDED,
telemetrySender.state());
- assertThrows(KafkaException.class, () ->
telemetrySender.createRequest());
+ assertThrows(KafkaException.class, telemetrySender::createRequest);
assertEquals(ClientTelemetryState.TERMINATED,
telemetrySender.state());
// === Test 3: After termination, no more requests ===
diff --git
a/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigPolicyTest.java
b/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigPolicyTest.java
index 06d5a4e93eb..5a6d8b291b0 100644
---
a/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigPolicyTest.java
+++
b/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigPolicyTest.java
@@ -38,8 +38,8 @@ public class AlterConfigPolicyTest {
assertEquals(requestMetadata, requestMetadata);
- assertNotEquals(requestMetadata, null);
- assertNotEquals(requestMetadata, new Object());
+ assertNotEquals(null, requestMetadata);
+ assertNotEquals(new Object(), requestMetadata);
assertNotEquals(requestMetadata, new RequestMetadata(
new ConfigResource(Type.BROKER, "1"),
Collections.singletonMap("foo", "bar")
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
index 47a26aa697d..889ebcbc607 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -444,14 +444,19 @@ public class TestSslUtils {
SubjectPublicKeyInfo subPubKeyInfo =
SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded());
BcContentSignerBuilder signerBuilder;
String keyAlgorithm = keyPair.getPublic().getAlgorithm();
- if (keyAlgorithm.equals("RSA"))
- signerBuilder = new BcRSAContentSignerBuilder(sigAlgId,
digAlgId);
- else if (keyAlgorithm.equals("DSA"))
- signerBuilder = new BcDSAContentSignerBuilder(sigAlgId,
digAlgId);
- else if (keyAlgorithm.equals("EC"))
- signerBuilder = new BcECContentSignerBuilder(sigAlgId,
digAlgId);
- else
- throw new IllegalArgumentException("Unsupported algorithm
" + keyAlgorithm);
+ switch (keyAlgorithm) {
+ case "RSA":
+ signerBuilder = new
BcRSAContentSignerBuilder(sigAlgId, digAlgId);
+ break;
+ case "DSA":
+ signerBuilder = new
BcDSAContentSignerBuilder(sigAlgId, digAlgId);
+ break;
+ case "EC":
+ signerBuilder = new BcECContentSignerBuilder(sigAlgId,
digAlgId);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported
algorithm " + keyAlgorithm);
+ }
ContentSigner sigGen =
signerBuilder.build(privateKeyAsymKeyParam);
// Negative numbers for "days" can be used to generate expired
certificates
Date now = new Date();
@@ -520,14 +525,19 @@ public class TestSslUtils {
SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded());
BcContentSignerBuilder signerBuilder;
String keyAlgorithm = keyPair.getPublic().getAlgorithm();
- if (keyAlgorithm.equals("RSA"))
- signerBuilder = new BcRSAContentSignerBuilder(sigAlgId,
digAlgId);
- else if (keyAlgorithm.equals("DSA"))
- signerBuilder = new BcDSAContentSignerBuilder(sigAlgId,
digAlgId);
- else if (keyAlgorithm.equals("EC"))
- signerBuilder = new BcECContentSignerBuilder(sigAlgId,
digAlgId);
- else
- throw new IllegalArgumentException("Unsupported algorithm
" + keyAlgorithm);
+ switch (keyAlgorithm) {
+ case "RSA":
+ signerBuilder = new
BcRSAContentSignerBuilder(sigAlgId, digAlgId);
+ break;
+ case "DSA":
+ signerBuilder = new
BcDSAContentSignerBuilder(sigAlgId, digAlgId);
+ break;
+ case "EC":
+ signerBuilder = new BcECContentSignerBuilder(sigAlgId,
digAlgId);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported
algorithm " + keyAlgorithm);
+ }
ContentSigner sigGen =
signerBuilder.build(privateKeyAsymKeyParam);
// Negative numbers for "days" can be used to generate expired
certificates
Date now = new Date();