Re: Elasticsearch sink connector timeout
Hi, Kai, I think the exception should be thrown from RetryRejectedExecutionFailureHandler as you configure the 'failure-handler' to 'retry-rejected'. It will retry the action that failed with EsRejectedExecutionException and throw all other failures. AFAIK, there is no way to configure the connection/socket timeout in Elasticsearch SQL connector. However, if the root cause is a network jitter, you may increase the sink.bulk-flush.backoff.delay and the sink.bulk-flush.backoff.max-retries. Best, Yangze Guo On Sat, Jun 5, 2021 at 2:28 PM Kai Fu wrote: > > With some investigation in the task manager's log, the exception was raised > from RetryRejectedExecutionFailureHandler path, the related logs are showing > below, not sure why it's that. > > > 5978 2021-06-05 05:31:31,529 INFO > org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler > [] - Bulk request 1033 has been cancelled. > 5979 java.lang.InterruptedException: null > 5980 at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) > ~[?:1.8.0_272] > 5981 at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > ~[?:1.8.0_272] > 5982 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > ~[?:1.8.0_272] > 5983 at > org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) > ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1] > 5984 at > org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) > ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 5985 at > org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) > ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 5986 at > org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) > ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13. 1] > 5987 at > org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) > ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 5988 at > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) > ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 5989 at > org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > 5990 at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > 5991 at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > 5992 at > org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) > [flink-dist_2.11-1.13.1.jar:1.13.1] > 5993 at > org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) > [flink-dist_2.11-1.13.1.jar:1.13.1] > 5994 at > org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) > [flink-dist_2.11-1.13.1.jar:1.13.1] > 5995 at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) > [flink-dist_2.11-1.13.1.jar:1.13.1] > 5996 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) > [flink-dist_2.11-1.13.1.jar:1.13.1] > 5997 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > [flink-dist_2.11-1.13.1.jar:1.13.1] > 5998 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272] > 5999 2021-06-05 05:31:31,530 ERROR > org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler > [] - Failed Elasticsearch item request: null > 6000 java.lang.InterruptedException: null > 6001 at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) > ~[?:1.8.0_272] > 6002 at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > ~[?:1.8.0_272] > 6003 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > ~[?:1.8.0_272] > 6004 at > org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) > ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1] > 6005 at > org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) > ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] > 6006 at > org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) >
Re: Elasticsearch sink connector timeout
With some investigation in the task manager's log, the exception was raised from RetryRejectedExecutionFailureHandler path, the related logs are showing below, not sure why it's that. * 5978 2021-06-05 05:31:31,529 INFO org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler [] - Bulk request 1033 has been cancelled. 5979 java.lang.InterruptedException: null 5980 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272] 5981 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272] 5982 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272] 5983 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1] 5984 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 5985 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 5986 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13. 1] 5987 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 5988 at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 5989 at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1] 5990 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1] 5991 at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1] 5992 at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1] 5993 at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1] 5994 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1] 5995 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1] 5996 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1] 5997 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist_2.11-1.13.1.jar:1.13.1] 5998 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272] 5999 2021-06-05 05:31:31,530 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: null 6000 java.lang.InterruptedException: null 6001 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272] 6002 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272] 6003 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272] 6004 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1] 6005 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 6006 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 6007 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13. 1] 6008 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 6009 at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1] 6010 at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)