snuyanzin commented on code in PR #19808:
URL: https://github.com/apache/flink/pull/19808#discussion_r881518844
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java:
##########
@@ -73,9 +72,9 @@ public void testPropertyHandling() {
.setKafkaProducerConfig(testConf)
.setProperty("k2", "correct"),
p -> {
- Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k)));
- assertEquals("v1", p.get("k1"));
- assertEquals("correct", p.get("k2"));
+ Arrays.stream(DEFAULT_KEYS).forEach(k -> assertThat(p).containsKey(k));
+ assertThat(p.get("k1")).isEqualTo("v1");
+ assertThat(p.get("k2")).isEqualTo("correct");
Review Comment:
Here `containsEntry` could be used, and the two assertions could be joined:
```suggestion
assertThat(p).containsEntry("k1",
"v1").containsEntry("k2", "correct");
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java:
##########
@@ -161,8 +156,8 @@ public void testSerializeRecordWithKey() {
.setKeySerializationSchema(serializationSchema)
.build();
final ProducerRecord<byte[], byte[]> record = schema.serialize("a", null, null);
- assertArrayEquals(record.key(), serializationSchema.serialize("a"));
- assertArrayEquals(record.value(), serializationSchema.serialize("a"));
+ assertThat(serializationSchema.serialize("a")).isEqualTo(record.key());
+ assertThat(serializationSchema.serialize("a")).isEqualTo(record.value());
Review Comment:
Could be joined as
```suggestion
assertThat(serializationSchema.serialize("a"))
.isEqualTo(record.key())
.isEqualTo(record.value());
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -157,26 +158,27 @@ public void testMultiplePartitionsPerConsumersFixedPartitions() {
List<KafkaTopicPartition> initialDiscovery = partitionDiscoverer.discoverPartitions();
- assertTrue(initialDiscovery.size() >= minPartitionsPerConsumer);
- assertTrue(initialDiscovery.size() <= maxPartitionsPerConsumer);
+ assertThat(initialDiscovery.size())
+ .isGreaterThanOrEqualTo(minPartitionsPerConsumer);
+ assertThat(initialDiscovery.size()).isLessThanOrEqualTo(maxPartitionsPerConsumer);
Review Comment:
```suggestion
assertThat(initialDiscovery)
.hasSizeGreaterThanOrEqualTo(minPartitionsPerConsumer)
.hasSizeLessThanOrEqualTo(maxPartitionsPerConsumer);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -507,15 +512,15 @@ public void testNonContiguousPartitionIdDiscovery() throws Exception {
partitionDiscoverer.open();
List<KafkaTopicPartition> discoveredPartitions1 = partitionDiscoverer.discoverPartitions();
- assertEquals(2, discoveredPartitions1.size());
- assertTrue(discoveredPartitions1.contains(new KafkaTopicPartition("test-topic", 1)));
- assertTrue(discoveredPartitions1.contains(new KafkaTopicPartition("test-topic", 4)));
+ assertThat(discoveredPartitions1).hasSize(2);
+ assertThat(discoveredPartitions1).contains(new KafkaTopicPartition("test-topic", 1));
+ assertThat(discoveredPartitions1).contains(new KafkaTopicPartition("test-topic", 4));
List<KafkaTopicPartition> discoveredPartitions2 = partitionDiscoverer.discoverPartitions();
- assertEquals(3, discoveredPartitions2.size());
- assertTrue(discoveredPartitions2.contains(new KafkaTopicPartition("test-topic", 0)));
- assertTrue(discoveredPartitions2.contains(new KafkaTopicPartition("test-topic", 2)));
- assertTrue(discoveredPartitions2.contains(new KafkaTopicPartition("test-topic", 3)));
+ assertThat(discoveredPartitions2).hasSize(3);
+ assertThat(discoveredPartitions2).contains(new KafkaTopicPartition("test-topic", 0));
+ assertThat(discoveredPartitions2).contains(new KafkaTopicPartition("test-topic", 2));
+ assertThat(discoveredPartitions2).contains(new KafkaTopicPartition("test-topic", 3));
Review Comment:
```suggestion
assertThat(discoveredPartitions2)
.hasSize(3)
.contains(
new KafkaTopicPartition("test-topic", 0),
new KafkaTopicPartition("test-topic", 2),
new KafkaTopicPartition("test-topic", 3));
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -271,9 +270,9 @@ private void compareCompactedResult(
actualMap.computeIfAbsent(id, key -> new ArrayList<>()).add(rowData);
}
- assertEquals(expected.size(), actualMap.size());
+ assertThat(actualMap).hasSize(expected.size());
Review Comment:
```suggestion
assertThat(actualMap).hasSameSizeAs(expected);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -275,20 +274,18 @@ public void testRestoreFromEmptyStateWithPartitions() throws Exception {
}
// assert that there are partitions and is identical to expected list
- assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
- assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
- assertEquals(
- expectedSubscribedPartitionsWithStartOffsets,
- consumerFunction.getSubscribedPartitionsToStartOffsets());
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+ assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+ .isEqualTo(expectedSubscribedPartitionsWithStartOffsets);
// the new partitions should have been considered as restored state
- assertTrue(consumerFunction.getRestoredState() != null);
- assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+ assertThat(consumerFunction.getRestoredState()).isNotNull();
+ assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
for (Map.Entry<KafkaTopicPartition, Long> expectedEntry : expectedSubscribedPartitionsWithStartOffsets.entrySet()) {
- assertEquals(
- expectedEntry.getValue(),
- consumerFunction.getRestoredState().get(expectedEntry.getKey()));
+ assertThat(consumerFunction.getRestoredState().get(expectedEntry.getKey()))
+ .isEqualTo(expectedEntry.getValue());
Review Comment:
```suggestion
assertThat(consumerFunction.getRestoredState()).containsEntry(
expectedEntry.getKey(),
expectedEntry.getValue());
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -82,14 +82,16 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
new DummyFlinkKafkaProducer<>(
props, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
- assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
- assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
- assertTrue(
- props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
- .equals(ByteArraySerializer.class.getName()));
- assertTrue(
- props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
- .equals(ByteArraySerializer.class.getName()));
+ assertThat(props).containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+ assertThat(props).containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
Review Comment:
```suggestion
assertThat(props)
.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -220,25 +222,25 @@ public void testPartitionsFewerThanConsumersFixedPartitions() {
List<KafkaTopicPartition> initialDiscovery = partitionDiscoverer.discoverPartitions();
- assertTrue(initialDiscovery.size() <= 1);
+ assertThat(initialDiscovery.size()).isLessThanOrEqualTo(1);
Review Comment:
```suggestion
assertThat(initialDiscovery).hasSizeLessThanOrEqualTo(1);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -304,16 +298,15 @@ public void testKafkaClientProperties() throws Exception {
AdminClient adminClient =
(AdminClient) Whitebox.getInternalState(enumerator, "adminClient");
- assertNotNull(adminClient);
+ assertThat(adminClient).isNotNull();
String clientId = (String) Whitebox.getInternalState(adminClient, "clientId");
- assertNotNull(clientId);
- assertTrue(clientId.startsWith(clientIdPrefix));
- assertEquals(
- defaultTimeoutMs,
- Whitebox.getInternalState(adminClient, "defaultApiTimeoutMs"));
-
- assertNotNull(clientId);
- assertTrue(clientId.startsWith(clientIdPrefix));
+ assertThat(clientId).isNotNull();
+ assertThat(clientId).startsWith(clientIdPrefix);
+ assertThat(Whitebox.getInternalState(adminClient, "defaultApiTimeoutMs"))
+ .isEqualTo(defaultTimeoutMs);
+
+ assertThat(clientId).isNotNull();
+ assertThat(clientId).startsWith(clientIdPrefix);
Review Comment:
```suggestion
assertThat(clientId).isNotNull().startsWith(clientIdPrefix);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -65,36 +63,38 @@ public void testEarliestOffsetsInitializer() {
OffsetsInitializer initializer = OffsetsInitializer.earliest();
List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
- assertEquals(partitions.size(), offsets.size());
- assertTrue(offsets.keySet().containsAll(partitions));
+ assertThat(offsets).hasSize(partitions.size());
+ assertThat(offsets.keySet().containsAll(partitions)).isTrue();
Review Comment:
```suggestion
assertThat(offsets).hasSameSizeAs(partitions);
assertThat(offsets.keySet()).containsAll(partitions);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -65,36 +63,38 @@ public void testEarliestOffsetsInitializer() {
OffsetsInitializer initializer = OffsetsInitializer.earliest();
List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
- assertEquals(partitions.size(), offsets.size());
- assertTrue(offsets.keySet().containsAll(partitions));
+ assertThat(offsets).hasSize(partitions.size());
+ assertThat(offsets.keySet().containsAll(partitions)).isTrue();
for (long offset : offsets.values()) {
- Assert.assertEquals(KafkaPartitionSplit.EARLIEST_OFFSET, offset);
+ assertThat(offset).isEqualTo(KafkaPartitionSplit.EARLIEST_OFFSET);
}
- assertEquals(OffsetResetStrategy.EARLIEST, initializer.getAutoOffsetResetStrategy());
+ assertThat(initializer.getAutoOffsetResetStrategy())
+ .isEqualTo(OffsetResetStrategy.EARLIEST);
}
@Test
public void testLatestOffsetsInitializer() {
OffsetsInitializer initializer = OffsetsInitializer.latest();
List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
- assertEquals(partitions.size(), offsets.size());
- assertTrue(offsets.keySet().containsAll(partitions));
+ assertThat(offsets).hasSize(partitions.size());
+ assertThat(offsets.keySet().containsAll(partitions)).isTrue();
Review Comment:
```suggestion
assertThat(offsets).hasSameSizeAs(partitions);
assertThat(offsets.keySet()).containsAll(partitions);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -314,33 +316,39 @@ public void testGrowingPartitions() {
List<KafkaTopicPartition> initialDiscoverySubtask2 = partitionDiscovererSubtask2.discoverPartitions();
- assertTrue(initialDiscoverySubtask0.size() >= minInitialPartitionsPerConsumer);
- assertTrue(initialDiscoverySubtask0.size() <= maxInitialPartitionsPerConsumer);
- assertTrue(initialDiscoverySubtask1.size() >= minInitialPartitionsPerConsumer);
- assertTrue(initialDiscoverySubtask1.size() <= maxInitialPartitionsPerConsumer);
- assertTrue(initialDiscoverySubtask2.size() >= minInitialPartitionsPerConsumer);
- assertTrue(initialDiscoverySubtask2.size() <= maxInitialPartitionsPerConsumer);
+ assertThat(initialDiscoverySubtask0.size())
+ .isGreaterThanOrEqualTo(minInitialPartitionsPerConsumer);
+ assertThat(initialDiscoverySubtask0.size())
+ .isLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
+ assertThat(initialDiscoverySubtask1.size())
+ .isGreaterThanOrEqualTo(minInitialPartitionsPerConsumer);
+ assertThat(initialDiscoverySubtask1.size())
+ .isLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
+ assertThat(initialDiscoverySubtask2.size())
+ .isGreaterThanOrEqualTo(minInitialPartitionsPerConsumer);
+ assertThat(initialDiscoverySubtask2.size())
+ .isLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
Review Comment:
```suggestion
assertThat(initialDiscoverySubtask0)
.hasSizeGreaterThanOrEqualTo(minInitialPartitionsPerConsumer)
.hasSizeLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
assertThat(initialDiscoverySubtask1)
.hasSizeGreaterThanOrEqualTo(minInitialPartitionsPerConsumer)
.hasSizeLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
assertThat(initialDiscoverySubtask2)
.hasSizeGreaterThanOrEqualTo(minInitialPartitionsPerConsumer)
.hasSizeLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -304,16 +298,15 @@ public void testKafkaClientProperties() throws Exception {
AdminClient adminClient =
(AdminClient) Whitebox.getInternalState(enumerator, "adminClient");
- assertNotNull(adminClient);
+ assertThat(adminClient).isNotNull();
String clientId = (String) Whitebox.getInternalState(adminClient, "clientId");
- assertNotNull(clientId);
- assertTrue(clientId.startsWith(clientIdPrefix));
- assertEquals(
- defaultTimeoutMs,
- Whitebox.getInternalState(adminClient, "defaultApiTimeoutMs"));
-
- assertNotNull(clientId);
- assertTrue(clientId.startsWith(clientIdPrefix));
+ assertThat(clientId).isNotNull();
+ assertThat(clientId).startsWith(clientIdPrefix);
Review Comment:
```suggestion
assertThat(clientId).isNotNull().startsWith(clientIdPrefix);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -65,36 +63,38 @@ public void testEarliestOffsetsInitializer() {
OffsetsInitializer initializer = OffsetsInitializer.earliest();
List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
- assertEquals(partitions.size(), offsets.size());
- assertTrue(offsets.keySet().containsAll(partitions));
+ assertThat(offsets).hasSize(partitions.size());
+ assertThat(offsets.keySet().containsAll(partitions)).isTrue();
for (long offset : offsets.values()) {
- Assert.assertEquals(KafkaPartitionSplit.EARLIEST_OFFSET, offset);
+ assertThat(offset).isEqualTo(KafkaPartitionSplit.EARLIEST_OFFSET);
}
- assertEquals(OffsetResetStrategy.EARLIEST, initializer.getAutoOffsetResetStrategy());
+ assertThat(initializer.getAutoOffsetResetStrategy())
+ .isEqualTo(OffsetResetStrategy.EARLIEST);
}
@Test
public void testLatestOffsetsInitializer() {
OffsetsInitializer initializer = OffsetsInitializer.latest();
List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
- assertEquals(partitions.size(), offsets.size());
- assertTrue(offsets.keySet().containsAll(partitions));
+ assertThat(offsets).hasSize(partitions.size());
+ assertThat(offsets.keySet().containsAll(partitions)).isTrue();
for (long offset : offsets.values()) {
- assertEquals(KafkaPartitionSplit.LATEST_OFFSET, offset);
+ assertThat(offset).isEqualTo(KafkaPartitionSplit.LATEST_OFFSET);
}
- assertEquals(OffsetResetStrategy.LATEST, initializer.getAutoOffsetResetStrategy());
+ assertThat(initializer.getAutoOffsetResetStrategy()).isEqualTo(OffsetResetStrategy.LATEST);
}
@Test
public void testCommittedGroupOffsetsInitializer() {
OffsetsInitializer initializer = OffsetsInitializer.committedOffsets();
List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
- assertEquals(partitions.size(), offsets.size());
+ assertThat(offsets).hasSize(partitions.size());
Review Comment:
```suggestion
assertThat(offsets).hasSameSizeAs(partitions);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -222,11 +221,11 @@ public void testRestoreFromEmptyStateNoPartitions() throws Exception {
testHarness.open();
// assert that no partitions were found and is empty
- assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
- assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isEmpty();
Review Comment:
Under the hood, `isEmpty` already checks `isNotNull`, so the separate null check could be omitted:
```suggestion
assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isEmpty();
```
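For reference, a minimal standalone sketch (class and variable names hypothetical) of the behavior this relies on: AssertJ's `isEmpty()` fails with an `AssertionError` on a `null` actual instead of throwing a `NullPointerException`, so a preceding `isNotNull()` adds no extra coverage.
```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;

public class IsEmptyNullGuardSketch {
    public static void main(String[] args) {
        Map<String, Long> partitions = null;
        try {
            // isEmpty() first asserts that the actual value is not null.
            assertThat(partitions).isEmpty();
        } catch (AssertionError e) {
            // Fails with an assertion message about the null actual,
            // not with a NullPointerException.
            System.out.println(e.getMessage());
        }
    }
}
```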
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java:
##########
@@ -99,12 +99,12 @@ public static void comparedWithKeyAndOrder(
actualData.computeIfAbsent(key, k -> new LinkedList<>()).add(row);
}
// compare key first
- assertEquals("Actual result: " + actual, expectedData.size(),
actualData.size());
+ assertThat(actualData).as("Actual result: " +
actual).hasSize(expectedData.size());
Review Comment:
```suggestion
assertThat(actualData).as("Actual result: " +
actual).hasSameSizeAs(expectedData);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -275,20 +274,18 @@ public void testRestoreFromEmptyStateWithPartitions() throws Exception {
}
// assert that there are partitions and is identical to expected list
- assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
- assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
- assertEquals(
- expectedSubscribedPartitionsWithStartOffsets,
- consumerFunction.getSubscribedPartitionsToStartOffsets());
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+ assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+ .isEqualTo(expectedSubscribedPartitionsWithStartOffsets);
// the new partitions should have been considered as restored state
- assertTrue(consumerFunction.getRestoredState() != null);
- assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+ assertThat(consumerFunction.getRestoredState()).isNotNull();
+ assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
Review Comment:
```suggestion
assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotEmpty();
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -275,20 +274,18 @@ public void testRestoreFromEmptyStateWithPartitions() throws Exception {
}
// assert that there are partitions and is identical to expected list
- assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
- assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
- assertEquals(
- expectedSubscribedPartitionsWithStartOffsets,
- consumerFunction.getSubscribedPartitionsToStartOffsets());
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+ assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+ .isEqualTo(expectedSubscribedPartitionsWithStartOffsets);
Review Comment:
Under the hood, `isNotEmpty()` and `isEmpty()` already check for non-null; these assertions could also be joined:
```suggestion
assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
.isNotEmpty()
.isEqualTo(expectedSubscribedPartitionsWithStartOffsets);
```
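As a minimal illustration (names hypothetical) of why the joining works: AssertJ assertion methods return the assertion object itself, so several checks can be chained on one `assertThat(...)` call and are evaluated left to right.
```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Collections;
import java.util.Map;

public class ChainedAssertionSketch {
    public static void main(String[] args) {
        Map<String, Long> expected = Collections.singletonMap("test-topic-0", 42L);
        Map<String, Long> actual = Collections.singletonMap("test-topic-0", 42L);

        // Equivalent to separate isNotEmpty() and isEqualTo(expected) calls;
        // isNotEmpty() also covers the implicit not-null check.
        assertThat(actual)
                .isNotEmpty()
                .isEqualTo(expected);
    }
}
```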
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -507,15 +512,15 @@ public void testNonContiguousPartitionIdDiscovery() throws Exception {
partitionDiscoverer.open();
List<KafkaTopicPartition> discoveredPartitions1 = partitionDiscoverer.discoverPartitions();
- assertEquals(2, discoveredPartitions1.size());
- assertTrue(discoveredPartitions1.contains(new KafkaTopicPartition("test-topic", 1)));
- assertTrue(discoveredPartitions1.contains(new KafkaTopicPartition("test-topic", 4)));
+ assertThat(discoveredPartitions1).hasSize(2);
+ assertThat(discoveredPartitions1).contains(new KafkaTopicPartition("test-topic", 1));
+ assertThat(discoveredPartitions1).contains(new KafkaTopicPartition("test-topic", 4));
Review Comment:
```suggestion
assertThat(discoveredPartitions1)
.hasSize(2)
.contains(
new KafkaTopicPartition("test-topic", 1),
new KafkaTopicPartition("test-topic", 4));
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -325,15 +322,16 @@ public void testRestore() throws Exception {
testHarness.open();
// assert that there are partitions and is identical to expected list
- assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
- assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+ assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
// on restore, subscribedPartitionsToStartOffsets should be identical to the restored state
- assertEquals(PARTITION_STATE, consumerFunction.getSubscribedPartitionsToStartOffsets());
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+ .isEqualTo(PARTITION_STATE);
// assert that state is correctly restored from legacy checkpoint
- assertTrue(consumerFunction.getRestoredState() != null);
- assertEquals(PARTITION_STATE, consumerFunction.getRestoredState());
+ assertThat(consumerFunction.getRestoredState()).isNotNull();
+ assertThat(consumerFunction.getRestoredState()).isEqualTo(PARTITION_STATE);
Review Comment:
```suggestion
assertThat(consumerFunction.getRestoredState()).isNotNull().isEqualTo(PARTITION_STATE);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -325,15 +322,16 @@ public void testRestore() throws Exception {
testHarness.open();
// assert that there are partitions and is identical to expected list
- assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
- assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+ assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
// on restore, subscribedPartitionsToStartOffsets should be identical to the restored state
- assertEquals(PARTITION_STATE, consumerFunction.getSubscribedPartitionsToStartOffsets());
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+ .isEqualTo(PARTITION_STATE);
Review Comment:
All these lines could be joined into something like this:
```java
assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
        .isNotEmpty()
        // on restore, subscribedPartitionsToStartOffsets should be identical to the restored state
        .isEqualTo(PARTITION_STATE);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -82,14 +82,16 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
new DummyFlinkKafkaProducer<>(
props, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
- assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
- assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
- assertTrue(
- props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
- .equals(ByteArraySerializer.class.getName()));
- assertTrue(
- props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
- .equals(ByteArraySerializer.class.getName()));
+ assertThat(props).containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+ assertThat(props).containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+ assertThat(
+ props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
+ .equals(ByteArraySerializer.class.getName()))
+ .isTrue();
+ assertThat(
+ props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
+ .equals(ByteArraySerializer.class.getName()))
+ .isTrue();
Review Comment:
Is it expected to have these lines twice?