afedulov commented on code in PR #19660: URL: https://github.com/apache/flink/pull/19660#discussion_r876452544
########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java: ########## @@ -1137,8 +1138,9 @@ public void emitWatermark(Watermark mark) { expectedResults.add(new Watermark(-4)); // verify watermark awaitRecordCount(results, expectedResults.size()); - assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray())); - assertEquals(0, TestWatermarkTracker.WATERMARK.get()); + assertThat(results) + .satisfies(matching(org.hamcrest.Matchers.contains(expectedResults.toArray()))); Review Comment: Use native assertj contains? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java: ########## @@ -1213,7 +1216,8 @@ private static class OpenCheckingStringSchema extends SimpleStringSchema { @Override public void open(DeserializationSchema.InitializationContext context) throws Exception { - assertThat(context.getMetricGroup(), notNullValue(MetricGroup.class)); + assertThat(context.getMetricGroup()) + .satisfies(matching(notNullValue(MetricGroup.class))); Review Comment: Is there anything that prevents a direct assertj not null check? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java: ########## @@ -984,9 +981,13 @@ public void markAsTemporarilyIdle() {} sourceFunc.cancel(); testHarness.close(); - assertEquals("record count", recordCount, testHarness.getOutput().size()); - assertThat(watermarks, org.hamcrest.Matchers.contains(new Watermark(-3), new Watermark(5))); - assertEquals("watermark count", watermarkCount, watermarks.size()); + assertThat(testHarness.getOutput()).as("record count").hasSize(recordCount); + assertThat(watermarks) + .satisfies( + matching( Review Comment: Use native assertj contains? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java: ########## @@ -175,15 +171,14 @@ public void testAsyncErrorRethrownOnCheckpoint() throws Throwable { testHarness.snapshot(123L, 123L); } catch (Exception e) { // the next checkpoint should rethrow the async exception - Assert.assertTrue( - ExceptionUtils.findThrowableWithMessage(e, "artificial async exception") - .isPresent()); + assertThat(ExceptionUtils.findThrowableWithMessage(e, "artificial async exception")) Review Comment: assertThatThrownBy? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java: ########## @@ -210,11 +208,12 @@ public void testGetShardList() throws Exception { expectedStreamShard.add(shardHandle); } - Assert.assertThat( - actualShardList, - containsInAnyOrder( - expectedStreamShard.toArray( - new StreamShardHandle[actualShardList.size()]))); + assertThat(actualShardList) + .satisfies( Review Comment: Use native assertj checks? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java: ########## @@ -120,15 +122,17 @@ public void testRetainMinAfterReachingLimit() throws Exception { while (emitter.results.size() != 4 && dl.hasTimeLeft()) { Thread.sleep(10); } - Assert.assertThat(emitter.results, Matchers.contains(one, two, three, ten)); + assertThat(emitter.results) + .satisfies(matching(Matchers.contains(one, two, three, ten))); // advance watermark, emits remaining record from queue0 emitter.setCurrentWatermark(10); dl = Deadline.fromNow(Duration.ofSeconds(10)); while (emitter.results.size() != 5 && dl.hasTimeLeft()) { Thread.sleep(10); } - Assert.assertThat(emitter.results, Matchers.contains(one, two, three, ten, eleven)); + assertThat(emitter.results) Review Comment: Use native assertj checks? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java: ########## @@ -235,16 +230,16 @@ public void go() throws Exception { snapshotThread.sync(); } catch (Exception e) { // after the flush, the async exception should have been rethrown - Assert.assertTrue( - ExceptionUtils.findThrowableWithMessage( - e, "artificial async failure for 2nd message") - .isPresent()); + assertThat( Review Comment: assertThatThrownBy? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java: ########## @@ -123,14 +123,13 @@ public void testStopWithSavepoint() throws Exception { List<String> result = stream.executeAndCollect(10000); // stop with savepoint will most likely only return a small subset of the elements // validate that the prefix is as expected - assertThat(result, hasSize(lessThan(numElements))); - assertThat( - result, - equalTo( + assertThat(result).satisfies(matching(hasSize(lessThan(numElements)))); Review Comment: Use native assertj checks? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java: ########## @@ -246,15 +245,15 @@ public void testGetShardListWithNewShardsOnSecondRun() throws Exception { GetShardListResult shardListResult = kinesisProxy.getShardList(streamHashMap); // then - Assert.assertTrue(shardListResult.hasRetrievedShards()); + assertThat(shardListResult.hasRetrievedShards()).isTrue(); Set<String> expectedStreams = new HashSet<>(); expectedStreams.add(fakeStreamName); - Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(), expectedStreams); + assertThat(expectedStreams).isEqualTo(shardListResult.getStreamsWithRetrievedShards()); List<StreamShardHandle> actualShardList = shardListResult.getRetrievedShardListOfStream(fakeStreamName); - Assert.assertThat(actualShardList, hasSize(2)); + assertThat(actualShardList).satisfies(matching(hasSize(2))); Review Comment: Use native assertj checks? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java: ########## @@ -1155,40 +1157,42 @@ public void emitWatermark(Watermark mark) { while (deadline.hasTimeLeft() && emitterQueue.getSize() < 1) { Thread.sleep(10); } - assertEquals("first record received", 1, emitterQueue.getSize()); + assertThat(emitterQueue.getSize()).as("first record received").isEqualTo(1); // Advance the watermark. Since the new record is past global watermark + threshold, // it won't be emitted and the watermark does not advance testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval); - assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray())); - assertEquals( - 3000L, - (long) org.powermock.reflect.Whitebox.getInternalState(fetcher, "nextWatermark")); + assertThat(results) + .satisfies(matching(org.hamcrest.Matchers.contains(expectedResults.toArray()))); + assertThat((long) org.powermock.reflect.Whitebox.getInternalState(fetcher, "nextWatermark")) + .isEqualTo(3000L); TestWatermarkTracker.assertGlobalWatermark(-4); // Trigger global watermark sync testHarness.setProcessingTime(testHarness.getProcessingTime() + 1); expectedResults.add(Long.toString(record2)); awaitRecordCount(results, expectedResults.size()); - assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray())); + assertThat(results) + .satisfies(matching(org.hamcrest.Matchers.contains(expectedResults.toArray()))); TestWatermarkTracker.assertGlobalWatermark(3000); // Trigger watermark update and emit testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval); expectedResults.add(new Watermark(3000)); - assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray())); + assertThat(results) + .satisfies(matching(org.hamcrest.Matchers.contains(expectedResults.toArray()))); Review Comment: Use native assertj contains? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java: ########## @@ -139,15 +136,14 @@ public void testAsyncErrorRethrownOnInvoke() throws Throwable { testHarness.processElement(new StreamRecord<>("msg-2")); } catch (Exception e) { // the next invoke should rethrow the async exception - Assert.assertTrue( - ExceptionUtils.findThrowableWithMessage(e, "artificial async exception") - .isPresent()); + assertThat(ExceptionUtils.findThrowableWithMessage(e, "artificial async exception")) Review Comment: assertThatThrownBy? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java: ########## @@ -291,12 +291,12 @@ public void testGetShardListWithNewShardsOnSecondRun() throws Exception { GetShardListResult newShardListResult = kinesisProxy.getShardList(streamHashMap); // then new shards - Assert.assertTrue(newShardListResult.hasRetrievedShards()); - Assert.assertEquals(newShardListResult.getStreamsWithRetrievedShards(), expectedStreams); + assertThat(newShardListResult.hasRetrievedShards()).isTrue(); + assertThat(expectedStreams).isEqualTo(newShardListResult.getStreamsWithRetrievedShards()); List<StreamShardHandle> newActualShardList = newShardListResult.getRetrievedShardListOfStream(fakeStreamName); - Assert.assertThat(newActualShardList, hasSize(1)); + assertThat(newActualShardList).satisfies(matching(hasSize(1))); Review Comment: Use native assertj checks? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java: ########## @@ -1155,40 +1157,42 @@ public void emitWatermark(Watermark mark) { while (deadline.hasTimeLeft() && emitterQueue.getSize() < 1) { Thread.sleep(10); } - assertEquals("first record received", 1, emitterQueue.getSize()); + assertThat(emitterQueue.getSize()).as("first record received").isEqualTo(1); // Advance the watermark. Since the new record is past global watermark + threshold, // it won't be emitted and the watermark does not advance testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval); - assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray())); - assertEquals( - 3000L, - (long) org.powermock.reflect.Whitebox.getInternalState(fetcher, "nextWatermark")); + assertThat(results) + .satisfies(matching(org.hamcrest.Matchers.contains(expectedResults.toArray()))); Review Comment: Use native assertj contains? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java: ########## @@ -1030,9 +1019,10 @@ public void go() throws Exception { fail("Expected exception from deserializer, but got: " + e); } - assertTrue( - "Expected Fetcher to have been interrupted. This test didn't accomplish its goal.", - fetcher.wasInterrupted); + assertThat(fetcher.wasInterrupted) Review Comment: Convert the above to assertThatThrownBy? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java: ########## @@ -191,15 +189,15 @@ public void testGetShardList() throws Exception { Arrays.asList(fakeStreamName)); GetShardListResult shardListResult = kinesisProxy.getShardList(streamHashMap); - Assert.assertEquals(shardListResult.hasRetrievedShards(), true); + assertThat(true).isEqualTo(shardListResult.hasRetrievedShards()); Set<String> expectedStreams = new HashSet<>(); expectedStreams.add(fakeStreamName); - Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(), expectedStreams); + assertThat(expectedStreams).isEqualTo(shardListResult.getStreamsWithRetrievedShards()); List<StreamShardHandle> actualShardList = shardListResult.getRetrievedShardListOfStream(fakeStreamName); List<StreamShardHandle> expectedStreamShard = new ArrayList<>(); - assertThat(actualShardList, hasSize(4)); + assertThat(actualShardList).satisfies(matching(hasSize(4))); Review Comment: Use native assertj check? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java: ########## @@ -120,15 +122,17 @@ public void testRetainMinAfterReachingLimit() throws Exception { while (emitter.results.size() != 4 && dl.hasTimeLeft()) { Thread.sleep(10); } - Assert.assertThat(emitter.results, Matchers.contains(one, two, three, ten)); + assertThat(emitter.results) Review Comment: Use native assertj checks? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java: ########## @@ -269,11 +268,12 @@ public void testGetShardListWithNewShardsOnSecondRun() throws Exception { i)))) .collect(Collectors.toList()); - Assert.assertThat( - actualShardList, - containsInAnyOrder( - expectedStreamShard.toArray( - new StreamShardHandle[actualShardList.size()]))); + assertThat(actualShardList) + .satisfies( Review Comment: Use native assertj checks? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java: ########## @@ -81,7 +83,7 @@ public void test() throws Exception { } emitter.stop(); - Assert.assertThat(emitter.results, Matchers.contains(one, five, two, ten)); + assertThat(emitter.results).satisfies(matching(Matchers.contains(one, five, two, ten))); Review Comment: Use native assertj checks? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java: ########## @@ -307,11 +307,13 @@ public void testGetShardListWithNewShardsOnSecondRun() throws Exception { KinesisShardIdGenerator.generateFromShardOrder( 2)))); - Assert.assertThat( - newActualShardList, - containsInAnyOrder( - newExpectedStreamShard.toArray( - new StreamShardHandle[newActualShardList.size()]))); + assertThat(newActualShardList) + .satisfies( Review Comment: Use native assertj checks? ########## flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java: ########## @@ -1155,40 +1157,42 @@ public void emitWatermark(Watermark mark) { while (deadline.hasTimeLeft() && emitterQueue.getSize() < 1) { Thread.sleep(10); } - assertEquals("first record received", 1, emitterQueue.getSize()); + assertThat(emitterQueue.getSize()).as("first record received").isEqualTo(1); // Advance the watermark. Since the new record is past global watermark + threshold, // it won't be emitted and the watermark does not advance testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval); - assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray())); - assertEquals( - 3000L, - (long) org.powermock.reflect.Whitebox.getInternalState(fetcher, "nextWatermark")); + assertThat(results) + .satisfies(matching(org.hamcrest.Matchers.contains(expectedResults.toArray()))); + assertThat((long) org.powermock.reflect.Whitebox.getInternalState(fetcher, "nextWatermark")) + .isEqualTo(3000L); TestWatermarkTracker.assertGlobalWatermark(-4); // Trigger global watermark sync testHarness.setProcessingTime(testHarness.getProcessingTime() + 1); expectedResults.add(Long.toString(record2)); awaitRecordCount(results, expectedResults.size()); - assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray())); + assertThat(results) + .satisfies(matching(org.hamcrest.Matchers.contains(expectedResults.toArray()))); Review Comment: Use native assertj contains? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org