[ https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15670717#comment-15670717 ]
ASF GitHub Bot commented on FLINK-5048:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2789#discussion_r88258273

    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java ---
    @@ -172,8 +173,14 @@ public void cancel() {

         // ----------- add consumer dataflow ----------

    +    // the consumer should only poll very small chunks
    +    Properties consumerProps = new Properties();
    +    consumerProps.putAll(standardProps);
    +    consumerProps.putAll(secureProps);
    +    consumerProps.setProperty("fetch.message.max.bytes", "100");
    --- End diff --

    Leftover from getting the "short retention" tests to run with the modified source. Will undo.


> Kafka Consumer (0.9/0.10) threading model leads problematic cancellation behavior
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-5048
>                 URL: https://issues.apache.org/jira/browse/FLINK-5048
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.1.3
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.2.0
>
>
> The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that
> operates the KafkaConsumer. That thread is shielded from interrupts, because
> the Kafka Consumer has not been handling thread interrupts well.
> Since that thread is also the thread that emits records, it may block in the
> network stack (backpressure) or in chained operators. The latter case leads to
> situations where cancellation gets very slow unless that thread is
> interrupted (which it cannot be).
> I propose to change the thread model as follows:
>   - A spawned consumer thread pulls from the KafkaConsumer and pushes its
>     pulled batch of records into a blocking queue (size one)
>   - The main thread of the task will pull the record batches from the
>     blocking queue and emit the records.
> This actually allows for some additional I/O overlap while limiting the
> additional memory consumption: only two batches are ever held, one being
> fetched and one being emitted.
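
The proposed hand-over between the two threads can be sketched roughly as follows. This is a minimal illustration only, not the code from the pull request: the class name HandoverSketch, the run method, and the END marker are hypothetical, and plain lists of strings stand in for the record batches that the real consumer thread would obtain from the KafkaConsumer. It uses a java.util.concurrent.ArrayBlockingQueue of capacity one as the "blocking queue (size one)" from the description.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class HandoverSketch {

    // Hypothetical end-of-input marker, compared by identity (not part of Flink or Kafka).
    private static final List<String> END = new ArrayList<>();

    /** Hands the given batches from a consumer thread to the calling thread, which "emits" them. */
    public static List<String> run(List<List<String>> batches) throws InterruptedException {
        // Capacity one: the consumer thread blocks until the previous batch was taken.
        BlockingQueue<List<String>> handover = new ArrayBlockingQueue<>(1);

        // Stand-in for the spawned consumer thread that would poll the KafkaConsumer.
        Thread consumerThread = new Thread(() -> {
            try {
                for (List<String> batch : batches) {
                    handover.put(batch);
                }
                handover.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer-thread");
        consumerThread.setDaemon(true);
        consumerThread.start();

        // Stand-in for the task's main thread: take() is interruptible, so a
        // cancellation that interrupts this thread does not hang on the hand-over.
        List<String> emitted = new ArrayList<>();
        while (true) {
            List<String> batch = handover.take();
            if (batch == END) {
                break;
            }
            emitted.addAll(batch); // emitting records happens here, not in the consumer thread
        }
        consumerThread.join();
        return emitted;
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"),
                Arrays.asList("d", "e"));
        System.out.println(run(batches));
    }
}
```

Running main prints [a, b, c, d, e]. The point of the split is that the thread that may block in the network stack or in chained operators is the interruptible main thread, while the thread talking to Kafka never emits records.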