[ https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16742179#comment-16742179 ]

xueyu commented on FLINK-11046:
-------------------------------

Hi [~dawidwys], have you already started working on this issue? If not, could 
you please assign it to me? I have been investigating it for the past two days, 
have some ideas for a fix, and would like to give it a try. Following 
[~tzulitai]'s comments, my thought is to write a new RequestIndexer that uses a 
BulkRequest to buffer the action requests instead of adding them to the 
BulkProcessor directly; a rough sketch of the idea is below. I was a little 
late asking for the assignment... Thanks~
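
A rough sketch of what I have in mind (all names here, like 
BufferingRequestIndexer and replayInto, are made up, and I assume the 1.6.x 
RequestIndexer#add(ActionRequest...) signature; I buffer into a plain list for 
simplicity, but the same could be done with a BulkRequest):

{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.ActionRequest;

/**
 * Buffers the requests handed back by ActionRequestFailureHandler#onFailure
 * instead of forwarding them to BulkProcessor#add, so the failure handler
 * never needs the BulkProcessor lock.
 */
class BufferingRequestIndexer implements RequestIndexer {

    private final List<ActionRequest> buffer = new ArrayList<>();

    @Override
    public void add(ActionRequest... actionRequests) {
        for (ActionRequest request : actionRequests) {
            buffer.add(request);
        }
    }

    /** Re-adds the buffered requests via the real indexer, outside the bulk listener callback. */
    void replayInto(RequestIndexer realIndexer) {
        for (ActionRequest request : buffer) {
            realIndexer.add(request);
        }
        buffer.clear();
    }
}
{code}

The buffered requests could then be replayed from the sink's own thread, e.g. 
before the next flush, so they are not lost.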

> ElasticSearch6Connector cause thread blocked when index failed with retry
> -------------------------------------------------------------------------
>
>                 Key: FLINK-11046
>                 URL: https://issues.apache.org/jira/browse/FLINK-11046
>             Project: Flink
>          Issue Type: Bug
>          Components: ElasticSearch Connector
>    Affects Versions: 1.6.2
>            Reporter: luoguohao
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>
> When I'm using the es6 sink to index into Elasticsearch, the bulk process 
> catches some exceptions, and I try to reindex the failed document by calling 
> `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` 
> method, but things go wrong: the calling thread gets stuck there, and a 
> thread dump shows that the `bulkprocessor` object is locked by another thread.
> {code:java}
> public interface ActionRequestFailureHandler extends Serializable {
>     void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable;
> }
> {code}
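> For example, a failure handler along these lines (just an illustrative 
> sketch, not my exact handler) triggers the problem; the `indexer.add(action)` 
> call inside `onFailure()` is the call that never returns:
> {code:java}
> import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
> import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
> import org.elasticsearch.action.ActionRequest;
> 
> public class ReindexOnFailureHandler implements ActionRequestFailureHandler {
> 
>     @Override
>     public void onFailure(ActionRequest action, Throwable failure,
>             int restStatusCode, RequestIndexer indexer) throws Throwable {
>         // Re-queue the failed request. This ends up in the synchronized
>         // BulkProcessor#internalAdd while another thread still holds the
>         // BulkProcessor monitor, so it blocks (see the analysis below).
>         indexer.add(action);
>     }
> }
> {code}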
> After reading the code behind `indexer.add(action)`, I found that each add 
> operation is `synchronized`:
> {code:java}
> private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) {
>   ensureOpen();
>   bulkRequest.add(request, payload);
>   executeIfNeeded();
> }
> {code}
> I also noticed that the `bulkprocessor` object is locked in the bulk process 
> thread as well. The bulk process operation is the following code:
> {code:java}
> public void execute(BulkRequest bulkRequest, long executionId) {
>     Runnable toRelease = () -> {};
>     boolean bulkRequestSetupSuccessful = false;
>     try {
>         listener.beforeBulk(executionId, bulkRequest);
>         semaphore.acquire();
>         toRelease = semaphore::release;
>         CountDownLatch latch = new CountDownLatch(1);
>         retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
>             @Override
>             public void onResponse(BulkResponse response) {
>                 try {
>                     listener.afterBulk(executionId, bulkRequest, response);
>                 } finally {
>                     semaphore.release();
>                     latch.countDown();
>                 }
>             }
>             @Override
>             public void onFailure(Exception e) {
>                 try {
>                     listener.afterBulk(executionId, bulkRequest, e);
>                 } finally {
>                     semaphore.release();
>                     latch.countDown();
>                 }
>             }
>         }, Settings.EMPTY);
>         bulkRequestSetupSuccessful = true;
>         if (concurrentRequests == 0) {
>             latch.await();
>         }
>     } catch (InterruptedException e) {
>         Thread.currentThread().interrupt();
>         logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
>         listener.afterBulk(executionId, bulkRequest, e);
>     } catch (Exception e) {
>         logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
>         listener.afterBulk(executionId, bulkRequest, e);
>     } finally {
>         if (bulkRequestSetupSuccessful == false) {  // if we fail on client.bulk() release the semaphore
>             toRelease.run();
>         }
>     }
> }
> {code}
> As the code above shows, I think that is the reason why the retry operation 
> thread is blocked: the bulk process thread never releases the lock on 
> `bulkprocessor` while it waits on the latch. I also tried to figure out why 
> the field `concurrentRequests` is set to zero, and I found the initialization 
> of the bulkprocessor in the class `ElasticsearchSinkBase`:
> {code:java}
> protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
>  ...
>  BulkProcessor.Builder bulkProcessorBuilder = callBridge.createBulkProcessorBuilder(client, listener);
>  // This makes flush() blocking
>  bulkProcessorBuilder.setConcurrentRequests(0);
>  
>  ...
>  return bulkProcessorBuilder.build();
> }
> {code}
>  This field is set to zero explicitly. So everything seems to make sense, but 
> I still wondered why the retry operation does not run in the same thread as 
> the bulk process execution; after reading the code, the `bulkAsync` method 
> turned out to be the last piece of the puzzle:
> {code:java}
> @Override
> public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) {
>  return BulkProcessor.builder(client::bulkAsync, listener);
> }
> {code}
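> To make the interaction clearer, here is a stripped-down sketch of the 
> locking pattern (plain Java only, no Elasticsearch or Flink classes, and 
> every name in it is made up): the flushing thread holds the processor monitor 
> while it waits on a latch that is only counted down after the callback 
> returns, and the callback tries to enter the same monitor via `add()`, so 
> both threads hang forever.
> {code:java}
> import java.util.concurrent.CountDownLatch;
> 
> public class DeadlockSketch {
> 
>     private final CountDownLatch latch = new CountDownLatch(1);
> 
>     // stands in for the synchronized BulkProcessor#internalAdd
>     public synchronized void add(String request) {
>         System.out.println("added " + request);
>     }
> 
>     // stands in for flush()/execute() with concurrentRequests == 0:
>     // the monitor is held while waiting for the async bulk response
>     public synchronized void flush() throws InterruptedException {
>         System.out.println("flushing, waiting for the async response...");
>         latch.await();                  // never returns
>     }
> 
>     // stands in for the listener callback that ends up in
>     // ActionRequestFailureHandler#onFailure and calls indexer.add(action)
>     public void onFailureCallback() {
>         add("retried request");         // blocks on the monitor held by flush()
>         latch.countDown();              // never reached
>     }
> 
>     public static void main(String[] args) throws Exception {
>         DeadlockSketch processor = new DeadlockSketch();
> 
>         new Thread(() -> {
>             try {
>                 Thread.sleep(100);      // let flush() grab the monitor first
>             } catch (InterruptedException ignored) {
>             }
>             processor.onFailureCallback();
>         }, "callback-thread").start();
> 
>         processor.flush();              // main thread plays the flushing thread
>     }
> }
> {code}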
> So, I hope someone can help fix this problem or give some suggestions, and I 
> would also like to have a try at it myself. 
>  Thanks a lot !



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
