After some investigation in the TaskManager log, I found that the exception was raised from the RetryRejectedExecutionFailureHandler path. The related logs are shown below; I'm not sure why this happens.
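For context, my reading of the Flink 1.13 source (not verified against this exact build) is that RetryRejectedExecutionFailureHandler logs every failure at ERROR before deciding what to do with it. It only re-queues a request when the failure's cause chain contains an EsRejectedExecutionException, and it rethrows everything else. If that is right, it would explain why an InterruptedException (whose message is null, hence "Failed Elasticsearch item request: null") and a ConnectionClosedException both appear under this handler even though neither is retried. The model below is self-contained: RejectedExecutionStandIn and the `requeued` list are hypothetical stand-ins for the real Elasticsearch exception and Flink's RequestIndexer, not their actual APIs.

```java
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class RetryRejectedModel {

    // Hypothetical stand-in for Elasticsearch's EsRejectedExecutionException.
    static class RejectedExecutionStandIn extends RuntimeException {
        RejectedExecutionStandIn(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for RequestIndexer.add(): collects re-queued actions.
    static final List<String> requeued = new ArrayList<>();

    // Walks the cause chain looking for an instance of the given exception type.
    static <T extends Throwable> Optional<T> findThrowable(Throwable failure, Class<T> type) {
        Throwable current = failure;
        while (current != null) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    // Models the handler's decision: log every failure, retry only rejections.
    static void onFailure(String action, Throwable failure) throws Throwable {
        // Logged unconditionally, so non-retryable errors show up here too.
        System.err.println("Failed Elasticsearch item request: " + failure.getMessage());
        if (findThrowable(failure, RejectedExecutionStandIn.class).isPresent()) {
            requeued.add(action); // rejected by a busy node: try again later
        } else {
            throw failure; // anything else fails the sink
        }
    }

    // Returns "retried" or "rethrown" for a given failure.
    static String classify(Throwable failure) {
        try {
            onFailure("index-request", failure);
            return "retried";
        } catch (Throwable rethrown) {
            return "rethrown";
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(new RejectedExecutionStandIn("rejected execution"))); // retried
        System.out.println(classify(new RuntimeException("wrapped", new RejectedExecutionStandIn("rejected")))); // retried
        System.out.println(classify(new InterruptedException())); // rethrown (logs "...: null")
        System.out.println(classify(new SocketTimeoutException("30,000 milliseconds timeout"))); // rethrown
    }
}
```

In other words, under this reading the handler is only the place where these errors get logged; the interruptions and connection errors themselves originate elsewhere.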
2021-06-05 05:31:31,529 INFO org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler [] - Bulk request 1033 has been cancelled.
java.lang.InterruptedException: null
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272]
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272]
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272]
	at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
2021-06-05 05:31:31,530 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: null
java.lang.InterruptedException: null
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272]
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272]
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272]
	at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1]
...

2021-06-05 05:31:31,633 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: Connection closed unexpectedly
org.apache.flink.elasticsearch7.shaded.org.apache.http.ConnectionClosedException: Connection closed unexpectedly
	at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.closed(HttpAsyncRequestExecutor.java:146) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:71) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:39) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.disconnected(AbstractIODispatch.java:100) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionClosed(BaseIOReactor.java:277) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processClosedSessions(AbstractIOReactor.java:449) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:283) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]

On Sat, Jun 5, 2021 at 12:13 PM Kai Fu <zzfu...@gmail.com> wrote:

> Hi team,
>
> We have been running into Elasticsearch sink connector timeouts quite frequently. We checked, and the ES cluster is far from heavily loaded (~40% CPU utilization, no queries, and a low indexing rate). We're using the ES-7 connector, with 12 data nodes and a parallelism of 32.
>
> The error log is below. We'd like to know whether there is any way to locate the issue or to configure the timeout parameter [1].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/
>
> 2021-06-05 11:49:10
> java.lang.RuntimeException: An error occurred in ElasticsearchSink.
> 	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427)
> 	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:432)
> 	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:329)
> 	at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> 	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> 	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
> 	at org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog(DeduplicateFunctionHelper.java:112)
> 	at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:80)
> 	at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:32)
> 	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
> 	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> 	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> 	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-21 [ACTIVE]
> 	at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
> 	at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
> 	at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
> 	at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
> 	at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
> 	at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
> 	at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
> 	at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
> 	at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
> 	at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
> 	... 1 more
>
> Config:
>
> WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'https://xxx:443',
>   'index' = 'xxx',
>   'sink.bulk-flush.max-actions' = '10000',
>   'sink.bulk-flush.max-size' = '2mb',
>   'sink.flush-on-checkpoint' = 'true',
>   'connection.max-retry-timeout' = '60s',
>   'failure-handler' = 'retry-rejected',
>   'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
>   'sink.bulk-flush.interval' = '2s'
> );
>
> --
> Best wishes,
> - Kai

--
Best wishes,
- Kai
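On the timeout itself: as far as I know, 30,000 ms is the default socket timeout of the Elasticsearch low-level REST client, so the value in the SocketTimeoutException may simply be that default rather than anything set in the config (an observation, not a confirmed diagnosis). How much work each bulk request carries within that window depends on the flush settings in the WITH clause: a flush is triggered as soon as any one of max-actions, max-size, or interval is reached. The sketch below models that "first limit wins" rule with the quoted values. It is not the connector's code; it assumes '2mb' means 2 × 1024 × 1024 bytes, and the 1 KiB document size is hypothetical (the real size check uses the bulk request's estimated size in bytes).

```java
public class BulkFlushModel {
    // Values taken from the WITH clause in the quoted config.
    static final int MAX_ACTIONS = 10_000;               // 'sink.bulk-flush.max-actions' = '10000'
    static final long MAX_SIZE_BYTES = 2L * 1024 * 1024; // 'sink.bulk-flush.max-size' = '2mb' (assumed 2 MiB)
    static final long INTERVAL_MILLIS = 2_000L;          // 'sink.bulk-flush.interval' = '2s'

    // Returns which limit, if any, would trigger a flush of the current buffer.
    static String flushReason(int bufferedActions, long bufferedBytes, long millisSinceLastFlush) {
        if (bufferedActions >= MAX_ACTIONS) {
            return "max-actions";
        }
        if (bufferedBytes >= MAX_SIZE_BYTES) {
            return "max-size";
        }
        if (millisSinceLastFlush >= INTERVAL_MILLIS) {
            return "interval";
        }
        return "none";
    }

    // Number of documents of the given size needed to reach the size limit (ceiling division).
    static long actionsUntilSizeLimit(long docBytes) {
        return (MAX_SIZE_BYTES + docBytes - 1) / docBytes;
    }

    public static void main(String[] args) {
        long docBytes = 1024; // hypothetical average document size of 1 KiB
        long n = actionsUntilSizeLimit(docBytes);
        System.out.println(n); // 2048
        System.out.println(flushReason((int) n, n * docBytes, 500)); // max-size
        System.out.println(flushReason(100, 100 * docBytes, 2_000)); // interval
    }
}
```

Under these assumptions the 2 MB limit, not the 10,000-action limit, would bound each bulk request; with larger documents it is reached even sooner.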