Hi team,

We frequently hit timeouts in the Elasticsearch sink connector. As far as we can tell, the ES cluster is far from overloaded (~40% CPU utilization, no queries, and a low indexing rate). We're using the ES 7 connector against a cluster with 12 data nodes, and the job runs with a parallelism of 32.
The error log is below. Is there any way to locate the cause of the issue, or to configure the timeout parameter [1]?

```
2021-06-05 11:49:10
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:432)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:329)
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
    at org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog(DeduplicateFunctionHelper.java:112)
    at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:80)
    at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:32)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-21 [ACTIVE]
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    ... 1 more
```

Our sink configuration:

```sql
WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'https://xxx:443',
  'index' = 'xxx',
  'sink.bulk-flush.max-actions' = '10000',
  'sink.bulk-flush.max-size' = '2mb',
  'sink.flush-on-checkpoint' = 'true',
  'connection.max-retry-timeout' = '60s',
  'failure-handler' = 'retry-rejected',
  'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
  'sink.bulk-flush.interval' = '2s'
);
```

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/

--
Best wishes,
Kai
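A rough way to reason about the bulk-flush settings in the configuration: each sink subtask buffers actions and sends a bulk request as soon as it holds `sink.bulk-flush.max-actions` actions, its buffered size reaches `sink.bulk-flush.max-size`, or `sink.bulk-flush.interval` elapses. The stdlib-only Java sketch below models that rule for a steady stream of equally sized documents. It is a back-of-the-envelope aid, not connector code: the document size and per-subtask ingest rate are hypothetical inputs, the class and method names are illustrative (not Flink or Elasticsearch APIs), the sink is assumed to run at the job parallelism of 32, and the model ignores the per-action overhead that Elasticsearch adds to its size estimate.

```java
import java.time.Duration;

// Rough model of the bulk-flush triggers in the WITH (...) clause.
// A subtask flushes its buffer as soon as ANY threshold is reached.
// Document size and ingest rate are hypothetical inputs, not measurements.
class BulkFlushEstimate {

    // Values copied from the config; Flink parses "2mb" as 2 MiB.
    static final long MAX_ACTIONS = 10_000;
    static final long MAX_SIZE_BYTES = 2L * 1024 * 1024;
    static final Duration INTERVAL = Duration.ofSeconds(2);
    // Assumption: the sink runs at the job parallelism of 32.
    static final int PARALLELISM = 32;

    // Actions buffered before the size threshold is reached (check is ">=", so round up).
    static long actionsUntilSizeLimit(long docBytes) {
        return (MAX_SIZE_BYTES + docBytes - 1) / docBytes;
    }

    // Actions one subtask receives during one flush interval.
    static double actionsPerInterval(double docsPerSecondPerSubtask) {
        return docsPerSecondPerSubtask * INTERVAL.toMillis() / 1000.0;
    }

    // Which threshold fires first for a steady stream of equally sized documents.
    static String firstTrigger(long docBytes, double docsPerSecondPerSubtask) {
        double byInterval = actionsPerInterval(docsPerSecondPerSubtask);
        long bySize = actionsUntilSizeLimit(docBytes);
        if (byInterval <= MAX_ACTIONS && byInterval <= bySize) {
            return "interval";
        }
        return bySize <= MAX_ACTIONS ? "max-size" : "max-actions";
    }

    // Approximate bulk requests per second sent by all subtasks together.
    static double bulkRequestsPerSecond(long docBytes, double docsPerSecondPerSubtask) {
        if (docsPerSecondPerSubtask <= 0) {
            return 0.0; // nothing buffered, nothing flushed
        }
        double actionsPerFlush = Math.min(
                actionsPerInterval(docsPerSecondPerSubtask),
                Math.min(MAX_ACTIONS, actionsUntilSizeLimit(docBytes)));
        return PARALLELISM * docsPerSecondPerSubtask / actionsPerFlush;
    }

    public static void main(String[] args) {
        // Hypothetical workload: 1 KiB documents at 500 docs/s per subtask.
        long docBytes = 1024;
        double rate = 500;
        System.out.println("first trigger: " + firstTrigger(docBytes, rate));
        System.out.println("bulk requests/s (all subtasks): " + bulkRequestsPerSecond(docBytes, rate));
    }
}
```

With the hypothetical 1 KiB documents at 500 docs/s per subtask, the 2 s interval fires first (1,000 actions per flush, about 1,000 KiB, under the 2 MiB cap), and the 32 subtasks together send about 16 bulk requests per second. Plugging in measured document sizes and rates would show whether individual bulk requests are large enough to plausibly approach the 30 s timeout.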