This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new 6c97f22 [FLINK-17159] Harden ElasticsearchSinkITCase 6c97f22 is described below commit 6c97f22913197e7a5a948f67b7a0b7f8fb480fa7 Author: Aljoscha Krettek <aljos...@apache.org> AuthorDate: Tue Aug 25 10:25:17 2020 +0200 [FLINK-17159] Harden ElasticsearchSinkITCase Before, it could happen that the embedded node is not ready. Now we wait for nodes/data nodes to be live before returning from the initialization method. --- .../testutils/ElasticsearchResource.java | 30 ++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java index 6f185d3..68e0fe8 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java @@ -21,12 +21,21 @@ package org.apache.flink.streaming.connectors.elasticsearch.testutils; import org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironment; import org.apache.flink.util.InstantiationUtil; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; +import org.elasticsearch.client.ClusterAdminClient; +import org.elasticsearch.common.unit.TimeValue; import org.junit.rules.ExternalResource; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.assertThat; + /** * A resource that starts an embedded elasticsearch cluster. */ @@ -55,6 +64,27 @@ public class ElasticsearchResource extends ExternalResource { tempFolder.create(); embeddedNodeEnv.start(tempFolder.newFolder(), clusterName); + + waitForCluster(); + } + + /** + * Blocks until the cluster is ready and data nodes/nodes are live. + */ + private void waitForCluster() { + AdminClient adminClient = embeddedNodeEnv.getClient().admin(); + ClusterAdminClient clusterAdminClient = adminClient.cluster(); + + ClusterHealthRequestBuilder requestBuilder = clusterAdminClient.prepareHealth("_all"); + requestBuilder = requestBuilder.setTimeout(TimeValue.timeValueSeconds(120)); + + ActionFuture<ClusterHealthResponse> healthFuture = + clusterAdminClient.health(requestBuilder.request()); + + ClusterHealthResponse health = healthFuture.actionGet(TimeValue.timeValueSeconds(120)); + + assertThat(health.getNumberOfNodes(), greaterThanOrEqualTo(1)); + assertThat(health.getNumberOfDataNodes(), greaterThanOrEqualTo(1)); } @Override