This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 6c97f22  [FLINK-17159] Harden ElasticsearchSinkITCase
6c97f22 is described below

commit 6c97f22913197e7a5a948f67b7a0b7f8fb480fa7
Author: Aljoscha Krettek <aljos...@apache.org>
AuthorDate: Tue Aug 25 10:25:17 2020 +0200

    [FLINK-17159] Harden ElasticsearchSinkITCase
    
    Before, it could happen that the embedded node is not ready. Now we wait
    for nodes/data nodes to be live before returning from the initialization
    method.
---
 .../testutils/ElasticsearchResource.java           | 30 ++++++++++++++++++++++
 1 file changed, 30 insertions(+)

diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java
index 6f185d3..68e0fe8 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java
@@ -21,12 +21,21 @@ package 
org.apache.flink.streaming.connectors.elasticsearch.testutils;
 import 
org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironment;
 import org.apache.flink.util.InstantiationUtil;
 
+import org.elasticsearch.action.ActionFuture;
+import 
org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.client.ClusterAdminClient;
+import org.elasticsearch.common.unit.TimeValue;
 import org.junit.rules.ExternalResource;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+
 /**
  * A resource that starts an embedded elasticsearch cluster.
  */
@@ -55,6 +64,27 @@ public class ElasticsearchResource extends ExternalResource {
 
                tempFolder.create();
                embeddedNodeEnv.start(tempFolder.newFolder(), clusterName);
+
+               waitForCluster();
+       }
+
+       /**
+        * Blocks until the cluster is ready and data nodes/nodes are live.
+        */
+       private void waitForCluster() {
+               AdminClient adminClient = embeddedNodeEnv.getClient().admin();
+               ClusterAdminClient clusterAdminClient = adminClient.cluster();
+
+               ClusterHealthRequestBuilder requestBuilder = 
clusterAdminClient.prepareHealth("_all");
+               requestBuilder = 
requestBuilder.setTimeout(TimeValue.timeValueSeconds(120));
+
+               ActionFuture<ClusterHealthResponse> healthFuture =
+                               
clusterAdminClient.health(requestBuilder.request());
+
+               ClusterHealthResponse health = 
healthFuture.actionGet(TimeValue.timeValueSeconds(120));
+
+               assertThat(health.getNumberOfNodes(), greaterThanOrEqualTo(1));
+               assertThat(health.getNumberOfDataNodes(), 
greaterThanOrEqualTo(1));
        }
 
        @Override

Reply via email to