snuyanzin commented on code in PR #19808:
URL: https://github.com/apache/flink/pull/19808#discussion_r881518844


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java:
##########
@@ -73,9 +72,9 @@ public void testPropertyHandling() {
                         .setKafkaProducerConfig(testConf)
                         .setProperty("k2", "correct"),
                 p -> {
-                    Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, 
p.containsKey(k)));
-                    assertEquals("v1", p.get("k1"));
-                    assertEquals("correct", p.get("k2"));
+                    Arrays.stream(DEFAULT_KEYS).forEach(k -> 
assertThat(p).containsKey(k));
+                    assertThat(p.get("k1")).isEqualTo("v1");
+                    assertThat(p.get("k2")).isEqualTo("correct");

Review Comment:
   Here `containsEntry` could be used, and the two assertions could be chained into one:
   ```suggestion
                       assertThat(p).containsEntry("k1", 
"v1").containsEntry("k2", "correct");
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java:
##########
@@ -161,8 +156,8 @@ public void testSerializeRecordWithKey() {
                         .setKeySerializationSchema(serializationSchema)
                         .build();
         final ProducerRecord<byte[], byte[]> record = schema.serialize("a", 
null, null);
-        assertArrayEquals(record.key(), serializationSchema.serialize("a"));
-        assertArrayEquals(record.value(), serializationSchema.serialize("a"));
+        assertThat(serializationSchema.serialize("a")).isEqualTo(record.key());
+        
assertThat(serializationSchema.serialize("a")).isEqualTo(record.value());

Review Comment:
   Could be joined as
   ```suggestion
           assertThat(serializationSchema.serialize("a"))
                   .isEqualTo(record.key())
                   .isEqualTo(record.value());
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -157,26 +158,27 @@ public void 
testMultiplePartitionsPerConsumersFixedPartitions() {
 
                 List<KafkaTopicPartition> initialDiscovery =
                         partitionDiscoverer.discoverPartitions();
-                assertTrue(initialDiscovery.size() >= 
minPartitionsPerConsumer);
-                assertTrue(initialDiscovery.size() <= 
maxPartitionsPerConsumer);
+                assertThat(initialDiscovery.size())
+                        .isGreaterThanOrEqualTo(minPartitionsPerConsumer);
+                
assertThat(initialDiscovery.size()).isLessThanOrEqualTo(maxPartitionsPerConsumer);

Review Comment:
   ```suggestion
                   assertThat(initialDiscovery)
                           
.hasSizeGreaterThanOrEqualTo(minPartitionsPerConsumer)
                           .hasSizeLessThanOrEqualTo(maxPartitionsPerConsumer);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -507,15 +512,15 @@ public void testNonContiguousPartitionIdDiscovery() 
throws Exception {
         partitionDiscoverer.open();
 
         List<KafkaTopicPartition> discoveredPartitions1 = 
partitionDiscoverer.discoverPartitions();
-        assertEquals(2, discoveredPartitions1.size());
-        assertTrue(discoveredPartitions1.contains(new 
KafkaTopicPartition("test-topic", 1)));
-        assertTrue(discoveredPartitions1.contains(new 
KafkaTopicPartition("test-topic", 4)));
+        assertThat(discoveredPartitions1).hasSize(2);
+        assertThat(discoveredPartitions1).contains(new 
KafkaTopicPartition("test-topic", 1));
+        assertThat(discoveredPartitions1).contains(new 
KafkaTopicPartition("test-topic", 4));
 
         List<KafkaTopicPartition> discoveredPartitions2 = 
partitionDiscoverer.discoverPartitions();
-        assertEquals(3, discoveredPartitions2.size());
-        assertTrue(discoveredPartitions2.contains(new 
KafkaTopicPartition("test-topic", 0)));
-        assertTrue(discoveredPartitions2.contains(new 
KafkaTopicPartition("test-topic", 2)));
-        assertTrue(discoveredPartitions2.contains(new 
KafkaTopicPartition("test-topic", 3)));
+        assertThat(discoveredPartitions2).hasSize(3);
+        assertThat(discoveredPartitions2).contains(new 
KafkaTopicPartition("test-topic", 0));
+        assertThat(discoveredPartitions2).contains(new 
KafkaTopicPartition("test-topic", 2));
+        assertThat(discoveredPartitions2).contains(new 
KafkaTopicPartition("test-topic", 3));

Review Comment:
   ```suggestion
           assertThat(discoveredPartitions2)
                   .hasSize(3)
                   .contains(
                           new KafkaTopicPartition("test-topic", 0),
                           new KafkaTopicPartition("test-topic", 2),
                           new KafkaTopicPartition("test-topic", 3));
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -271,9 +270,9 @@ private void compareCompactedResult(
             actualMap.computeIfAbsent(id, key -> new 
ArrayList<>()).add(rowData);
         }
 
-        assertEquals(expected.size(), actualMap.size());
+        assertThat(actualMap).hasSize(expected.size());

Review Comment:
   ```suggestion
           assertThat(actualMap).hasSameSizeAs(expected);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -275,20 +274,18 @@ public void testRestoreFromEmptyStateWithPartitions() 
throws Exception {
         }
 
         // assert that there are partitions and is identical to expected list
-        assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != 
null);
-        
assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
-        assertEquals(
-                expectedSubscribedPartitionsWithStartOffsets,
-                consumerFunction.getSubscribedPartitionsToStartOffsets());
+        
assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+        
assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
+        assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+                .isEqualTo(expectedSubscribedPartitionsWithStartOffsets);
 
         // the new partitions should have been considered as restored state
-        assertTrue(consumerFunction.getRestoredState() != null);
-        
assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+        assertThat(consumerFunction.getRestoredState()).isNotNull();
+        
assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
         for (Map.Entry<KafkaTopicPartition, Long> expectedEntry :
                 expectedSubscribedPartitionsWithStartOffsets.entrySet()) {
-            assertEquals(
-                    expectedEntry.getValue(),
-                    
consumerFunction.getRestoredState().get(expectedEntry.getKey()));
+            
assertThat(consumerFunction.getRestoredState().get(expectedEntry.getKey()))
+                    .isEqualTo(expectedEntry.getValue());

Review Comment:
   ```suggestion
               assertThat(consumerFunction.getRestoredState()).containsEntry(
                       expectedEntry.getKey(),
                       expectedEntry.getValue());
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -82,14 +82,16 @@ public void testKeyValueDeserializersSetIfMissing() throws 
Exception {
         new DummyFlinkKafkaProducer<>(
                 props, new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()), null);
 
-        
assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
-        
assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
-        assertTrue(
-                props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
-                        .equals(ByteArraySerializer.class.getName()));
-        assertTrue(
-                props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
-                        .equals(ByteArraySerializer.class.getName()));
+        
assertThat(props).containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+        
assertThat(props).containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);

Review Comment:
   ```suggestion
           assertThat(props)
                   .containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
                   .containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -220,25 +222,25 @@ public void 
testPartitionsFewerThanConsumersFixedPartitions() {
 
                 List<KafkaTopicPartition> initialDiscovery =
                         partitionDiscoverer.discoverPartitions();
-                assertTrue(initialDiscovery.size() <= 1);
+                assertThat(initialDiscovery.size()).isLessThanOrEqualTo(1);

Review Comment:
   ```suggestion
                   assertThat(initialDiscovery).hasSizeLessThanOrEqualTo(1);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -304,16 +298,15 @@ public void testKafkaClientProperties() throws Exception {
 
             AdminClient adminClient =
                     (AdminClient) Whitebox.getInternalState(enumerator, 
"adminClient");
-            assertNotNull(adminClient);
+            assertThat(adminClient).isNotNull();
             String clientId = (String) Whitebox.getInternalState(adminClient, 
"clientId");
-            assertNotNull(clientId);
-            assertTrue(clientId.startsWith(clientIdPrefix));
-            assertEquals(
-                    defaultTimeoutMs,
-                    Whitebox.getInternalState(adminClient, 
"defaultApiTimeoutMs"));
-
-            assertNotNull(clientId);
-            assertTrue(clientId.startsWith(clientIdPrefix));
+            assertThat(clientId).isNotNull();
+            assertThat(clientId).startsWith(clientIdPrefix);
+            assertThat(Whitebox.getInternalState(adminClient, 
"defaultApiTimeoutMs"))
+                    .isEqualTo(defaultTimeoutMs);
+
+            assertThat(clientId).isNotNull();
+            assertThat(clientId).startsWith(clientIdPrefix);

Review Comment:
   ```suggestion
               assertThat(clientId).isNotNull().startsWith(clientIdPrefix);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -65,36 +63,38 @@ public void testEarliestOffsetsInitializer() {
         OffsetsInitializer initializer = OffsetsInitializer.earliest();
         List<TopicPartition> partitions = 
KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
         Map<TopicPartition, Long> offsets = 
initializer.getPartitionOffsets(partitions, retriever);
-        assertEquals(partitions.size(), offsets.size());
-        assertTrue(offsets.keySet().containsAll(partitions));
+        assertThat(offsets).hasSize(partitions.size());
+        assertThat(offsets.keySet().containsAll(partitions)).isTrue();

Review Comment:
   ```suggestion
           assertThat(offsets).hasSameSizeAs(partitions);
           assertThat(offsets.keySet()).containsAll(partitions);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -65,36 +63,38 @@ public void testEarliestOffsetsInitializer() {
         OffsetsInitializer initializer = OffsetsInitializer.earliest();
         List<TopicPartition> partitions = 
KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
         Map<TopicPartition, Long> offsets = 
initializer.getPartitionOffsets(partitions, retriever);
-        assertEquals(partitions.size(), offsets.size());
-        assertTrue(offsets.keySet().containsAll(partitions));
+        assertThat(offsets).hasSize(partitions.size());
+        assertThat(offsets.keySet().containsAll(partitions)).isTrue();
         for (long offset : offsets.values()) {
-            Assert.assertEquals(KafkaPartitionSplit.EARLIEST_OFFSET, offset);
+            assertThat(offset).isEqualTo(KafkaPartitionSplit.EARLIEST_OFFSET);
         }
-        assertEquals(OffsetResetStrategy.EARLIEST, 
initializer.getAutoOffsetResetStrategy());
+        assertThat(initializer.getAutoOffsetResetStrategy())
+                .isEqualTo(OffsetResetStrategy.EARLIEST);
     }
 
     @Test
     public void testLatestOffsetsInitializer() {
         OffsetsInitializer initializer = OffsetsInitializer.latest();
         List<TopicPartition> partitions = 
KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
         Map<TopicPartition, Long> offsets = 
initializer.getPartitionOffsets(partitions, retriever);
-        assertEquals(partitions.size(), offsets.size());
-        assertTrue(offsets.keySet().containsAll(partitions));
+        assertThat(offsets).hasSize(partitions.size());
+        assertThat(offsets.keySet().containsAll(partitions)).isTrue();

Review Comment:
   ```suggestion
           assertThat(offsets).hasSameSizeAs(partitions);
           assertThat(offsets.keySet()).containsAll(partitions);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -314,33 +316,39 @@ public void testGrowingPartitions() {
             List<KafkaTopicPartition> initialDiscoverySubtask2 =
                     partitionDiscovererSubtask2.discoverPartitions();
 
-            assertTrue(initialDiscoverySubtask0.size() >= 
minInitialPartitionsPerConsumer);
-            assertTrue(initialDiscoverySubtask0.size() <= 
maxInitialPartitionsPerConsumer);
-            assertTrue(initialDiscoverySubtask1.size() >= 
minInitialPartitionsPerConsumer);
-            assertTrue(initialDiscoverySubtask1.size() <= 
maxInitialPartitionsPerConsumer);
-            assertTrue(initialDiscoverySubtask2.size() >= 
minInitialPartitionsPerConsumer);
-            assertTrue(initialDiscoverySubtask2.size() <= 
maxInitialPartitionsPerConsumer);
+            assertThat(initialDiscoverySubtask0.size())
+                    .isGreaterThanOrEqualTo(minInitialPartitionsPerConsumer);
+            assertThat(initialDiscoverySubtask0.size())
+                    .isLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
+            assertThat(initialDiscoverySubtask1.size())
+                    .isGreaterThanOrEqualTo(minInitialPartitionsPerConsumer);
+            assertThat(initialDiscoverySubtask1.size())
+                    .isLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
+            assertThat(initialDiscoverySubtask2.size())
+                    .isGreaterThanOrEqualTo(minInitialPartitionsPerConsumer);
+            assertThat(initialDiscoverySubtask2.size())
+                    .isLessThanOrEqualTo(maxInitialPartitionsPerConsumer);

Review Comment:
   ```suggestion
               assertThat(initialDiscoverySubtask0)
                       
.hasSizeGreaterThanOrEqualTo(minInitialPartitionsPerConsumer)
                       
.hasSizeLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
               assertThat(initialDiscoverySubtask1)
                       
.hasSizeGreaterThanOrEqualTo(minInitialPartitionsPerConsumer)
                       
.hasSizeLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
               assertThat(initialDiscoverySubtask2)
                       
.hasSizeGreaterThanOrEqualTo(minInitialPartitionsPerConsumer)
                       
.hasSizeLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -304,16 +298,15 @@ public void testKafkaClientProperties() throws Exception {
 
             AdminClient adminClient =
                     (AdminClient) Whitebox.getInternalState(enumerator, 
"adminClient");
-            assertNotNull(adminClient);
+            assertThat(adminClient).isNotNull();
             String clientId = (String) Whitebox.getInternalState(adminClient, 
"clientId");
-            assertNotNull(clientId);
-            assertTrue(clientId.startsWith(clientIdPrefix));
-            assertEquals(
-                    defaultTimeoutMs,
-                    Whitebox.getInternalState(adminClient, 
"defaultApiTimeoutMs"));
-
-            assertNotNull(clientId);
-            assertTrue(clientId.startsWith(clientIdPrefix));
+            assertThat(clientId).isNotNull();
+            assertThat(clientId).startsWith(clientIdPrefix);

Review Comment:
   ```suggestion
               assertThat(clientId).isNotNull().startsWith(clientIdPrefix);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -65,36 +63,38 @@ public void testEarliestOffsetsInitializer() {
         OffsetsInitializer initializer = OffsetsInitializer.earliest();
         List<TopicPartition> partitions = 
KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
         Map<TopicPartition, Long> offsets = 
initializer.getPartitionOffsets(partitions, retriever);
-        assertEquals(partitions.size(), offsets.size());
-        assertTrue(offsets.keySet().containsAll(partitions));
+        assertThat(offsets).hasSize(partitions.size());
+        assertThat(offsets.keySet().containsAll(partitions)).isTrue();
         for (long offset : offsets.values()) {
-            Assert.assertEquals(KafkaPartitionSplit.EARLIEST_OFFSET, offset);
+            assertThat(offset).isEqualTo(KafkaPartitionSplit.EARLIEST_OFFSET);
         }
-        assertEquals(OffsetResetStrategy.EARLIEST, 
initializer.getAutoOffsetResetStrategy());
+        assertThat(initializer.getAutoOffsetResetStrategy())
+                .isEqualTo(OffsetResetStrategy.EARLIEST);
     }
 
     @Test
     public void testLatestOffsetsInitializer() {
         OffsetsInitializer initializer = OffsetsInitializer.latest();
         List<TopicPartition> partitions = 
KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
         Map<TopicPartition, Long> offsets = 
initializer.getPartitionOffsets(partitions, retriever);
-        assertEquals(partitions.size(), offsets.size());
-        assertTrue(offsets.keySet().containsAll(partitions));
+        assertThat(offsets).hasSize(partitions.size());
+        assertThat(offsets.keySet().containsAll(partitions)).isTrue();
         for (long offset : offsets.values()) {
-            assertEquals(KafkaPartitionSplit.LATEST_OFFSET, offset);
+            assertThat(offset).isEqualTo(KafkaPartitionSplit.LATEST_OFFSET);
         }
-        assertEquals(OffsetResetStrategy.LATEST, 
initializer.getAutoOffsetResetStrategy());
+        
assertThat(initializer.getAutoOffsetResetStrategy()).isEqualTo(OffsetResetStrategy.LATEST);
     }
 
     @Test
     public void testCommittedGroupOffsetsInitializer() {
         OffsetsInitializer initializer = OffsetsInitializer.committedOffsets();
         List<TopicPartition> partitions = 
KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
         Map<TopicPartition, Long> offsets = 
initializer.getPartitionOffsets(partitions, retriever);
-        assertEquals(partitions.size(), offsets.size());
+        assertThat(offsets).hasSize(partitions.size());

Review Comment:
   ```suggestion
           assertThat(offsets).hasSameSizeAs(partitions);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -222,11 +221,11 @@ public void testRestoreFromEmptyStateNoPartitions() 
throws Exception {
         testHarness.open();
 
         // assert that no partitions were found and is empty
-        assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != 
null);
-        
assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+        
assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+        
assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isEmpty();

Review Comment:
   Under the hood, `isEmpty()` already verifies that the actual value is not null, so the separate `isNotNull()` assertion could be omitted:
   ```suggestion
           
assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isEmpty();
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java:
##########
@@ -99,12 +99,12 @@ public static void comparedWithKeyAndOrder(
             actualData.computeIfAbsent(key, k -> new LinkedList<>()).add(row);
         }
         // compare key first
-        assertEquals("Actual result: " + actual, expectedData.size(), 
actualData.size());
+        assertThat(actualData).as("Actual result: " + 
actual).hasSize(expectedData.size());

Review Comment:
   ```suggestion
           assertThat(actualData).as("Actual result: " + 
actual).hasSameSizeAs(expectedData);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -275,20 +274,18 @@ public void testRestoreFromEmptyStateWithPartitions() 
throws Exception {
         }
 
         // assert that there are partitions and is identical to expected list
-        assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != 
null);
-        
assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
-        assertEquals(
-                expectedSubscribedPartitionsWithStartOffsets,
-                consumerFunction.getSubscribedPartitionsToStartOffsets());
+        
assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+        
assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
+        assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+                .isEqualTo(expectedSubscribedPartitionsWithStartOffsets);
 
         // the new partitions should have been considered as restored state
-        assertTrue(consumerFunction.getRestoredState() != null);
-        
assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+        assertThat(consumerFunction.getRestoredState()).isNotNull();
+        
assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();

Review Comment:
   ```suggestion
           
assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotEmpty();
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -275,20 +274,18 @@ public void testRestoreFromEmptyStateWithPartitions() 
throws Exception {
         }
 
         // assert that there are partitions and is identical to expected list
-        assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != 
null);
-        
assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
-        assertEquals(
-                expectedSubscribedPartitionsWithStartOffsets,
-                consumerFunction.getSubscribedPartitionsToStartOffsets());
+        
assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+        
assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
+        assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+                .isEqualTo(expectedSubscribedPartitionsWithStartOffsets);

Review Comment:
   Under the hood, `isNotEmpty()` and `isEmpty()` already check that the actual value
   is not null, so `isNotNull()` is redundant; the remaining assertions could also be chained:
   ```suggestion
           assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
                   .isNotEmpty()
                   .isEqualTo(expectedSubscribedPartitionsWithStartOffsets);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -507,15 +512,15 @@ public void testNonContiguousPartitionIdDiscovery() 
throws Exception {
         partitionDiscoverer.open();
 
         List<KafkaTopicPartition> discoveredPartitions1 = 
partitionDiscoverer.discoverPartitions();
-        assertEquals(2, discoveredPartitions1.size());
-        assertTrue(discoveredPartitions1.contains(new 
KafkaTopicPartition("test-topic", 1)));
-        assertTrue(discoveredPartitions1.contains(new 
KafkaTopicPartition("test-topic", 4)));
+        assertThat(discoveredPartitions1).hasSize(2);
+        assertThat(discoveredPartitions1).contains(new 
KafkaTopicPartition("test-topic", 1));
+        assertThat(discoveredPartitions1).contains(new 
KafkaTopicPartition("test-topic", 4));

Review Comment:
   ```suggestion
           assertThat(discoveredPartitions1)
                   .hasSize(2)
                   .contains(
                           new KafkaTopicPartition("test-topic", 1),
                           new KafkaTopicPartition("test-topic", 4));
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -325,15 +322,16 @@ public void testRestore() throws Exception {
         testHarness.open();
 
         // assert that there are partitions and is identical to expected list
-        assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != 
null);
-        
assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+        
assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+        
assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
 
         // on restore, subscribedPartitionsToStartOffsets should be identical 
to the restored state
-        assertEquals(PARTITION_STATE, 
consumerFunction.getSubscribedPartitionsToStartOffsets());
+        assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+                .isEqualTo(PARTITION_STATE);
 
         // assert that state is correctly restored from legacy checkpoint
-        assertTrue(consumerFunction.getRestoredState() != null);
-        assertEquals(PARTITION_STATE, consumerFunction.getRestoredState());
+        assertThat(consumerFunction.getRestoredState()).isNotNull();
+        
assertThat(consumerFunction.getRestoredState()).isEqualTo(PARTITION_STATE);

Review Comment:
   ```suggestion
           
assertThat(consumerFunction.getRestoredState()).isNotNull().isEqualTo(PARTITION_STATE);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -325,15 +322,16 @@ public void testRestore() throws Exception {
         testHarness.open();
 
         // assert that there are partitions and is identical to expected list
-        assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != 
null);
-        
assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+        
assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+        
assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
 
         // on restore, subscribedPartitionsToStartOffsets should be identical 
to the restored state
-        assertEquals(PARTITION_STATE, 
consumerFunction.getSubscribedPartitionsToStartOffsets());
+        assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+                .isEqualTo(PARTITION_STATE);

Review Comment:
   All these lines could be joined into something like this:
   ```java
           assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
                   .isNotEmpty()
                   // on restore, subscribedPartitionsToStartOffsets should be 
identical to the restored state
                   .isEqualTo(PARTITION_STATE);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -82,14 +82,16 @@ public void testKeyValueDeserializersSetIfMissing() throws 
Exception {
         new DummyFlinkKafkaProducer<>(
                 props, new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()), null);
 
-        
assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
-        
assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
-        assertTrue(
-                props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
-                        .equals(ByteArraySerializer.class.getName()));
-        assertTrue(
-                props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
-                        .equals(ByteArraySerializer.class.getName()));
+        
assertThat(props).containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+        
assertThat(props).containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+        assertThat(
+                        
props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
+                                .equals(ByteArraySerializer.class.getName()))
+                .isTrue();
+        assertThat(
+                        
props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
+                                .equals(ByteArraySerializer.class.getName()))
+                .isTrue();

Review Comment:
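   Is it intentional that this assertion appears twice? Both copies check `KEY_SERIALIZER_CLASS_CONFIG`; perhaps the second one was meant to check `VALUE_SERIALIZER_CLASS_CONFIG`.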



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to