[
https://issues.apache.org/jira/browse/BEAM-5925?focusedWorklogId=171561&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-171561
]
ASF GitHub Bot logged work on BEAM-5925:
----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Dec/18 13:00
Start Date: 03/Dec/18 13:00
Worklog Time Spent: 10m
Work Description: echauchot closed pull request #7065: [BEAM-5925] Add
.withMaxRetryTimeoutMillis() to ElasticsearchIO.ConnectionConfiguration
URL: https://github.com/apache/beam/pull/7065
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 7ebaaf699db0..55788b007bca 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -85,7 +85,9 @@ public static void beforeClass() throws IOException {
node.start();
connectionConfiguration =
ConnectionConfiguration.create(
- new String[] {"http://" + ES_IP + ":" + esHttpPort}, getEsIndex(),
ES_TYPE);
+ new String[] {"http://" + ES_IP + ":" + esHttpPort},
getEsIndex(), ES_TYPE)
+ .withSocketAndRetryTimeout(90000)
+ .withConnectTimeout(5000);
restClient = connectionConfiguration.createClient();
elasticsearchIOTestCommon =
new ElasticsearchIOTestCommon(connectionConfiguration, restClient,
false);
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 46e5b79d772e..24748b9e4637 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -89,7 +89,9 @@ public Settings indexSettings() {
public void setup() {
if (connectionConfiguration == null) {
connectionConfiguration =
- ConnectionConfiguration.create(fillAddresses(), getEsIndex(),
ES_TYPE);
+ ConnectionConfiguration.create(fillAddresses(), getEsIndex(),
ES_TYPE)
+ .withSocketAndRetryTimeout(90000)
+ .withConnectTimeout(5000);
elasticsearchIOTestCommon =
new ElasticsearchIOTestCommon(connectionConfiguration,
getRestClient(), false);
}
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index c93edfc1cb71..d731f339424a 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -89,7 +89,9 @@ public Settings indexSettings() {
public void setup() {
if (connectionConfiguration == null) {
connectionConfiguration =
- ConnectionConfiguration.create(fillAddresses(), getEsIndex(),
ES_TYPE);
+ ConnectionConfiguration.create(fillAddresses(), getEsIndex(),
ES_TYPE)
+ .withSocketAndRetryTimeout(90000)
+ .withConnectTimeout(5000);
elasticsearchIOTestCommon =
new ElasticsearchIOTestCommon(connectionConfiguration,
getRestClient(), false);
}
diff --git
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index f44b1e227768..e48559255b05 100644
---
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -69,6 +69,7 @@
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.ContentType;
@@ -140,6 +141,10 @@
*
* <p>When {withUsePartialUpdate()} is enabled, the input document must
contain an id field and
* {@code withIdFn()} must be used to allow its extraction by the
ElasticsearchIO.
+ *
+ * <p>Optionally, {@code withSocketAndRetryTimeout()} can be used to override
the default retry
+ * timeout and socket timeout of 30000ms. {@code withConnectTimeout()} can be
used to override the
+ * default connect timeout of 1000ms.
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public class ElasticsearchIO {
@@ -233,6 +238,12 @@ static void checkForErrors(HttpEntity responseEntity, int
backendVersion) throws
public abstract String getType();
+ @Nullable
+ public abstract Integer getSocketAndRetryTimeout();
+
+ @Nullable
+ public abstract Integer getConnectTimeout();
+
abstract Builder builder();
@AutoValue.Builder
@@ -251,6 +262,10 @@ static void checkForErrors(HttpEntity responseEntity, int
backendVersion) throws
abstract Builder setType(String type);
+ abstract Builder setSocketAndRetryTimeout(Integer maxRetryTimeout);
+
+ abstract Builder setConnectTimeout(Integer connectTimeout);
+
abstract ConnectionConfiguration build();
}
@@ -329,12 +344,41 @@ public ConnectionConfiguration
withKeystorePassword(String keystorePassword) {
return builder().setKeystorePassword(keystorePassword).build();
}
+ /**
+ * If set, overwrites the default max retry timeout (30000ms) in the
Elastic {@link RestClient}
+ * and the default socket timeout (30000ms) in the {@link RequestConfig}
of the Elastic {@link
+ * RestClient}.
+ *
+ * @param socketAndRetryTimeout the socket and retry timeout in millis.
+ * @return a {@link ConnectionConfiguration} describes a connection
configuration to
+ * Elasticsearch.
+ */
+ public ConnectionConfiguration withSocketAndRetryTimeout(Integer
socketAndRetryTimeout) {
+ checkArgument(socketAndRetryTimeout != null, "socketAndRetryTimeout can
not be null");
+ return builder().setSocketAndRetryTimeout(socketAndRetryTimeout).build();
+ }
+
+ /**
+ * If set, overwrites the default connect timeout (1000ms) in the {@link
RequestConfig} of the
+ * Elastic {@link RestClient}.
+ *
+ * @param connectTimeout the socket and retry timeout in millis.
+ * @return a {@link ConnectionConfiguration} describes a connection
configuration to
+ * Elasticsearch.
+ */
+ public ConnectionConfiguration withConnectTimeout(Integer connectTimeout) {
+ checkArgument(connectTimeout != null, "connectTimeout can not be null");
+ return builder().setConnectTimeout(connectTimeout).build();
+ }
+
private void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("address", getAddresses().toString()));
builder.add(DisplayData.item("index", getIndex()));
builder.add(DisplayData.item("type", getType()));
builder.addIfNotNull(DisplayData.item("username", getUsername()));
builder.addIfNotNull(DisplayData.item("keystore.path",
getKeystorePath()));
+ builder.addIfNotNull(DisplayData.item("socketAndRetryTimeout",
getSocketAndRetryTimeout()));
+ builder.addIfNotNull(DisplayData.item("connectTimeout",
getConnectTimeout()));
}
@VisibleForTesting
@@ -374,6 +418,24 @@ RestClient createClient() throws IOException {
throw new IOException("Can't load the client certificate from the
keystore", e);
}
}
+ restClientBuilder.setRequestConfigCallback(
+ new RestClientBuilder.RequestConfigCallback() {
+ @Override
+ public RequestConfig.Builder customizeRequestConfig(
+ RequestConfig.Builder requestConfigBuilder) {
+ if (getConnectTimeout() != null) {
+ requestConfigBuilder.setConnectTimeout(getConnectTimeout());
+ }
+ if (getSocketAndRetryTimeout() != null) {
+
requestConfigBuilder.setSocketTimeout(getSocketAndRetryTimeout());
+ }
+ return requestConfigBuilder;
+ }
+ });
+ if (getSocketAndRetryTimeout() != null) {
+ restClientBuilder.setMaxRetryTimeoutMillis(getSocketAndRetryTimeout());
+ }
+
return restClientBuilder.build();
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 171561)
Time Spent: 2h (was: 1h 50m)
> Test flake in ElasticsearchIOTest.testWriteFullAddressing
> ---------------------------------------------------------
>
> Key: BEAM-5925
> URL: https://issues.apache.org/jira/browse/BEAM-5925
> Project: Beam
> Issue Type: Bug
> Components: io-java-elasticsearch
> Reporter: Kenneth Knowles
> Assignee: Wout Scheepers
> Priority: Critical
> Time Spent: 2h
> Remaining Estimate: 0h
>
> https://builds.apache.org/view/A-D/view/Beam/job/beam_PostCommit_Java_GradleBuild/1789/
> https://scans.gradle.com/s/j42mwdsn5svcs
> {code}
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.io.IOException:
> listener timeout after waiting for [30000] ms
> {code}
> Log looks like this:
> {code}
> [2018-10-31T04:06:07,571][INFO ][o.a.b.s.i.e.ElasticsearchIOTest]
> [testWriteFullAddressing]: before test
> [2018-10-31T04:06:07,572][INFO ][o.a.b.s.i.e.ElasticsearchIOTest]
> [ElasticsearchIOTest#testWriteFullAddressing]: setting up test
> [2018-10-31T04:06:07,589][INFO ][o.e.c.m.MetaDataIndexTemplateService]
> [node_s0] adding template [random_index_template] for index patterns [*]
> [2018-10-31T04:06:07,645][INFO ][o.a.b.s.i.e.ElasticsearchIOTest]
> [ElasticsearchIOTest#testWriteFullAddressing]: all set up test
> [2018-10-31T04:06:10,536][INFO ][o.e.c.m.MetaDataCreateIndexService]
> [node_s0] [galilei] creating index, cause [auto(bulk api)], templates
> [random_index_template], shards [6]/[0], mappings []
> [2018-10-31T04:06:33,963][INFO ][o.e.c.m.MetaDataCreateIndexService]
> [node_s0] [curie] creating index, cause [auto(bulk api)], templates
> [random_index_template], shards [6]/[0], mappings []
> [2018-10-31T04:06:34,034][INFO ][o.e.c.m.MetaDataCreateIndexService]
> [node_s0] [darwin] creating index, cause [auto(bulk api)], templates
> [random_index_template], shards [6]/[0], mappings []
> [2018-10-31T04:06:34,050][INFO ][o.e.c.m.MetaDataCreateIndexService]
> [node_s0] [copernicus] creating index, cause [auto(bulk api)], templates
> [random_index_template], shards [6]/[0], mappings []
> [2018-10-31T04:06:34,075][INFO ][o.e.c.m.MetaDataCreateIndexService]
> [node_s0] [faraday] creating index, cause [auto(bulk api)], templates
> [random_index_template], shards [6]/[0], mappings []
> [2018-10-31T04:06:34,095][INFO ][o.e.c.m.MetaDataCreateIndexService]
> [node_s0] [bohr] creating index, cause [auto(bulk api)], templates
> [random_index_template], shards [6]/[0], mappings []
> [2018-10-31T04:06:34,113][INFO ][o.e.c.m.MetaDataCreateIndexService]
> [node_s0] [pasteur] creating index, cause [auto(bulk api)], templates
> [random_index_template], shards [6]/[0], mappings []
> [2018-10-31T04:06:34,142][INFO ][o.e.c.m.MetaDataCreateIndexService]
> [node_s0] [einstein] creating index, cause [auto(bulk api)], templates
> [random_index_template], shards [6]/[0], mappings []
> [2018-10-31T04:06:34,205][INFO ][o.e.c.m.MetaDataCreateIndexService]
> [node_s0] [maxwell] creating index, cause [auto(bulk api)], templates
> [random_index_template], shards [6]/[0], mappings []
> [2018-10-31T04:06:34,226][INFO ][o.e.c.m.MetaDataCreateIndexService]
> [node_s0] [newton] creating index, cause [auto(bulk api)], templates
> [random_index_template], shards [6]/[0], mappings []
> [2018-10-31T04:06:36,914][INFO ][o.e.c.r.a.AllocationService] [node_s0]
> Cluster health status changed from [YELLOW] to [GREEN] (reason: [shards
> started [[galilei][4], [galilei][5]] ...]).
> [2018-10-31T04:06:36,970][INFO ][o.e.c.m.MetaDataMappingService] [node_s0]
> [galilei/Vn1b8XXVSAmrTb5BVe2IJQ] create_mapping [TYPE_1]
> [2018-10-31T04:06:37,137][INFO ][o.e.c.m.MetaDataMappingService] [node_s0]
> [newton/bjnImLt_QguBGEFH9lBJ6Q] create_mapping [TYPE_-1]
> [2018-10-31T04:06:37,385][INFO ][o.e.c.m.MetaDataMappingService] [node_s0]
> [maxwell/-RZ32NbRRZWaGaVfaptFIA] create_mapping [TYPE_0]
> [2018-10-31T04:06:37,636][INFO ][o.e.c.m.MetaDataMappingService] [node_s0]
> [einstein/2lgF5Vj6Ti2KTS-pYSzv3Q] create_mapping [TYPE_1]
> [2018-10-31T04:06:37,806][INFO ][o.e.c.m.MetaDataMappingService] [node_s0]
> [pasteur/832OwzleRSOHsWx85vOH-w] create_mapping [TYPE_0]
> [2018-10-31T04:06:38,103][INFO ][o.e.c.m.MetaDataMappingService] [node_s0]
> [bohr/9YTwB1yvTYKf9YjYCmHjwg] create_mapping [TYPE_1]
> [2018-10-31T04:06:38,229][INFO ][o.e.c.m.MetaDataMappingService] [node_s0]
> [faraday/vIMYG8vpTQKqNkyajcFOxw] create_mapping [TYPE_0]
> [2018-10-31T04:06:38,576][INFO ][o.e.c.m.MetaDataMappingService] [node_s0]
> [copernicus/NzCZssInSiOdZKTmLCoXRw] create_mapping [TYPE_1]
> [2018-10-31T04:06:38,890][INFO ][o.e.c.m.MetaDataMappingService] [node_s0]
> [darwin/g_sIfS5aQwi6BAXw_--vgw] create_mapping [TYPE_1]
> [2018-10-31T04:06:39,201][INFO ][o.e.c.m.MetaDataMappingService] [node_s0]
> [curie/PDuZqTZQROytGLowXGMxhA] create_mapping [TYPE_0]
> [2018-10-31T04:06:40,030][INFO ][o.a.b.s.i.e.ElasticsearchIOTest]
> [ElasticsearchIOTest#testWriteFullAddressing]: cleaning up after test
> [2018-10-31T04:06:40,185][INFO ][o.e.c.m.MetaDataDeleteIndexService]
> [node_s0] [bohr/9YTwB1yvTYKf9YjYCmHjwg] deleting index
> [2018-10-31T04:06:40,185][INFO ][o.e.c.m.MetaDataDeleteIndexService]
> [node_s0] [copernicus/NzCZssInSiOdZKTmLCoXRw] deleting index
> [2018-10-31T04:06:40,185][INFO ][o.e.c.m.MetaDataDeleteIndexService]
> [node_s0] [maxwell/-RZ32NbRRZWaGaVfaptFIA] deleting index
> [2018-10-31T04:06:40,185][INFO ][o.e.c.m.MetaDataDeleteIndexService]
> [node_s0] [pasteur/832OwzleRSOHsWx85vOH-w] deleting index
> [2018-10-31T04:06:40,185][INFO ][o.e.c.m.MetaDataDeleteIndexService]
> [node_s0] [einstein/2lgF5Vj6Ti2KTS-pYSzv3Q] deleting index
> [2018-10-31T04:06:40,185][INFO ][o.e.c.m.MetaDataDeleteIndexService]
> [node_s0] [newton/bjnImLt_QguBGEFH9lBJ6Q] deleting index
> [2018-10-31T04:06:40,185][INFO ][o.e.c.m.MetaDataDeleteIndexService]
> [node_s0] [darwin/g_sIfS5aQwi6BAXw_--vgw] deleting index
> [2018-10-31T04:06:40,185][INFO ][o.e.c.m.MetaDataDeleteIndexService]
> [node_s0] [galilei/Vn1b8XXVSAmrTb5BVe2IJQ] deleting index
> [2018-10-31T04:06:40,185][INFO ][o.e.c.m.MetaDataDeleteIndexService]
> [node_s0] [curie/PDuZqTZQROytGLowXGMxhA] deleting index
> [2018-10-31T04:06:40,185][INFO ][o.e.c.m.MetaDataDeleteIndexService]
> [node_s0] [faraday/vIMYG8vpTQKqNkyajcFOxw] deleting index
> [2018-10-31T04:06:40,801][INFO ][o.e.c.m.MetaDataIndexTemplateService]
> [node_s0] removing template [random_index_template]
> [2018-10-31T04:06:40,811][INFO ][o.a.b.s.i.e.ElasticsearchIOTest]
> [ElasticsearchIOTest#testWriteFullAddressing]: cleaned up after test
> [2018-10-31T04:06:40,811][INFO ][o.a.b.s.i.e.ElasticsearchIOTest]
> [testWriteFullAddressing]: after test
> {code}
> So perhaps it is just too tight a timeout?
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)