This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 92e6533  [FLINK-23556][tests] Make SQLClientSchemaRegistryITCase more stable (#16952)
92e6533 is described below

commit 92e65333cc7ecbd7886f6a348c3f21d3fe85a942
Author: bgeng777 <80749729+bgeng...@users.noreply.github.com>
AuthorDate: Wed Aug 25 11:50:42 2021 +0800

    [FLINK-23556][tests] Make SQLClientSchemaRegistryITCase more stable (#16952)
---
 .../tests/util/kafka/KafkaContainerClient.java     |  2 +-
 .../util/kafka/SQLClientSchemaRegistryITCase.java  | 41 ++++++++++++----------
 2 files changed, 24 insertions(+), 19 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
index f4025df..250015e 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
@@ -106,7 +106,7 @@ public class KafkaContainerClient {
                         "Waiting for messages. Received {}/{}.",
                         messages.size(),
                         expectedNumMessages);
-                ConsumerRecords<Bytes, T> records = 
consumer.poll(Duration.ofMillis(100));
+                ConsumerRecords<Bytes, T> records = 
consumer.poll(Duration.ofMillis(1000));
                 for (ConsumerRecord<Bytes, T> record : records) {
                     messages.add(record.value());
                 }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
index ef5d4e6..b917ba9 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
@@ -37,13 +37,15 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerImageName;
 
 import java.nio.file.Path;
@@ -60,6 +62,9 @@ import static org.junit.Assert.assertThat;
 /** End-to-end test for SQL client using Avro Confluent Registry format. */
 @Category(value = {TravisGroup1.class})
 public class SQLClientSchemaRegistryITCase {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SQLClientSchemaRegistryITCase.class);
+    private static final Slf4jLogConsumer LOG_CONSUMER = new 
Slf4jLogConsumer(LOG);
+
     public static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
     public static final String INTER_CONTAINER_REGISTRY_ALIAS = "registry";
     private static final Path sqlAvroJar = TestUtils.getResource(".*avro.jar");
@@ -67,18 +72,19 @@ public class SQLClientSchemaRegistryITCase {
     private static final Path sqlToolBoxJar = 
TestUtils.getResource(".*SqlToolbox.jar");
     private final Path sqlConnectorKafkaJar = 
TestUtils.getResource(".*kafka.jar");
 
-    public final Network network = Network.newNetwork();
+    @ClassRule private static final Network network = Network.newNetwork();
 
     @ClassRule public static final Timeout TIMEOUT = new Timeout(10, 
TimeUnit.MINUTES);
 
-    @Rule
-    public final KafkaContainer kafka =
+    @ClassRule
+    private static final KafkaContainer kafka =
             new 
KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
                     .withNetwork(network)
-                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
+                    .withLogConsumer(LOG_CONSUMER);
 
-    @Rule
-    public final SchemaRegistryContainer registry =
+    @ClassRule
+    private static final SchemaRegistryContainer registry =
             new SchemaRegistryContainer("5.5.2")
                     .withKafka(INTER_CONTAINER_KAFKA_ALIAS + ":9092")
                     .withNetwork(network)
@@ -89,16 +95,16 @@ public class SQLClientSchemaRegistryITCase {
     public final FlinkContainer flink =
             
FlinkContainer.builder().build().withNetwork(network).dependsOn(kafka);
 
-    private final KafkaContainerClient kafkaClient = new 
KafkaContainerClient(kafka);
+    private KafkaContainerClient kafkaClient;
     private CachedSchemaRegistryClient registryClient;
 
     @Before
     public void setUp() {
+        kafkaClient = new KafkaContainerClient(kafka);
         registryClient = new 
CachedSchemaRegistryClient(registry.getSchemaRegistryUrl(), 10);
     }
 
-    @Ignore("FLINK-23556")
-    @Test(timeout = 120_000)
+    @Test
     public void testReading() throws Exception {
         String testCategoryTopic = "test-category-" + 
UUID.randomUUID().toString();
         String testResultsTopic = "test-results-" + 
UUID.randomUUID().toString();
@@ -133,6 +139,7 @@ public class SQLClientSchemaRegistryITCase {
                                 + ":9092',",
                         " 'topic' = '" + testCategoryTopic + "',",
                         " 'scan.startup.mode' = 'earliest-offset',",
+                        " 'properties.group.id' = 'test-group',",
                         " 'format' = 'avro-confluent',",
                         " 'avro-confluent.url' = 'http://";
                                 + INTER_CONTAINER_REGISTRY_ALIAS
@@ -148,6 +155,7 @@ public class SQLClientSchemaRegistryITCase {
                         " 'properties.bootstrap.servers' = '"
                                 + INTER_CONTAINER_KAFKA_ALIAS
                                 + ":9092',",
+                        " 'properties.group.id' = 'test-group',",
                         " 'topic' = '" + testResultsTopic + "',",
                         " 'format' = 'csv',",
                         " 'csv.null-literal' = 'null'",
@@ -162,8 +170,7 @@ public class SQLClientSchemaRegistryITCase {
         assertThat(categories, 
equalTo(Collections.singletonList("1,electronics,null")));
     }
 
-    @Ignore("FLINK-23556")
-    @Test(timeout = 120_000)
+    @Test
     public void testWriting() throws Exception {
         String testUserBehaviorTopic = "test-user-behavior-" + 
UUID.randomUUID().toString();
         // Create topic test-avro
@@ -204,11 +211,9 @@ public class SQLClientSchemaRegistryITCase {
                         testUserBehaviorTopic,
                         new KafkaAvroDeserializer(registryClient));
 
-        Schema userBehaviorSchema =
-                (Schema)
-                        registryClient
-                                .getSchemaBySubjectAndId(behaviourSubject, 
versions.get(0))
-                                .rawSchema();
+        String schemaString =
+                registryClient.getByVersion(behaviourSubject, versions.get(0), 
false).getSchema();
+        Schema userBehaviorSchema = new Schema.Parser().parse(schemaString);
         GenericRecordBuilder recordBuilder = new 
GenericRecordBuilder(userBehaviorSchema);
         assertThat(
                 userBehaviors,
@@ -224,7 +229,7 @@ public class SQLClientSchemaRegistryITCase {
     }
 
     private List<Integer> getAllVersions(String behaviourSubject) throws 
Exception {
-        Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
+        Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120));
         Exception ex =
                 new IllegalStateException(
                         "Could not query schema registry. Negative deadline 
provided.");

Reply via email to