Re: Elasticsearch sink connector timeout

2021-06-06 Thread Yangze Guo
Hi, Kai,

I think the exception is thrown from
RetryRejectedExecutionFailureHandler, since you set 'failure-handler'
to 'retry-rejected'. That handler retries actions that failed with
EsRejectedExecutionException and rethrows all other failures.

AFAIK, there is no way to configure the connection/socket timeout in
the Elasticsearch SQL connector. However, if the root cause is network
jitter, you could increase sink.bulk-flush.backoff.delay and
sink.bulk-flush.backoff.max-retries.
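
As a rough sketch of the backoff settings mentioned above, a sink table
definition might look like the following. The table name, columns, host,
index, and all values are hypothetical placeholders. The option names
follow the Flink 1.13 Elasticsearch SQL connector documentation as I
understand it, and I am assuming that a backoff strategy also has to be
enabled for the delay and retry count to take effect.

```sql
-- Hypothetical sink table: name, schema, host, and index are placeholders.
CREATE TABLE es_sink (
  user_id STRING,
  cnt BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'my_index',
  -- Retry actions rejected by Elasticsearch; other failures are rethrown.
  'failure-handler' = 'retry-rejected',
  -- Assumption: backoff must be enabled for the two options below to apply.
  'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
  -- Illustrative values only; tune them to how long the jitter lasts.
  'sink.bulk-flush.backoff.max-retries' = '10',
  'sink.bulk-flush.backoff.delay' = '1s'
);
```

Larger values give the sink more time to ride out a short network
problem, at the cost of slower failure detection when Elasticsearch is
really unavailable.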


Best,
Yangze Guo

On Sat, Jun 5, 2021 at 2:28 PM Kai Fu wrote:
>
> After some investigation of the task manager's log, I found that the
> exception was raised from the RetryRejectedExecutionFailureHandler path.
> The related logs are shown below; I'm not sure why this happens.
>
>
> 2021-06-05 05:31:31,529 INFO org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler [] - Bulk request 1033 has been cancelled.
> java.lang.InterruptedException: null
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272]
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272]
>     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272]
>     at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist_2.11-1.13.1.jar:1.13.1]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
> 2021-06-05 05:31:31,530 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: null
> java.lang.InterruptedException: null
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272]
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272]
>     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272]
>     at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
>     at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464)

Re: Elasticsearch sink connector timeout

2021-06-05 Thread Kai Fu
After some investigation of the task manager's log, I found that the
exception was raised from the RetryRejectedExecutionFailureHandler path.
The related logs are shown below; I'm not sure why this happens.

2021-06-05 05:31:31,529 INFO org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler [] - Bulk request 1033 has been cancelled.
java.lang.InterruptedException: null
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272]
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272]
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
2021-06-05 05:31:31,530 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: null
java.lang.InterruptedException: null
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272]
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272]
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)