This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 92e6533 [FLINK-23556][tests] Make SQLClientSchemaRegistryITCase more stable (#16952) 92e6533 is described below commit 92e65333cc7ecbd7886f6a348c3f21d3fe85a942 Author: bgeng777 <80749729+bgeng...@users.noreply.github.com> AuthorDate: Wed Aug 25 11:50:42 2021 +0800 [FLINK-23556][tests] Make SQLClientSchemaRegistryITCase more stable (#16952) --- .../tests/util/kafka/KafkaContainerClient.java | 2 +- .../util/kafka/SQLClientSchemaRegistryITCase.java | 41 ++++++++++++---------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java index f4025df..250015e 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java @@ -106,7 +106,7 @@ public class KafkaContainerClient { "Waiting for messages. Received {}/{}.", messages.size(), expectedNumMessages); - ConsumerRecords<Bytes, T> records = consumer.poll(Duration.ofMillis(100)); + ConsumerRecords<Bytes, T> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<Bytes, T> record : records) { messages.add(record.value()); } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java index ef5d4e6..b917ba9 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java @@ -37,13 +37,15 @@ import org.apache.avro.generic.GenericRecordBuilder; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.Before; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.utility.DockerImageName; import java.nio.file.Path; @@ -60,6 +62,9 @@ import static org.junit.Assert.assertThat; /** End-to-end test for SQL client using Avro Confluent Registry format. */ @Category(value = {TravisGroup1.class}) public class SQLClientSchemaRegistryITCase { + private static final Logger LOG = LoggerFactory.getLogger(SQLClientSchemaRegistryITCase.class); + private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG); + public static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka"; public static final String INTER_CONTAINER_REGISTRY_ALIAS = "registry"; private static final Path sqlAvroJar = TestUtils.getResource(".*avro.jar"); @@ -67,18 +72,19 @@ public class SQLClientSchemaRegistryITCase { private static final Path sqlToolBoxJar = TestUtils.getResource(".*SqlToolbox.jar"); private final Path sqlConnectorKafkaJar = TestUtils.getResource(".*kafka.jar"); - public final Network network = Network.newNetwork(); + @ClassRule private static final Network network = Network.newNetwork(); @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES); - @Rule - public final KafkaContainer kafka = + @ClassRule + private static final KafkaContainer kafka = new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)) .withNetwork(network) - .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS); + .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS) + .withLogConsumer(LOG_CONSUMER); - @Rule - public final SchemaRegistryContainer registry = + @ClassRule + private static final SchemaRegistryContainer registry = new SchemaRegistryContainer("5.5.2") .withKafka(INTER_CONTAINER_KAFKA_ALIAS + ":9092") .withNetwork(network) @@ -89,16 +95,16 @@ public class SQLClientSchemaRegistryITCase { public final FlinkContainer flink = FlinkContainer.builder().build().withNetwork(network).dependsOn(kafka); - private final KafkaContainerClient kafkaClient = new KafkaContainerClient(kafka); + private KafkaContainerClient kafkaClient; private CachedSchemaRegistryClient registryClient; @Before public void setUp() { + kafkaClient = new KafkaContainerClient(kafka); registryClient = new CachedSchemaRegistryClient(registry.getSchemaRegistryUrl(), 10); } - @Ignore("FLINK-23556") - @Test(timeout = 120_000) + @Test public void testReading() throws Exception { String testCategoryTopic = "test-category-" + UUID.randomUUID().toString(); String testResultsTopic = "test-results-" + UUID.randomUUID().toString(); @@ -133,6 +139,7 @@ public class SQLClientSchemaRegistryITCase { + ":9092',", " 'topic' = '" + testCategoryTopic + "',", " 'scan.startup.mode' = 'earliest-offset',", + " 'properties.group.id' = 'test-group',", " 'format' = 'avro-confluent',", " 'avro-confluent.url' = 'http://" + INTER_CONTAINER_REGISTRY_ALIAS @@ -148,6 +155,7 @@ public class SQLClientSchemaRegistryITCase { " 'properties.bootstrap.servers' = '" + INTER_CONTAINER_KAFKA_ALIAS + ":9092',", + " 'properties.group.id' = 'test-group',", " 'topic' = '" + testResultsTopic + "',", " 'format' = 'csv',", " 'csv.null-literal' = 'null'", @@ -162,8 +170,7 @@ public class SQLClientSchemaRegistryITCase { assertThat(categories, equalTo(Collections.singletonList("1,electronics,null"))); } - @Ignore("FLINK-23556") - @Test(timeout = 120_000) + @Test public void testWriting() throws Exception { String testUserBehaviorTopic = "test-user-behavior-" + UUID.randomUUID().toString(); // Create topic test-avro @@ -204,11 +211,9 @@ public class SQLClientSchemaRegistryITCase { testUserBehaviorTopic, new KafkaAvroDeserializer(registryClient)); - Schema userBehaviorSchema = - (Schema) - registryClient - .getSchemaBySubjectAndId(behaviourSubject, versions.get(0)) - .rawSchema(); + String schemaString = + registryClient.getByVersion(behaviourSubject, versions.get(0), false).getSchema(); + Schema userBehaviorSchema = new Schema.Parser().parse(schemaString); GenericRecordBuilder recordBuilder = new GenericRecordBuilder(userBehaviorSchema); assertThat( userBehaviors, @@ -224,7 +229,7 @@ public class SQLClientSchemaRegistryITCase { } private List<Integer> getAllVersions(String behaviourSubject) throws Exception { - Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30)); + Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120)); Exception ex = new IllegalStateException( "Could not query schema registry. Negative deadline provided.");