This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new bbf68c6  [FLINK-20410] Retry querying for schema in the schema registry e2e test.
bbf68c6 is described below

commit bbf68c656a352129ab621efacdf0ab561071d3de
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Mon Nov 30 10:36:25 2020 +0100

    [FLINK-20410] Retry querying for schema in the schema registry e2e test.
---
 .../util/kafka/SQLClientSchemaRegistryITCase.java     | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
index 1906b58..34500d3 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.tests.util.kafka;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.tests.util.categories.TravisGroup1;
 import org.apache.flink.tests.util.flink.FlinkContainer;
@@ -26,6 +27,7 @@ import org.apache.flink.tests.util.kafka.containers.SchemaRegistryContainer;
 
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import org.apache.avro.Schema;
@@ -42,6 +44,7 @@ import org.testcontainers.utility.DockerImageName;
 
 import java.io.IOException;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -192,7 +195,7 @@ public class SQLClientSchemaRegistryITCase {
 
                executeSqlStatements(sqlLines);
 
-               List<Integer> versions = registryClient.getAllVersions(behaviourSubject);
+               List<Integer> versions = getAllVersions(behaviourSubject);
                assertThat(versions.size(), equalTo(1));
                List<Object> userBehaviors = kafkaClient.readMessages(
                        1,
@@ -217,6 +220,20 @@ public class SQLClientSchemaRegistryITCase {
                ));
        }
 
+       private List<Integer> getAllVersions(String behaviourSubject) throws Exception {
+               Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
+               Exception ex = new IllegalStateException(
+                       "Could not query schema registry. Negative deadline provided.");
+               while (deadline.hasTimeLeft()) {
+                       try {
+                               return registryClient.getAllVersions(behaviourSubject);
+                       } catch (RestClientException e) {
+                               ex = e;
+                       }
+               }
+               throw ex;
+       }
+
        private void executeSqlStatements(List<String> sqlLines) throws Exception {
                flink.submitSQLJob(new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
                        .addJars(sqlAvroJar, sqlAvroRegistryJar, sqlConnectorKafkaJar, sqlToolBoxJar)

Reply via email to