afedulov commented on code in PR #19660: URL: https://github.com/apache/flink/pull/19660#discussion_r876439486
########## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java: ########## @@ -247,12 +249,18 @@ public void testRecoveryWithExactlyOnceGuaranteeAndConcurrentCheckpoints() throw DeliveryGuarantee.EXACTLY_ONCE, 2, (records) -> - assertThat( - records, - contains( - LongStream.range(1, lastCheckpointedRecord.get().get() + 1) - .boxed() - .toArray()))); + assertThat(records) + .satisfies( Review Comment: Use contains directly? ########## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java: ########## @@ -286,12 +293,13 @@ public void testAbortTransactionsOfPendingCheckpointsAfterFailure() throws Excep new FailingCheckpointMapper(failed, lastCheckpointedRecord), config, "newPrefix"); final List<ConsumerRecord<byte[], byte[]>> collectedRecords = drainAllRecordsFromTopic(topic, true); - assertThat( - deserializeValues(collectedRecords), - contains( - LongStream.range(1, lastCheckpointedRecord.get().get() + 1) - .boxed() - .toArray())); + assertThat(deserializeValues(collectedRecords)) + .satisfies( Review Comment: Use contains directly? ########## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java: ########## @@ -165,13 +167,13 @@ public void testAsyncErrorRethrownOnInvoke() throws Throwable { testHarness.processElement(new StreamRecord<>("msg-2")); } catch (Exception e) { // the next invoke should rethrow the async exception - Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception")); + assertThat(e.getCause().getMessage()).contains("artificial async exception"); Review Comment: assertThatThrownBy? ########## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java: ########## @@ -679,10 +666,9 @@ private void testFailingConsumerLifecycle( "Exception should have been thrown from open / run method of FlinkKafkaConsumerBase."); } catch (Exception e) { assertThat( - ExceptionUtils.findThrowable( - e, throwable -> throwable.equals(expectedException)) - .isPresent(), - is(true)); + ExceptionUtils.findThrowable( Review Comment: assertThatThrownBy? ########## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java: ########## @@ -227,68 +225,4 @@ public TypeInformation<String> getProducedType() { return Types.STRING; } } - Review Comment: Why are those not needed anymore? ########## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java: ########## @@ -395,11 +393,14 @@ void testAssigningEmptySplits() throws Exception { new TestingReaderOutput<>(), () -> reader.getNumAliveFetchers() == 0, "The split fetcher did not exit before timeout."); - MatcherAssert.assertThat( - finishedSplits, - Matchers.containsInAnyOrder( - KafkaPartitionSplit.toSplitId(normalSplit.getTopicPartition()), - KafkaPartitionSplit.toSplitId(emptySplit.getTopicPartition()))); + assertThat(finishedSplits) + .satisfies( Review Comment: Use assertj contains* directly? ########## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java: ########## @@ -408,10 +416,11 @@ private void writeRecordsToKafka( drainAllRecordsFromTopic( topic, deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE); final long recordsCount = expectedRecords.get().get(); - assertEquals(collectedRecords.size(), recordsCount); - assertThat( - deserializeValues(collectedRecords), - contains(LongStream.range(1, recordsCount + 1).boxed().toArray())); + assertThat(recordsCount).isEqualTo(collectedRecords.size()); Review Comment: Use contains directly? ########## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java: ########## @@ -315,12 +322,13 @@ public void testAbortTransactionsAfterScaleInBeforeFirstCheckpoint() throws Exce new FailingCheckpointMapper(failed, lastCheckpointedRecord), config, null); final List<ConsumerRecord<byte[], byte[]>> collectedRecords = drainAllRecordsFromTopic(topic, true); - assertThat( - deserializeValues(collectedRecords), - contains( - LongStream.range(1, lastCheckpointedRecord.get().get() + 1) - .boxed() - .toArray())); + assertThat(deserializeValues(collectedRecords)) + .satisfies( Review Comment: Use contains directly? ########## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java: ########## @@ -233,12 +229,18 @@ public void testRecoveryWithExactlyOnceGuarantee() throws Exception { DeliveryGuarantee.EXACTLY_ONCE, 1, (records) -> - assertThat( - records, - contains( - LongStream.range(1, lastCheckpointedRecord.get().get() + 1) - .boxed() - .toArray()))); + assertThat(records) + .satisfies( Review Comment: Use contains directly? ########## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java: ########## @@ -77,21 +78,26 @@ public void testGetTransactions() { final KafkaTransactionLog transactionLog = new KafkaTransactionLog(getKafkaClientConfiguration()); final List<TransactionRecord> transactions = transactionLog.getTransactions(); - assertThat( - transactions, - containsInAnyOrder( - new TransactionRecord(buildTransactionalId(1), Empty), - new TransactionRecord(buildTransactionalId(1), Ongoing), - new TransactionRecord(buildTransactionalId(1), PrepareCommit), - new TransactionRecord(buildTransactionalId(1), CompleteCommit), - new TransactionRecord(buildTransactionalId(2), Empty), - new TransactionRecord(buildTransactionalId(2), Ongoing), - new TransactionRecord(buildTransactionalId(2), PrepareAbort), - new TransactionRecord(buildTransactionalId(2), CompleteAbort), - new TransactionRecord(buildTransactionalId(3), Empty), - new TransactionRecord(buildTransactionalId(3), Ongoing), - new TransactionRecord(buildTransactionalId(4), Empty), - new TransactionRecord(buildTransactionalId(4), Ongoing))); + assertThat(transactions) + .satisfies( Review Comment: Use assertj contains* directly? ########## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java: ########## @@ -202,13 +204,13 @@ public void testAsyncErrorRethrownOnCheckpoint() throws Throwable { testHarness.snapshot(123L, 123L); } catch (Exception e) { // the next invoke should rethrow the async exception - Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception")); + assertThat(e.getCause().getMessage()).contains("artificial async exception"); Review Comment: assertThatThrownBy? ########## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java: ########## @@ -266,14 +268,14 @@ public void go() throws Exception { snapshotThread.sync(); } catch (Exception e) { // the snapshot should have failed with the async exception - Assert.assertTrue( - e.getCause().getMessage().contains("artificial async failure for 2nd message")); + assertThat(e.getCause().getMessage()) Review Comment: assertThatThrownBy? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org