[ 
https://issues.apache.org/jira/browse/FLINK-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687278#comment-16687278
 ] 

ASF GitHub Bot commented on FLINK-10774:
----------------------------------------

stevenzwu edited a comment on issue #7020: [FLINK-10774] [Kafka] connection 
leak when partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#issuecomment-438544127
 
 
   We saw an exception caused by redundant calls to the `close` method. This will need to be fixed.
   ```
   2018-11-13 18:26:54,928 ERROR org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - failed to close partitionDiscoverer
   java.lang.IllegalStateException: This consumer has already been closed.
        at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1613)
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1624)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1526)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1506)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.closeConnections(Kafka09PartitionDiscoverer.java:97)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.close(AbstractPartitionDiscoverer.java:101)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:673)
        at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:108)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:100)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:369)
        at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1481)
        at java.lang.Thread.run(Thread.java:748)
   ```
   
   Can the `run` and `cancel` methods be executed in different threads? If so, we need to add `synchronized` to this method in `Kafka09PartitionDiscoverer`:
   ```
        @Override
        protected void closeConnections() throws Exception {
                if (this.kafkaConsumer != null) {
                        this.kafkaConsumer.close();
   
                        // de-reference the consumer to avoid closing multiple times
                        this.kafkaConsumer = null;
                }
        }
   ```
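
A minimal sketch of what a `synchronized`, idempotent close could look like. It assumes a hypothetical `IdempotentCloser` class that wraps a plain `AutoCloseable` in place of the real `KafkaConsumer`, so it runs without the Kafka or Flink dependencies. It is an illustration of the idea only, not the code from the pull request.

```java
/** Hypothetical stand-in for a discoverer that owns one closeable client. */
class IdempotentCloser {
    // Guarded by "this"; plays the role of the kafkaConsumer field.
    private AutoCloseable client;

    IdempotentCloser(AutoCloseable client) {
        this.client = client;
    }

    /** Closes the client at most once, even when called from several threads. */
    synchronized void closeConnections() throws Exception {
        if (client != null) {
            AutoCloseable toClose = client;
            // De-reference before closing so that a later call never
            // attempts a second close, even if this close() throws.
            client = null;
            toClose.close();
        }
    }
}
```

Calling `closeConnections()` twice on the same instance closes the wrapped client only once. Note that the lock only serializes the null check and the close call. Judging by the `acquire` code in `KafkaConsumer`, a `close()` issued while another thread is still inside a consumer call may still fail with a `ConcurrentModificationException`.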
   
   I looked at the `KafkaConsumer` code again. Its `close` method calls `acquire`. I assumed that `acquire` takes a lock, but that is not the case:
   ```
       private void acquire() {
           this.ensureNotClosed();
           long threadId = Thread.currentThread().getId();
           if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
               throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
           } else {
               this.refcount.incrementAndGet();
           }
       }
   ```
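
To illustrate why this is a fail-fast check rather than a lock, here is a simplified, self-contained model of the same pattern. `SingleThreadGuard`, its `release` method, and `secondThreadIsRejected` are hypothetical names made up for this sketch; only the `acquire` logic follows the snippet quoted above, and `release` is an assumed counterpart.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Simplified model of the guard quoted above; not the actual KafkaConsumer code. */
class SingleThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final AtomicInteger refcount = new AtomicInteger(0);

    /** Throws instead of blocking when another thread already owns the guard. */
    void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
        refcount.incrementAndGet();
    }

    /** Assumed counterpart: frees the guard once every acquire has been released. */
    void release() {
        if (refcount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }

    /** Returns true if a second thread is rejected while the caller owns the guard. */
    static boolean secondThreadIsRejected() throws InterruptedException {
        SingleThreadGuard guard = new SingleThreadGuard();
        guard.acquire(); // the calling thread now owns the guard
        AtomicInteger rejected = new AtomicInteger();
        Thread other = new Thread(() -> {
            try {
                guard.acquire(); // does not wait; expected to throw
            } catch (ConcurrentModificationException e) {
                rejected.incrementAndGet();
            }
        });
        other.start();
        other.join();
        guard.release();
        return rejected.get() == 1;
    }
}
```

In this model the second thread never waits for the first one to finish. It gets an exception immediately, which is why the guard by itself cannot serialize `run` and `cancel`.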



> connection leak when partition discovery is disabled and open throws exception
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-10774
>                 URL: https://issues.apache.org/jira/browse/FLINK-10774
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.2, 1.5.5, 1.6.2
>            Reporter: Steven Zhen Wu
>            Assignee: Steven Zhen Wu
>            Priority: Major
>              Labels: pull-request-available
>
> Here is the scenario to reproduce the issue:
>  * partition discovery is disabled
>  * the open method throws an exception (e.g. when broker SSL authorization denies the request)
>
> In this scenario, the run method won't be executed. As a result, 
> _partitionDiscoverer.close()_ won't be called. That causes the connection 
> leak, because the KafkaConsumer is initialized but never closed. This caused an 
> outage that brought down our Kafka cluster when a high-parallelism job got 
> into a restart/failure loop.



