[ 
https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657655#comment-15657655
 ] 

ASF GitHub Bot commented on FLINK-5048:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2789#discussion_r87626228
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -66,36 +57,15 @@
        /** The schema to convert between Kafka's byte messages, and Flink's objects */
        private final KeyedDeserializationSchema<T> deserializer;
     
    -   /** The configuration for the Kafka consumer */
    -   private final Properties kafkaProperties;
    +   /** The handover of data and exceptions between the consumer thread and the task thread */
    +   private final Handover handover;
     
    -   /** The maximum number of milliseconds to wait for a fetch batch */
    -   private final long pollTimeout;
    -
    -   /** The next offsets that the main thread should commit */
    -   private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
    -   
    -   /** The callback invoked by Kafka once an offset commit is complete */
    -   private final OffsetCommitCallback offsetCommitCallback;
    -
    -   /** Reference to the Kafka consumer, once it is created */
    -   private volatile KafkaConsumer<byte[], byte[]> consumer;
    -   
    -   /** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */
    -   private volatile ExceptionProxy errorHandler;
    +   /** The thread that runs the proper KafkaConsumer and hand the record batches to this fetcher */
    --- End diff --
    
    nit: 'proper' confused me a bit at first. Perhaps 'actual'?


> Kafka Consumer (0.9/0.10) threading model leads problematic cancellation 
> behavior
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-5048
>                 URL: https://issues.apache.org/jira/browse/FLINK-5048
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.1.3
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.2.0
>
>
> The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that 
> operates the KafkaConsumer. That thread is shielded from interrupts, because 
> the Kafka Consumer has not been handling thread interrupts well.
> Since that thread is also the thread that emits records, it may block in the 
> network stack (backpressure) or in chained operators. The latter case leads to 
> situations where cancellation becomes very slow unless that thread is 
> interrupted (which it cannot be).
> I propose to change the thread model as follows:
>   - A spawned consumer thread pulls from the KafkaConsumer and pushes each 
> pulled batch of records into a blocking queue (size one).
>   - The main thread of the task pulls the record batches from the 
> blocking queue and emits the records.
> This actually allows for some additional I/O overlap while limiting the 
> additional memory consumption: only two batches are ever held, one being 
> fetched and one being emitted.
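
The proposed thread model can be sketched roughly as follows. This is a minimal illustration only, not the `Handover` class from the pull request: the class name `HandoverSketch`, the method `run`, the `END` sentinel, and the use of `String` records are all assumptions made for the example, and the in-memory list of batches stands in for polling a real KafkaConsumer. Exception forwarding between the two threads is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of the proposed model; not the Handover class from the PR. */
public class HandoverSketch {

    /** Sentinel batch (compared by identity) signalling that the consumer thread is done. */
    private static final List<String> END = new ArrayList<>();

    /**
     * Hands the given batches from a spawned "consumer" thread to the calling
     * "task" thread through a blocking queue of size one, and returns the
     * records in the order the task thread emitted them.
     */
    public static List<String> run(List<List<String>> batches) {
        BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(1);

        // Consumer thread: stands in for the thread that polls the KafkaConsumer.
        Thread consumerThread = new Thread(() -> {
            try {
                for (List<String> batch : batches) {
                    queue.put(batch); // blocks while the queue is still full
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer-thread");
        consumerThread.setDaemon(true);
        consumerThread.start();

        // Task (main) thread: takes batches and "emits" the records.
        List<String> emitted = new ArrayList<>();
        try {
            while (true) {
                List<String> batch = queue.take(); // interruptible wait
                if (batch == END) {
                    break;
                }
                emitted.addAll(batch);
            }
            consumerThread.join();
        } catch (InterruptedException e) {
            // Cancellation path: the emitting thread reacts to the interrupt.
            consumerThread.interrupt();
            Thread.currentThread().interrupt();
        }
        return emitted;
    }
}
```

The point of the split is that the thread doing the emitting is the ordinary task thread, which blocks only in interruptible calls such as `take()`, so cancelling the task no longer depends on interrupting the thread that operates the KafkaConsumer.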



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
