This is an automated email from the ASF dual-hosted git repository.
ferenc-csaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new 11fcabf [FLINK-27941] Migrate remaining tests to JUnit5
11fcabf is described below
commit 11fcabfd472ac80aa28f523d67194aeb73fddf49
Author: jlalwani-amazon <[email protected]>
AuthorDate: Wed Jun 3 09:30:19 2026 -0400
[FLINK-27941] Migrate remaining tests to JUnit5
---
.../table/test/KinesisFirehoseTableITTest.java | 35 ++++++++++--------
.../table/test/KinesisStreamsTableApiIT.java | 24 ++++++------
.../connector/sqs/sink/test/SqsSinkITTest.java | 30 ++++++++-------
.../test/GlueSchemaRegistryAvroKinesisITCase.java | 43 +++++++++++-----------
.../json/GlueSchemaRegistryJsonKinesisITCase.java | 43 +++++++++++-----------
.../KinesisDynamicTableSourceFactoryTest.java | 8 ++--
6 files changed, 94 insertions(+), 89 deletions(-)
diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java
index b86c253..2d0f078 100644
--- a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java
+++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java
@@ -25,7 +25,7 @@ import
org.apache.flink.connector.testframe.container.TestcontainersSettings;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.util.DockerImageVersions;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -35,16 +35,18 @@ import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.http.SdkHttpClient;
@@ -74,7 +76,10 @@ import static
org.apache.flink.connector.firehose.sink.testutils.KinesisFirehose
import static
org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createFirehoseClient;
/** End to End test for Kinesis Firehose Table sink API. */
-public class KinesisFirehoseTableITTest extends TestLogger {
+@Testcontainers
+@Timeout(value = 10, unit = TimeUnit.MINUTES)
+@ExtendWith(TestLoggerExtension.class)
+public class KinesisFirehoseTableITTest {
private static final Logger LOG =
LoggerFactory.getLogger(KinesisFirehoseTableITTest.class);
@@ -95,9 +100,7 @@ public class KinesisFirehoseTableITTest extends TestLogger {
private static final int NUM_ELEMENTS = 5;
private static final Network network = Network.newNetwork();
- @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES);
-
- @ClassRule
+ @Container
public static LocalstackContainer mockFirehoseContainer =
new
LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK))
.withNetwork(network)
@@ -117,7 +120,7 @@ public class KinesisFirehoseTableITTest extends TestLogger {
public static final FlinkContainers FLINK =
FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
- @Before
+ @BeforeEach
public void setup() throws Exception {
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
@@ -139,17 +142,17 @@ public class KinesisFirehoseTableITTest extends
TestLogger {
LOG.info("Done setting up the localstack.");
}
- @BeforeClass
+ @BeforeAll
public static void setupFlink() throws Exception {
FLINK.start();
}
- @AfterClass
+ @AfterAll
public static void stopFlink() {
FLINK.stop();
}
- @After
+ @AfterEach
public void teardown() {
System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
index 65e7486..ac4b510 100644
--- a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
+++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
@@ -36,14 +36,13 @@ import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.collect.ImmutableList;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.rules.Timeout;
import org.rnorth.ducttape.ratelimits.RateLimiter;
import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
import org.slf4j.Logger;
@@ -79,6 +78,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/** End-to-end test for Kinesis Streams Table API Sink using Kinesalite. */
@Testcontainers
@ExtendWith(MiniClusterExtension.class)
+@Timeout(value = 10, unit = TimeUnit.MINUTES)
public class KinesisStreamsTableApiIT {
private static final Logger LOGGER =
LoggerFactory.getLogger(KinesisStreamsTableApiIT.class);
@@ -96,8 +96,6 @@ public class KinesisStreamsTableApiIT {
ResourceTestUtils.getResource(".*kinesis-streams.jar");
private static final Network network = Network.newNetwork();
- @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES);
-
@Container
public static final LocalstackContainer LOCALSTACK_CONTAINER =
new
LocalstackContainer(DockerImageName.parse(LOCALSTACK_DOCKER_IMAGE_VERSION))
@@ -123,17 +121,17 @@ public class KinesisStreamsTableApiIT {
public static final FlinkContainers FLINK =
FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
- @BeforeClass
+ @BeforeAll
public static void setupFlink() throws Exception {
FLINK.start();
}
- @AfterClass
+ @AfterAll
public static void stopFlink() {
FLINK.stop();
}
- @Before
+ @BeforeEach
public void setUp() throws Exception {
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
httpClient = AWSServicesTestUtils.createHttpClient();
@@ -144,7 +142,7 @@ public class KinesisStreamsTableApiIT {
prepareStream(LARGE_ORDERS_STREAM);
}
- @After
+ @AfterEach
public void teardown() {
System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
AWSGeneralUtil.closeResources(httpClient, kinesisClient);
diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java b/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java
index 97a4e16..15dd251 100644
--- a/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java
+++ b/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java
@@ -28,18 +28,20 @@ import
org.apache.flink.connector.testframe.container.FlinkContainers;
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.DockerImageVersions;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.http.SdkHttpClient;
@@ -55,7 +57,9 @@ import static
org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createS
import static org.junit.jupiter.api.Assertions.assertEquals;
/** End to End test for SQS sink API. */
-public class SqsSinkITTest extends TestLogger {
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
+public class SqsSinkITTest {
private static final Logger LOG =
LoggerFactory.getLogger(SqsSinkITTest.class);
@@ -65,7 +69,7 @@ public class SqsSinkITTest extends TestLogger {
private SqsClient sqsClient;
private static final Network network = Network.newNetwork();
- @ClassRule
+ @Container
public static LocalstackContainer mockSqsContainer =
new
LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK))
.withNetwork(network)
@@ -85,7 +89,7 @@ public class SqsSinkITTest extends TestLogger {
public static final FlinkContainers FLINK =
FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
- @Before
+ @BeforeEach
public void setup() throws Exception {
httpClient = AWSServicesTestUtils.createHttpClient();
sqsClient = createSqsClient(mockSqsContainer.getEndpoint(),
httpClient);
@@ -94,17 +98,17 @@ public class SqsSinkITTest extends TestLogger {
LOG.info("Done setting up the localstack.");
}
- @BeforeClass
+ @BeforeAll
public static void setupFlink() throws Exception {
FLINK.start();
}
- @AfterClass
+ @AfterAll
public static void stopFlink() {
FLINK.stop();
}
- @After
+ @AfterEach
public void teardown() {
System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
httpClient.close();
diff --git a/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java b/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java
index 631fb3e..93c8600 100644
--- a/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java
+++ b/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java
@@ -32,21 +32,21 @@ import
org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvro
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.StringUtils;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
-import org.junit.After;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -71,10 +71,13 @@ import static
org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROT
import static
org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
import static
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
/** End-to-end test for Glue Schema Registry AVRO format using Localstack. */
-public class GlueSchemaRegistryAvroKinesisITCase extends TestLogger {
- private static final Logger LOGGER =
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
+public class GlueSchemaRegistryAvroKinesisITCase {
+ private static final Logger LOG =
LoggerFactory.getLogger(GlueSchemaRegistryAvroKinesisITCase.class);
private static final String INPUT_STREAM = "gsr_avro_input_stream";
@@ -95,21 +98,17 @@ public class GlueSchemaRegistryAvroKinesisITCase extends
TestLogger {
private KinesisClient kinesisClient;
private GSRKinesisPubsubClient gsrKinesisPubsubClient;
- @ClassRule
+ @Container
public static LocalstackContainer mockKinesisContainer =
new
LocalstackContainer(DockerImageName.parse(LOCALSTACK_DOCKER_IMAGE_VERSION))
.withNetworkAliases("localstack");
- @Before
+ @BeforeEach
public void setup() throws Exception {
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
- Assume.assumeTrue(
- "Access key not configured, skipping test...",
- !StringUtils.isNullOrWhitespaceOnly(ACCESS_KEY));
- Assume.assumeTrue(
- "Secret key not configured, skipping test...",
- !StringUtils.isNullOrWhitespaceOnly(SECRET_KEY));
+ assumeThat(ACCESS_KEY).as("Access key not configured, skipping test...").isNotBlank();
+ assumeThat(SECRET_KEY).as("Secret key not configured, skipping test...").isNotBlank();
StaticCredentialsProvider gsrCredentialsProvider =
StaticCredentialsProvider.create(
@@ -125,10 +124,10 @@ public class GlueSchemaRegistryAvroKinesisITCase extends
TestLogger {
gsrKinesisPubsubClient.prepareStream(INPUT_STREAM);
gsrKinesisPubsubClient.prepareStream(OUTPUT_STREAM);
- LOGGER.info("Done setting up the localstack.");
+ LOG.info("Done setting up the localstack.");
}
- @After
+ @AfterEach
public void teardown() {
System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
AWSGeneralUtil.closeResources(httpClient, kinesisClient);
@@ -142,7 +141,7 @@ public class GlueSchemaRegistryAvroKinesisITCase extends
TestLogger {
for (GenericRecord msg : messages) {
gsrKinesisPubsubClient.sendMessage(getSchema().toString(),
INPUT_STREAM, msg);
}
- log.info("generated records");
+ LOG.info("generated records");
DataStream<GenericRecord> input =
env.fromSource(createSource(),
WatermarkStrategy.noWatermarks(), "source")
@@ -155,11 +154,11 @@ public class GlueSchemaRegistryAvroKinesisITCase extends
TestLogger {
List<Object> results =
gsrKinesisPubsubClient.readAllMessages(OUTPUT_STREAM,
OUTPUT_STREAM_ARN);
while (deadline.hasTimeLeft() && results.size() < messages.size()) {
- log.info("waiting for results..");
+ LOG.info("waiting for results..");
Thread.sleep(1000);
results = gsrKinesisPubsubClient.readAllMessages(OUTPUT_STREAM,
OUTPUT_STREAM_ARN);
}
- log.info("results: {}", results);
+ LOG.info("results: {}", results);
assertThat(results).containsExactlyInAnyOrderElementsOf(messages);
}
diff --git a/flink-connector-aws-e2e-tests/flink-formats-json-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/json/GlueSchemaRegistryJsonKinesisITCase.java b/flink-connector-aws-e2e-tests/flink-formats-json-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/json/GlueSchemaRegistryJsonKinesisITCase.java
index 6a9d752..83f6a23 100644
--- a/flink-connector-aws-e2e-tests/flink-formats-json-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/json/GlueSchemaRegistryJsonKinesisITCase.java
+++ b/flink-connector-aws-e2e-tests/flink-formats-json-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/json/GlueSchemaRegistryJsonKinesisITCase.java
@@ -32,18 +32,18 @@ import
org.apache.flink.formats.json.glue.schema.registry.GlueSchemaRegistryJson
import
org.apache.flink.formats.json.glue.schema.registry.GlueSchemaRegistryJsonSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.StringUtils;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
import
com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
-import org.junit.After;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -67,10 +67,13 @@ import static
org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROT
import static
org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
import static
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
/** End-to-end test for Glue Schema Registry Json format using Localstack. */
-public class GlueSchemaRegistryJsonKinesisITCase extends TestLogger {
- private static final Logger LOGGER =
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
+public class GlueSchemaRegistryJsonKinesisITCase {
+ private static final Logger LOG =
LoggerFactory.getLogger(GlueSchemaRegistryJsonKinesisITCase.class);
private static final String INPUT_STREAM = "gsr_json_input_stream";
@@ -92,21 +95,17 @@ public class GlueSchemaRegistryJsonKinesisITCase extends
TestLogger {
private KinesisClient kinesisClient;
private GSRKinesisPubsubClient gsrKinesisPubsubClient;
- @ClassRule
+ @Container
public static LocalstackContainer mockKinesisContainer =
new
LocalstackContainer(DockerImageName.parse(LOCALSTACK_DOCKER_IMAGE_VERSION))
.withNetworkAliases("localstack");
- @Before
+ @BeforeEach
public void setup() throws Exception {
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
- Assume.assumeTrue(
- "Access key not configured, skipping test...",
- !StringUtils.isNullOrWhitespaceOnly(ACCESS_KEY));
- Assume.assumeTrue(
- "Secret key not configured, skipping test...",
- !StringUtils.isNullOrWhitespaceOnly(SECRET_KEY));
+ assumeThat(ACCESS_KEY).as("Access key not configured, skipping test...").isNotBlank();
+ assumeThat(SECRET_KEY).as("Secret key not configured, skipping test...").isNotBlank();
StaticCredentialsProvider gsrCredentialsProvider =
StaticCredentialsProvider.create(
@@ -123,10 +122,10 @@ public class GlueSchemaRegistryJsonKinesisITCase extends
TestLogger {
gsrKinesisPubsubClient.prepareStream(INPUT_STREAM);
gsrKinesisPubsubClient.prepareStream(OUTPUT_STREAM);
- LOGGER.info("Done setting up the localstack.");
+ LOG.info("Done setting up the localstack.");
}
- @After
+ @AfterEach
public void teardown() {
System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
AWSGeneralUtil.closeResources(httpClient, kinesisClient);
@@ -139,7 +138,7 @@ public class GlueSchemaRegistryJsonKinesisITCase extends
TestLogger {
for (JsonDataWithSchema msg : messages) {
gsrKinesisPubsubClient.sendMessage(msg.getSchema(), INPUT_STREAM,
msg);
}
- log.info("generated records");
+ LOG.info("generated records");
DataStream<JsonDataWithSchema> input =
env.fromSource(createSource(),
WatermarkStrategy.noWatermarks(), "source")
@@ -152,11 +151,11 @@ public class GlueSchemaRegistryJsonKinesisITCase extends
TestLogger {
List<Object> results =
gsrKinesisPubsubClient.readAllMessages(OUTPUT_STREAM,
OUTPUT_STREAM_ARN);
while (deadline.hasTimeLeft() && results.size() < messages.size()) {
- log.info("waiting for results..");
+ LOG.info("waiting for results..");
Thread.sleep(1000);
results = gsrKinesisPubsubClient.readAllMessages(OUTPUT_STREAM,
OUTPUT_STREAM_ARN);
}
- log.info("results: {}", results);
+ LOG.info("results: {}", results);
assertThat(results).containsExactlyInAnyOrderElementsOf(messages);
}
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSourceFactoryTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSourceFactoryTest.java
index 0a77c96..7b6ea27 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSourceFactoryTest.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSourceFactoryTest.java
@@ -40,9 +40,10 @@ import org.apache.flink.table.factories.TableOptionsBuilder;
import org.apache.flink.table.factories.TestFormatFactory;
import
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import java.util.Arrays;
import java.util.Collections;
@@ -60,7 +61,8 @@ import static
org.apache.flink.table.factories.utils.FactoryMocks.createTableSou
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link KinesisDynamicSource} created by {@link KinesisDynamicTableFactory}. */
-public class KinesisDynamicTableSourceFactoryTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class KinesisDynamicTableSourceFactoryTest {
@Test
public void testGoodTableSource() {