This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 1b02f02 [FLINK-24857][test][FileSource][Kafka] Upgrade SourceReaderTestBase to JUnit 5 1b02f02 is described below commit 1b02f022cdcb4af449ed5dfe88acd3f9e4926dbe Author: Yufei Zhang <affei...@gmail.com> AuthorDate: Mon Dec 6 12:11:50 2021 +0800 [FLINK-24857][test][FileSource][Kafka] Upgrade SourceReaderTestBase to JUnit 5 --- .../base/source/reader/SourceReaderBaseTest.java | 126 +++++++++++---------- .../kafka/source/reader/KafkaSourceReaderTest.java | 82 +++++++------- .../flink-connector-test-utils/pom.xml | 6 + .../source/reader/SourceReaderTestBase.java | 75 ++++++------ 4 files changed, 147 insertions(+), 142 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java index 07353c2..4a5544d 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java @@ -38,9 +38,7 @@ import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; import org.apache.flink.core.io.InputStatus; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; +import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; @@ -50,75 +48,81 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** A unit test class for {@link SourceReaderBase}. */ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> { - @Rule public ExpectedException expectedException = ExpectedException.none(); - @Test - public void testExceptionInSplitReader() throws Exception { - expectedException.expect(RuntimeException.class); - expectedException.expectMessage("One or more fetchers have encountered exception"); - final String errMsg = "Testing Exception"; - - FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = - new FutureCompletingBlockingQueue<>(); - // We have to handle split changes first, otherwise fetch will not be called. - try (MockSourceReader reader = - new MockSourceReader( - elementsQueue, - () -> - new SplitReader<int[], MockSourceSplit>() { - @Override - public RecordsWithSplitIds<int[]> fetch() { - throw new RuntimeException(errMsg); - } - - @Override - public void handleSplitsChanges( - SplitsChange<MockSourceSplit> splitsChanges) {} - - @Override - public void wakeUp() {} - - @Override - public void close() {} - }, - getConfig(), - new TestingReaderContext())) { - ValidatingSourceOutput output = new ValidatingSourceOutput(); - reader.addSplits( - Collections.singletonList( - getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED))); - reader.notifyNoMoreSplits(); - // This is not a real infinite loop, it is supposed to throw exception after two polls. - while (true) { - InputStatus inputStatus = reader.pollNext(output); - assertNotEquals(InputStatus.END_OF_INPUT, inputStatus); - // Add a sleep to avoid tight loop. - Thread.sleep(1); - } - } + void testExceptionInSplitReader() { + assertThatThrownBy( + () -> { + final String errMsg = "Testing Exception"; + + FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> + elementsQueue = new FutureCompletingBlockingQueue<>(); + // We have to handle split changes first, otherwise fetch will not be + // called. + try (MockSourceReader reader = + new MockSourceReader( + elementsQueue, + () -> + new SplitReader<int[], MockSourceSplit>() { + @Override + public RecordsWithSplitIds<int[]> fetch() { + throw new RuntimeException(errMsg); + } + + @Override + public void handleSplitsChanges( + SplitsChange<MockSourceSplit> + splitsChanges) {} + + @Override + public void wakeUp() {} + + @Override + public void close() {} + }, + getConfig(), + new TestingReaderContext())) { + ValidatingSourceOutput output = new ValidatingSourceOutput(); + reader.addSplits( + Collections.singletonList( + getSplit( + 0, + NUM_RECORDS_PER_SPLIT, + Boundedness.CONTINUOUS_UNBOUNDED))); + reader.notifyNoMoreSplits(); + // This is not a real infinite loop, it is supposed to throw + // exception after + // two polls. + while (true) { + InputStatus inputStatus = reader.pollNext(output); + assertThat(inputStatus).isNotEqualTo(InputStatus.END_OF_INPUT); + // Add a sleep to avoid tight loop. + Thread.sleep(1); + } + } + }) + .isInstanceOf(RuntimeException.class) + .hasMessage("One or more fetchers have encountered exception"); } @Test - public void testRecordsWithSplitsNotRecycledWhenRecordsLeft() throws Exception { + void testRecordsWithSplitsNotRecycledWhenRecordsLeft() throws Exception { final TestingRecordsWithSplitIds<String> records = new TestingRecordsWithSplitIds<>("test-split", "value1", "value2"); final SourceReader<?, ?> reader = createReaderAndAwaitAvailable("test-split", records); reader.pollNext(new TestingReaderOutput<>()); - assertFalse(records.isRecycled()); + assertThat(records.isRecycled()).isFalse(); } @Test - public void testRecordsWithSplitsRecycledWhenEmpty() throws Exception { + void testRecordsWithSplitsRecycledWhenEmpty() throws Exception { final TestingRecordsWithSplitIds<String> records = new TestingRecordsWithSplitIds<>("test-split", "value1", "value2"); final SourceReader<?, ?> reader = createReaderAndAwaitAvailable("test-split", records); @@ -129,11 +133,11 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> reader.pollNext(new TestingReaderOutput<>()); reader.pollNext(new TestingReaderOutput<>()); - assertTrue(records.isRecycled()); + assertThat(records.isRecycled()).isTrue(); } @Test - public void testMultipleSplitsWithDifferentFinishingMoments() throws Exception { + void testMultipleSplitsWithDifferentFinishingMoments() throws Exception { FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = new FutureCompletingBlockingQueue<>(); MockSplitReader mockSplitReader = @@ -169,7 +173,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> } @Test - public void testMultipleSplitsWithSeparatedFinishedRecord() throws Exception { + void testMultipleSplitsWithSeparatedFinishedRecord() throws Exception { FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = new FutureCompletingBlockingQueue<>(); MockSplitReader mockSplitReader = @@ -205,7 +209,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> } @Test - public void testPollNextReturnMoreAvailableWhenAllSplitFetcherCloseWithLeftoverElementInQueue() + void testPollNextReturnMoreAvailableWhenAllSplitFetcherCloseWithLeftoverElementInQueue() throws Exception { FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = @@ -231,8 +235,8 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> // Add the last record to the split when the splitFetcherManager shutting down SplitFetchers splitFetcherManager.getInShutdownSplitFetcherFuture().thenRun(() -> split.addRecord(1)); - assertEquals( - InputStatus.MORE_AVAILABLE, sourceReader.pollNext(new TestingReaderOutput<>())); + assertThat(sourceReader.pollNext(new TestingReaderOutput<>())) + .isEqualTo(InputStatus.MORE_AVAILABLE); } // ---------------- helper methods ----------------- diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java index e671520..d5c9abb 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java @@ -52,9 +52,9 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.ArrayList; @@ -80,14 +80,13 @@ import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderM import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.TOPIC_GROUP; import static org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv.NUM_PARTITIONS; import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for {@link KafkaSourceReader}. */ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSplit> { private static final String TOPIC = "KafkaSourceReaderTest"; - @BeforeClass + @BeforeAll public static void setup() throws Throwable { KafkaSourceTestEnv.setup(); try (AdminClient adminClient = KafkaSourceTestEnv.getAdminClient()) { @@ -114,7 +113,7 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp getRecords(), StringSerializer.class, IntegerSerializer.class); } - @AfterClass + @AfterAll public static void tearDown() throws Exception { KafkaSourceTestEnv.tearDown(); } @@ -126,7 +125,7 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp // ----------------------------------------- @Test - public void testCommitOffsetsWithoutAliveFetchers() throws Exception { + void testCommitOffsetsWithoutAliveFetchers() throws Exception { final String groupId = "testCommitOffsetsWithoutAliveFetchers"; try (KafkaSourceReader<Integer> reader = (KafkaSourceReader<Integer>) @@ -169,15 +168,15 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp .listConsumerGroupOffsets(groupId) .partitionsToOffsetAndMetadata() .get(); - assertEquals(1, committedOffsets.size()); - committedOffsets.forEach( - (tp, offsetAndMetadata) -> - assertEquals(NUM_RECORDS_PER_SPLIT, offsetAndMetadata.offset())); + assertThat(committedOffsets).hasSize(1); + assertThat(committedOffsets.values()) + .extracting(OffsetAndMetadata::offset) + .allMatch(offset -> offset == NUM_RECORDS_PER_SPLIT); } } @Test - public void testCommitEmptyOffsets() throws Exception { + void testCommitEmptyOffsets() throws Exception { final String groupId = "testCommitEmptyOffsets"; try (KafkaSourceReader<Integer> reader = (KafkaSourceReader<Integer>) @@ -192,12 +191,12 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp .listConsumerGroupOffsets(groupId) .partitionsToOffsetAndMetadata() .get(); - assertTrue(committedOffsets.isEmpty()); + assertThat(committedOffsets).isEmpty(); } } @Test - public void testOffsetCommitOnCheckpointComplete() throws Exception { + void testOffsetCommitOnCheckpointComplete() throws Exception { final String groupId = "testOffsetCommitOnCheckpointComplete"; try (KafkaSourceReader<Integer> reader = (KafkaSourceReader<Integer>) @@ -214,7 +213,7 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp } while (output.count() < totalNumRecords); // The completion of the last checkpoint should subsume all the previous checkpoitns. - assertEquals(checkpointId, reader.getOffsetsToCommit().size()); + assertThat(reader.getOffsetsToCommit()).hasSize((int) checkpointId); long lastCheckpointId = checkpointId; waitUtil( @@ -240,15 +239,15 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp .listConsumerGroupOffsets(groupId) .partitionsToOffsetAndMetadata() .get(); - assertEquals(numSplits, committedOffsets.size()); - committedOffsets.forEach( - (tp, offsetAndMetadata) -> - assertEquals(NUM_RECORDS_PER_SPLIT, offsetAndMetadata.offset())); + assertThat(committedOffsets).hasSize(numSplits); + assertThat(committedOffsets.values()) + .extracting(OffsetAndMetadata::offset) + .allMatch(offset -> offset == NUM_RECORDS_PER_SPLIT); } } @Test - public void testNotCommitOffsetsForUninitializedSplits() throws Exception { + void testNotCommitOffsetsForUninitializedSplits() throws Exception { final long checkpointId = 1234L; try (KafkaSourceReader<Integer> reader = (KafkaSourceReader<Integer>) createReader()) { KafkaPartitionSplit split = @@ -256,13 +255,13 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp new TopicPartition(TOPIC, 0), KafkaPartitionSplit.EARLIEST_OFFSET); reader.addSplits(Collections.singletonList(split)); reader.snapshotState(checkpointId); - assertEquals(1, reader.getOffsetsToCommit().size()); - assertTrue(reader.getOffsetsToCommit().get(checkpointId).isEmpty()); + assertThat(reader.getOffsetsToCommit()).hasSize(1); + assertThat(reader.getOffsetsToCommit().get(checkpointId)).isEmpty(); } } @Test - public void testDisableOffsetCommit() throws Exception { + void testDisableOffsetCommit() throws Exception { final Properties properties = new Properties(); properties.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false"); try (KafkaSourceReader<Integer> reader = @@ -282,13 +281,13 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp // Create a checkpoint for each message consumption, but not complete them. reader.snapshotState(checkpointId); // Offsets to commit should be always empty because offset commit is disabled - assertEquals(0, reader.getOffsetsToCommit().size()); + assertThat(reader.getOffsetsToCommit()).isEmpty(); } while (output.count() < totalNumRecords); } } @Test - public void testKafkaSourceMetrics() throws Exception { + void testKafkaSourceMetrics() throws Exception { final MetricListener metricListener = new MetricListener(); final String groupId = "testKafkaSourceMetrics"; final TopicPartition tp0 = new TopicPartition(TOPIC, 0); @@ -316,17 +315,18 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp "Failed to poll %d records until timeout", NUM_RECORDS_PER_SPLIT * 2)); // Metric "records-consumed-total" of KafkaConsumer should be NUM_RECORDS_PER_SPLIT - assertEquals( - NUM_RECORDS_PER_SPLIT * 2, - getKafkaConsumerMetric("records-consumed-total", metricListener)); + assertThat(getKafkaConsumerMetric("records-consumed-total", metricListener)) + .isEqualTo(NUM_RECORDS_PER_SPLIT * 2); // Current consuming offset should be NUM_RECORD_PER_SPLIT - 1 - assertEquals(NUM_RECORDS_PER_SPLIT - 1, getCurrentOffsetMetric(tp0, metricListener)); - assertEquals(NUM_RECORDS_PER_SPLIT - 1, getCurrentOffsetMetric(tp1, metricListener)); + assertThat(getCurrentOffsetMetric(tp0, metricListener)) + .isEqualTo(NUM_RECORDS_PER_SPLIT - 1); + assertThat(getCurrentOffsetMetric(tp1, metricListener)) + .isEqualTo(NUM_RECORDS_PER_SPLIT - 1); // No offset is committed till now - assertEquals(INITIAL_OFFSET, getCommittedOffsetMetric(tp0, metricListener)); - assertEquals(INITIAL_OFFSET, getCommittedOffsetMetric(tp1, metricListener)); + assertThat(getCommittedOffsetMetric(tp0, metricListener)).isEqualTo(INITIAL_OFFSET); + assertThat(getCommittedOffsetMetric(tp1, metricListener)).isEqualTo(INITIAL_OFFSET); // Trigger offset commit final long checkpointId = 15213L; @@ -354,20 +354,22 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp Matchers.greaterThan(0L)); // Committed offset should be NUM_RECORD_PER_SPLIT - assertEquals(NUM_RECORDS_PER_SPLIT, getCommittedOffsetMetric(tp0, metricListener)); - assertEquals(NUM_RECORDS_PER_SPLIT, getCommittedOffsetMetric(tp1, metricListener)); + assertThat(getCommittedOffsetMetric(tp0, metricListener)) + .isEqualTo(NUM_RECORDS_PER_SPLIT); + assertThat(getCommittedOffsetMetric(tp1, metricListener)) + .isEqualTo(NUM_RECORDS_PER_SPLIT); // Number of successful commits should be greater than 0 final Optional<Counter> commitsSucceeded = metricListener.getCounter( KAFKA_SOURCE_READER_METRIC_GROUP, COMMITS_SUCCEEDED_METRIC_COUNTER); - assertTrue(commitsSucceeded.isPresent()); + assertThat(commitsSucceeded).isPresent(); MatcherAssert.assertThat(commitsSucceeded.get().getCount(), Matchers.greaterThan(0L)); } } @Test - public void testAssigningEmptySplits() throws Exception { + void testAssigningEmptySplits() throws Exception { // Normal split with NUM_RECORDS_PER_SPLIT records final KafkaPartitionSplit normalSplit = new KafkaPartitionSplit( @@ -511,7 +513,7 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp final Optional<Gauge<Object>> kafkaConsumerGauge = listener.getGauge( KAFKA_SOURCE_READER_METRIC_GROUP, KAFKA_CONSUMER_METRIC_GROUP, name); - assertTrue(kafkaConsumerGauge.isPresent()); + assertThat(kafkaConsumerGauge).isPresent(); return ((Double) kafkaConsumerGauge.get().getValue()).longValue(); } @@ -524,7 +526,7 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp PARTITION_GROUP, String.valueOf(tp.partition()), CURRENT_OFFSET_METRIC_GAUGE); - assertTrue(currentOffsetGauge.isPresent()); + assertThat(currentOffsetGauge).isPresent(); return (long) currentOffsetGauge.get().getValue(); } @@ -537,7 +539,7 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp PARTITION_GROUP, String.valueOf(tp.partition()), COMMITTED_OFFSET_METRIC_GAUGE); - assertTrue(committedOffsetGauge.isPresent()); + assertThat(committedOffsetGauge).isPresent(); return (long) committedOffsetGauge.get().getValue(); } diff --git a/flink-test-utils-parent/flink-connector-test-utils/pom.xml b/flink-test-utils-parent/flink-connector-test-utils/pom.xml index 89e0011..842b01d 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/pom.xml +++ b/flink-test-utils-parent/flink-connector-test-utils/pom.xml @@ -55,6 +55,12 @@ </dependency> <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> <scope>compile</scope> diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/SourceReaderTestBase.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/SourceReaderTestBase.java index 462c5b2..6a83c5e 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/SourceReaderTestBase.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/SourceReaderTestBase.java @@ -27,10 +27,9 @@ import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.core.io.InputStatus; import org.apache.flink.util.TestLogger; -import org.junit.After; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import java.util.ArrayList; import java.util.Collections; @@ -39,8 +38,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.assertj.core.api.Assertions.assertThat; /** * An abstract test class for all the unit tests of {@link SourceReader} to inherit. @@ -48,7 +46,6 @@ import static org.junit.Assert.assertFalse; * @param <SplitT> the type of the splits. */ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends TestLogger { - protected final int numSplits; protected final int totalNumRecords; protected static final int NUM_RECORDS_PER_SPLIT = 10; @@ -62,9 +59,7 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T return 10; } - @Rule public ExpectedException expectedException = ExpectedException.none(); - - @After + @AfterEach public void ensureNoDangling() { for (Thread t : Thread.getAllStackTraces().keySet()) { if (t.getName().equals("SourceFetcher")) { @@ -75,7 +70,7 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T /** Simply test the reader reads all the splits fine. */ @Test - public void testRead() throws Exception { + void testRead() throws Exception { try (SourceReader<Integer, SplitT> reader = createReader()) { reader.addSplits(getSplits(numSplits, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED)); ValidatingSourceOutput output = new ValidatingSourceOutput(); @@ -87,7 +82,7 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T } @Test - public void testAddSplitToExistingFetcher() throws Exception { + void testAddSplitToExistingFetcher() throws Exception { Thread.sleep(10); ValidatingSourceOutput output = new ValidatingSourceOutput(); // Add a split to start the fetcher. @@ -108,8 +103,9 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T } } - @Test(timeout = 30000L) - public void testPollingFromEmptyQueue() throws Exception { + @Test + @Timeout(30) + void testPollingFromEmptyQueue() throws Exception { ValidatingSourceOutput output = new ValidatingSourceOutput(); List<SplitT> splits = Collections.singletonList(getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED)); @@ -117,19 +113,19 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T try (SourceReader<Integer, SplitT> reader = consumeRecords(splits, output, NUM_RECORDS_PER_SPLIT)) { // Now let the main thread poll again. - assertEquals( - "The status should be ", - InputStatus.NOTHING_AVAILABLE, - reader.pollNext(output)); + assertThat(reader.pollNext(output)) + .as("The status should be %s", InputStatus.NOTHING_AVAILABLE) + .isEqualTo(InputStatus.NOTHING_AVAILABLE); } } - @Test(timeout = 30000L) - public void testAvailableOnEmptyQueue() throws Exception { + @Test + @Timeout(30) + void testAvailableOnEmptyQueue() throws Exception { // Consumer all the records in the split. try (SourceReader<Integer, SplitT> reader = createReader()) { CompletableFuture<?> future = reader.isAvailable(); - assertFalse("There should be no records ready for poll.", future.isDone()); + assertThat(future.isDone()).as("There should be no records ready for poll.").isFalse(); // Add a split to the reader so there are more records to be read. reader.addSplits( Collections.singletonList( @@ -140,8 +136,9 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T } } - @Test(timeout = 30000L) - public void testSnapshot() throws Exception { + @Test + @Timeout(30) + void testSnapshot() throws Exception { ValidatingSourceOutput output = new ValidatingSourceOutput(); // Add a split to start the fetcher. List<SplitT> splits = @@ -149,12 +146,11 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T try (SourceReader<Integer, SplitT> reader = consumeRecords(splits, output, totalNumRecords)) { List<SplitT> state = reader.snapshotState(1L); - assertEquals("The snapshot should only have 10 splits. ", numSplits, state.size()); + assertThat(state).as("The snapshot should only have 10 splits. ").hasSize(numSplits); for (int i = 0; i < numSplits; i++) { - assertEquals( - "The first four splits should have been fully consumed.", - NUM_RECORDS_PER_SPLIT, - getNextRecordIndex(state.get(i))); + assertThat(getNextRecordIndex(state.get(i))) + .as("The first four splits should have been fully consumed.") + .isEqualTo(NUM_RECORDS_PER_SPLIT); } } } @@ -207,19 +203,16 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T public void validate() { - assertEquals( - String.format("Should be %d distinct elements in total", totalNumRecords), - totalNumRecords, - consumedValues.size()); - assertEquals( - String.format("Should be %d elements in total", totalNumRecords), - totalNumRecords, - count); - assertEquals("The min value should be 0", 0, min); - assertEquals( - String.format("The max value should be %d", totalNumRecords - 1), - totalNumRecords - 1, - max); + assertThat(consumedValues) + .as("Should be %d distinct elements in total", totalNumRecords) + .hasSize(totalNumRecords); + assertThat(count) + .as("Should be %d elements in total", totalNumRecords) + .isEqualTo(totalNumRecords); + assertThat(min).as("The min value should be 0", totalNumRecords).isZero(); + assertThat(max) + .as("The max value should be %d", totalNumRecords - 1) + .isEqualTo(totalNumRecords - 1); } public int count() {