This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
     new bbf68c6  [FLINK-20410] Retry querying for schema in the schema registry e2e test.
bbf68c6 is described below

commit bbf68c656a352129ab621efacdf0ab561071d3de
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Mon Nov 30 10:36:25 2020 +0100

    [FLINK-20410] Retry querying for schema in the schema registry e2e test.
---
 .../util/kafka/SQLClientSchemaRegistryITCase.java | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
index 1906b58..34500d3 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.tests.util.kafka;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.tests.util.categories.TravisGroup1;
 import org.apache.flink.tests.util.flink.FlinkContainer;
@@ -26,6 +27,7 @@ import org.apache.flink.tests.util.kafka.containers.SchemaRegistryContainer;
 
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import org.apache.avro.Schema;
@@ -42,6 +44,7 @@ import org.testcontainers.utility.DockerImageName;
 
 import java.io.IOException;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -192,7 +195,7 @@ public class SQLClientSchemaRegistryITCase {
 
 		executeSqlStatements(sqlLines);
 
-		List<Integer> versions = registryClient.getAllVersions(behaviourSubject);
+		List<Integer> versions = getAllVersions(behaviourSubject);
 		assertThat(versions.size(), equalTo(1));
 		List<Object> userBehaviors = kafkaClient.readMessages(
 			1,
@@ -217,6 +220,20 @@ public class SQLClientSchemaRegistryITCase {
 		));
 	}
 
+	private List<Integer> getAllVersions(String behaviourSubject) throws Exception {
+		Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
+		Exception ex = new IllegalStateException(
+			"Could not query schema registry. Negative deadline provided.");
+		while (deadline.hasTimeLeft()) {
+			try {
+				return registryClient.getAllVersions(behaviourSubject);
+			} catch (RestClientException e) {
+				ex = e;
+			}
+		}
+		throw ex;
+	}
+
 	private void executeSqlStatements(List<String> sqlLines) throws Exception {
 		flink.submitSQLJob(new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
 			.addJars(sqlAvroJar, sqlAvroRegistryJar, sqlConnectorKafkaJar, sqlToolBoxJar)