Hello ElasticsearchIO and Beam users/developers,

I am on Beam 2.23.0 and Elasticsearch 6.8.

I have been using ElasticsearchIO.write() successfully. For the first time, I am
trying to use ElasticsearchIO.read(), because I have a use case where I want to
read data from one Elasticsearch cluster, modify the data, and then write it to
another Elasticsearch cluster.
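
At a high level, the pipeline I am building is shaped roughly like this (just a
simplified sketch for context; the endpoints, index names, and the Modify step
below are placeholders, not my real code):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class EsToEsCopySketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    p.apply("Read_From_ES", ElasticsearchIO.read()
            .withConnectionConfiguration(
                // Placeholder source cluster/index; each element read is a JSON document string.
                ElasticsearchIO.ConnectionConfiguration.create(
                    new String[] {"https://source-es:9200"}, "source-index", "_doc")))
        .apply("Modify", MapElements.into(TypeDescriptors.strings())
            // Placeholder for the actual per-document modification.
            .via((String json) -> json))
        .apply("Write_To_ES", ElasticsearchIO.write()
            .withConnectionConfiguration(
                // Placeholder destination cluster/index.
                ElasticsearchIO.ConnectionConfiguration.create(
                    new String[] {"https://dest-es:9200"}, "dest-index", "_doc")));

    p.run().waitUntilFinish();
  }
}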

My read transform is very simple:

Pipeline p = input.getPipeline();

return p
    .apply("Read_From_ES", ElasticsearchIO.read()
        .withQuery(query)
        .withBatchSize(50)
        .withScrollKeepalive("3m")
        .withConnectionConfiguration(
            ElasticsearchIO.ConnectionConfiguration.create(
                    esConnectionParams.getElasticsearchEndpoints(), indexName, "_doc")
                .withConnectTimeout(240000)
                .withSocketTimeout(240000)
                .withUsername(esConnectionParams.getElasticsearchUsername())
                .withPassword(esConnectionParams.getElasticsearchPassword()))
    );

But I am unable to submit the job successfully and am getting the following
exception from my read transform:

WARNING: Size estimation of the source failed: org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource@7c974942
java.net.ConnectException: Operation timed out
    at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:823)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.getStats(ElasticsearchIO.java:797)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.getEstimatedSizeBytes(ElasticsearchIO.java:700)
    at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:77)
    at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:51)
    at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:38)
    at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:35)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:484)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:423)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:182)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:888)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:194)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
    at io.prosimo.analytics.beam.datacopy.DataCopy.runDataCopy(DataCopy.java:43)
    at io.prosimo.analytics.beam.datacopy.DataCopy.main(DataCopy.java:109)
Caused by: java.net.ConnectException: Operation timed out
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:174)
    at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:148)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
    at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
    at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
    at java.lang.Thread.run(Thread.java:748)

I tried various values for connectTimeout, socketTimeout, batchSize, and
scrollKeepalive, but I still see the same issue.
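
For example, one of the combinations I tried looked like the following (the
exact numbers here are only examples of what I experimented with, not special
values):

ElasticsearchIO.read()
    .withQuery(query)
    .withBatchSize(10)
    .withScrollKeepalive("5m")
    .withConnectionConfiguration(
        ElasticsearchIO.ConnectionConfiguration.create(
                esConnectionParams.getElasticsearchEndpoints(), indexName, "_doc")
            .withConnectTimeout(600000)   // 10 minutes, in milliseconds
            .withSocketTimeout(600000));  // 10 minutes, in milliseconds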

Any help would be greatly appreciated.

Thanks and Regards
Mohil
