[ 
https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15670757#comment-15670757
 ] 

ASF GitHub Bot commented on FLINK-5048:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2789
  
    I would actually like to not change how/when `handover.close()` is called. 
It is probably called more often than necessary, but since it is an 
idempotent operation, it does not matter.
    
    The code is designed to lead to the quickest wakeup/termination possible in 
all cases:
      - cancellation
      - end of stream
      - error in the fetcher
      - error in the consumer
    
    Also note that an error and a close do not overwrite each other: whichever 
is reported first is kept, so it is fine if the other is called afterwards as well.
    
    Also, both the fetcher and the KafkaConsumerThread are written to be 
self-contained, each encapsulating all the logic it needs. That means they do 
not rely on each other to call `handover.close()` in any situation, which makes 
the design more robust.
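
To illustrate the two properties described above (an idempotent `close()`, and errors/close not overwriting each other), here is a minimal sketch. It is not Flink's actual `Handover` class: the name `HandoverSketch`, the `ClosedException` marker, and the `String` record type are assumptions made purely for illustration.

```java
import java.util.List;

/** Hypothetical, simplified size-one handover; not Flink's actual class. */
class HandoverSketch {

    /** Marker stored when the handover is closed without a prior error. */
    static final class ClosedException extends RuntimeException {
        ClosedException() {
            super("handover closed");
        }
    }

    private final Object lock = new Object();
    private List<String> next;   // at most one pending batch
    private Throwable terminal;  // first close/error wins; never overwritten

    /** Consumer side: blocks until a batch, an error, or a close arrives. */
    List<String> pollNext() throws Exception {
        synchronized (lock) {
            while (next == null && terminal == null) {
                lock.wait();
            }
            if (next != null) {
                List<String> batch = next;
                next = null;
                lock.notifyAll(); // wake a producer waiting for the free slot
                return batch;
            }
            throw new Exception(terminal);
        }
    }

    /** Producer side: blocks while the single slot is still occupied. */
    void produce(List<String> batch) throws InterruptedException {
        synchronized (lock) {
            while (next != null && terminal == null) {
                lock.wait();
            }
            if (terminal != null) {
                throw new IllegalStateException("handover terminated", terminal);
            }
            next = batch;
            lock.notifyAll();
        }
    }

    /** Records the error unless a close or an earlier error came first. */
    void reportError(Throwable t) {
        synchronized (lock) {
            if (terminal == null) {
                terminal = t;
            }
            next = null;
            lock.notifyAll(); // wake both sides as quickly as possible
        }
    }

    /** Idempotent: repeated calls, or calls after an error, change nothing. */
    void close() {
        synchronized (lock) {
            if (terminal == null) {
                terminal = new ClosedException();
            }
            next = null;
            lock.notifyAll();
        }
    }
}
```

In this sketch both sides can call `close()` or `reportError(...)` in any order and any number of times. Every call wakes up waiting threads, and only the first terminal state is kept, so neither side has to rely on the other to terminate the handover.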


> Kafka Consumer (0.9/0.10) threading model leads to problematic cancellation 
> behavior
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-5048
>                 URL: https://issues.apache.org/jira/browse/FLINK-5048
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.1.3
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.2.0
>
>
> The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that 
> operates the KafkaConsumer. That thread is shielded from interrupts, because 
> the Kafka Consumer has not been handling thread interrupts well.
> Since that thread is also the thread that emits records, it may block in the 
> network stack (backpressure) or in chained operators. The latter case leads to 
> situations where cancellation gets very slow unless that thread is 
> interrupted (which it cannot be).
> I propose to change the thread model as follows:
>   - A spawned consumer thread pulls from the KafkaConsumer and pushes its 
> pulled batch of records into a blocking queue (size one)
>   - The main thread of the task will pull the record batches from the 
> blocking queue and emit the records.
> This actually allows for some additional I/O overlap while limiting the 
> additional memory consumption - only two batches are ever held, one being 
> fetched and one being emitted.
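
The thread model proposed in the issue description above can be sketched roughly as follows. This is an assumption-laden illustration, not the connector's code: `QueueHandoverSketch`, the `END` marker, and the in-memory `source` list standing in for the KafkaConsumer are all hypothetical. Note also that a capacity-one `ArrayBlockingQueue` can hold one queued batch in addition to the batch being fetched and the batch being emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of the fetch-thread / main-thread split. */
class QueueHandoverSketch {

    // Hypothetical end-of-stream marker, compared by identity.
    private static final List<String> END = new ArrayList<>();

    /** Fetches batches on a spawned thread and emits them on the caller's thread. */
    static List<String> run(List<List<String>> source) throws InterruptedException {
        // Size-one queue: the fetcher blocks once one batch is already waiting.
        BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(1);

        // Spawned thread: stands in for the thread operating the KafkaConsumer.
        Thread fetcher = new Thread(() -> {
            try {
                for (List<String> batch : source) {
                    queue.put(batch);
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer-thread-sketch");
        fetcher.setDaemon(true);
        fetcher.start();

        // Main task thread: pulls batches and emits the records. Because this
        // thread is the one that may block downstream, it can be interrupted
        // on cancellation without touching the fetcher thread.
        List<String> emitted = new ArrayList<>();
        while (true) {
            List<String> batch = queue.take();
            if (batch == END) {
                break;
            }
            emitted.addAll(batch);
        }
        fetcher.join();
        return emitted;
    }
}
```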



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
