[ https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657657#comment-15657657 ]
ASF GitHub Bot commented on FLINK-5048: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2789#discussion_r87625136 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.ExceptionUtils; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +import javax.annotation.Nonnull; +import java.io.Closeable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +public final class Handover implements Closeable { + + private final Object lock = new Object(); + + private ConsumerRecords<byte[], byte[]> next; + private Throwable error; + private boolean wakeup; + + @Nonnull + public ConsumerRecords<byte[], byte[]> pollNext() throws Exception { + synchronized (lock) { + while (next == null && error == null) { + lock.wait(); + } + + ConsumerRecords<byte[], byte[]> n = next; + if (n != null) { + next = null; + lock.notifyAll(); + return n; + } + else { + ExceptionUtils.rethrowException(error, error.getMessage()); + + // this statement cannot be reached since the above method always throws an exception + // this is only here to silence the compiler and any warnings + return ConsumerRecords.empty(); + } + } + } + + public void produce(final ConsumerRecords<byte[], byte[]> element) + throws InterruptedException, WakeupException, ClosedException { + + checkNotNull(element); + + synchronized (lock) { + while (next != null && !wakeup) { + lock.wait(); + } + + wakeup = false; + + // if there is still an element, we must have been woken up + if (next != null) { + throw new WakeupException(); + } + // if there is no error, then this is open and can accept this element + else if (error == null) { + next = element; + lock.notifyAll(); + } + // an error marks this as closed for the producer + else { + throw new ClosedException(); + } + } + } + + public void reportError(Throwable t) { + checkNotNull(t); + + synchronized (lock) { + // do not override the initial exception + if (error == null) { + error = t; + } + next = null; + lock.notifyAll(); + } + } + + @Override + public void 
close() { + synchronized (lock) { + next = null; + wakeup = false; + + if (error == null) { + error = new ClosedException(); + } + lock.notifyAll(); + } + } + + public void wakeupProducer() { + synchronized (lock) { + wakeup = true; + lock.notifyAll(); + } + } + + @VisibleForTesting + Object getLock() { + return lock; + } + + // ------------------------------------------------------------------------ + + public static final class ClosedException extends IllegalStateException { --- End diff -- Not really sure if extending `IllegalStateException` is good here, because `ClosedException` will be rethrown to the fetcher even on a normal call to `Handover#close()`. I understand it's to allow the cancellation process to be faster, but somehow I think a normal `close()` _after_ `pollNext()` was called doesn't strike me as an illegal state. > Kafka Consumer (0.9/0.10) threading model leads problematic cancellation > behavior > --------------------------------------------------------------------------------- > > Key: FLINK-5048 > URL: https://issues.apache.org/jira/browse/FLINK-5048 > Project: Flink > Issue Type: Bug > Components: Kafka Connector > Affects Versions: 1.1.3 > Reporter: Stephan Ewen > Assignee: Stephan Ewen > Fix For: 1.2.0 > > > The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that > operates the KafkaConsumer. That thread is shielded from interrupts, because > the Kafka Consumer has not been handling thread interrupts well. > Since that thread is also the thread that emits records, it may block in the > network stack (backpressure) or in chained operators. The latter case leads to > situations where cancellations get very slow unless that thread is > interrupted (which it cannot be). 
> I propose to change the thread model as follows: > - A spawned consumer thread pulls from the KafkaConsumer and pushes its > pulled batch of records into a blocking queue (size one) > - The main thread of the task will pull the record batches from the > blocking queue and emit the records. > This actually allows for some additional I/O overlap while limiting the > additional memory consumption - only two batches are ever held, one being > fetched and one being emitted. -- This message was sent by Atlassian JIRA (v6.3.4#6332)