afedulov commented on code in PR #19660:
URL: https://github.com/apache/flink/pull/19660#discussion_r876439486


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -247,12 +249,18 @@ public void 
testRecoveryWithExactlyOnceGuaranteeAndConcurrentCheckpoints() throw
                 DeliveryGuarantee.EXACTLY_ONCE,
                 2,
                 (records) ->
-                        assertThat(
-                                records,
-                                contains(
-                                        LongStream.range(1, 
lastCheckpointedRecord.get().get() + 1)
-                                                .boxed()
-                                                .toArray())));
+                        assertThat(records)
+                                .satisfies(

Review Comment:
   Use an AssertJ `contains*` assertion directly instead of wrapping the check in `satisfies(...)`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -286,12 +293,13 @@ public void 
testAbortTransactionsOfPendingCheckpointsAfterFailure() throws Excep
                 new FailingCheckpointMapper(failed, lastCheckpointedRecord), 
config, "newPrefix");
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
                 drainAllRecordsFromTopic(topic, true);
-        assertThat(
-                deserializeValues(collectedRecords),
-                contains(
-                        LongStream.range(1, lastCheckpointedRecord.get().get() 
+ 1)
-                                .boxed()
-                                .toArray()));
+        assertThat(deserializeValues(collectedRecords))
+                .satisfies(

Review Comment:
   Use an AssertJ `contains*` assertion directly instead of wrapping the check in `satisfies(...)`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -165,13 +167,13 @@ public void testAsyncErrorRethrownOnInvoke() throws 
Throwable {
             testHarness.processElement(new StreamRecord<>("msg-2"));
         } catch (Exception e) {
             // the next invoke should rethrow the async exception
-            Assert.assertTrue(e.getCause().getMessage().contains("artificial 
async exception"));
+            assertThat(e.getCause().getMessage()).contains("artificial async 
exception");

Review Comment:
   Could this try/catch be replaced with AssertJ's `assertThatThrownBy`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java:
##########
@@ -679,10 +666,9 @@ private void testFailingConsumerLifecycle(
                     "Exception should have been thrown from open / run method 
of FlinkKafkaConsumerBase.");
         } catch (Exception e) {
             assertThat(
-                    ExceptionUtils.findThrowable(
-                                    e, throwable -> 
throwable.equals(expectedException))
-                            .isPresent(),
-                    is(true));
+                            ExceptionUtils.findThrowable(

Review Comment:
   Could this try/catch be replaced with AssertJ's `assertThatThrownBy`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java:
##########
@@ -227,68 +225,4 @@ public TypeInformation<String> getProducedType() {
             return Types.STRING;
         }
     }
-

Review Comment:
   Why is the code removed here not needed anymore?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java:
##########
@@ -395,11 +393,14 @@ void testAssigningEmptySplits() throws Exception {
                     new TestingReaderOutput<>(),
                     () -> reader.getNumAliveFetchers() == 0,
                     "The split fetcher did not exit before timeout.");
-            MatcherAssert.assertThat(
-                    finishedSplits,
-                    Matchers.containsInAnyOrder(
-                            
KafkaPartitionSplit.toSplitId(normalSplit.getTopicPartition()),
-                            
KafkaPartitionSplit.toSplitId(emptySplit.getTopicPartition())));
+            assertThat(finishedSplits)
+                    .satisfies(

Review Comment:
   Use AssertJ's `contains*` assertions directly instead of wrapping the check in `satisfies(...)`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -408,10 +416,11 @@ private void writeRecordsToKafka(
                 drainAllRecordsFromTopic(
                         topic, deliveryGuarantee == 
DeliveryGuarantee.EXACTLY_ONCE);
         final long recordsCount = expectedRecords.get().get();
-        assertEquals(collectedRecords.size(), recordsCount);
-        assertThat(
-                deserializeValues(collectedRecords),
-                contains(LongStream.range(1, recordsCount + 
1).boxed().toArray()));
+        assertThat(recordsCount).isEqualTo(collectedRecords.size());

Review Comment:
   Use an AssertJ `contains*` assertion directly?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -315,12 +322,13 @@ public void 
testAbortTransactionsAfterScaleInBeforeFirstCheckpoint() throws Exce
                 new FailingCheckpointMapper(failed, lastCheckpointedRecord), 
config, null);
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
                 drainAllRecordsFromTopic(topic, true);
-        assertThat(
-                deserializeValues(collectedRecords),
-                contains(
-                        LongStream.range(1, lastCheckpointedRecord.get().get() 
+ 1)
-                                .boxed()
-                                .toArray()));
+        assertThat(deserializeValues(collectedRecords))
+                .satisfies(

Review Comment:
   Use an AssertJ `contains*` assertion directly instead of wrapping the check in `satisfies(...)`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -233,12 +229,18 @@ public void testRecoveryWithExactlyOnceGuarantee() throws 
Exception {
                 DeliveryGuarantee.EXACTLY_ONCE,
                 1,
                 (records) ->
-                        assertThat(
-                                records,
-                                contains(
-                                        LongStream.range(1, 
lastCheckpointedRecord.get().get() + 1)
-                                                .boxed()
-                                                .toArray())));
+                        assertThat(records)
+                                .satisfies(

Review Comment:
   Use an AssertJ `contains*` assertion directly instead of wrapping the check in `satisfies(...)`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java:
##########
@@ -77,21 +78,26 @@ public void testGetTransactions() {
         final KafkaTransactionLog transactionLog =
                 new KafkaTransactionLog(getKafkaClientConfiguration());
         final List<TransactionRecord> transactions = 
transactionLog.getTransactions();
-        assertThat(
-                transactions,
-                containsInAnyOrder(
-                        new TransactionRecord(buildTransactionalId(1), Empty),
-                        new TransactionRecord(buildTransactionalId(1), 
Ongoing),
-                        new TransactionRecord(buildTransactionalId(1), 
PrepareCommit),
-                        new TransactionRecord(buildTransactionalId(1), 
CompleteCommit),
-                        new TransactionRecord(buildTransactionalId(2), Empty),
-                        new TransactionRecord(buildTransactionalId(2), 
Ongoing),
-                        new TransactionRecord(buildTransactionalId(2), 
PrepareAbort),
-                        new TransactionRecord(buildTransactionalId(2), 
CompleteAbort),
-                        new TransactionRecord(buildTransactionalId(3), Empty),
-                        new TransactionRecord(buildTransactionalId(3), 
Ongoing),
-                        new TransactionRecord(buildTransactionalId(4), Empty),
-                        new TransactionRecord(buildTransactionalId(4), 
Ongoing)));
+        assertThat(transactions)
+                .satisfies(

Review Comment:
   Use AssertJ's `contains*` assertions directly instead of wrapping the check in `satisfies(...)`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -202,13 +204,13 @@ public void testAsyncErrorRethrownOnCheckpoint() throws 
Throwable {
             testHarness.snapshot(123L, 123L);
         } catch (Exception e) {
             // the next invoke should rethrow the async exception
-            Assert.assertTrue(e.getCause().getMessage().contains("artificial 
async exception"));
+            assertThat(e.getCause().getMessage()).contains("artificial async 
exception");

Review Comment:
   Could this try/catch be replaced with AssertJ's `assertThatThrownBy`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -266,14 +268,14 @@ public void go() throws Exception {
             snapshotThread.sync();
         } catch (Exception e) {
             // the snapshot should have failed with the async exception
-            Assert.assertTrue(
-                    e.getCause().getMessage().contains("artificial async 
failure for 2nd message"));
+            assertThat(e.getCause().getMessage())

Review Comment:
   Could this try/catch be replaced with AssertJ's `assertThatThrownBy`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to