[ https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15670743#comment-15670743 ]
ASF GitHub Bot commented on FLINK-5048: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2789#discussion_r88260220 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java --- @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; +import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The thread the runs the {@link KafkaConsumer}, connecting to the brokers and polling records. + * The thread pushes the data into a {@link Handover} to be picked up by the fetcher that will + * deserialize and emit the records. + * + * <p><b>IMPORTANT:</b> This thread must not be interrupted when attempting to shut it down. + * The Kafka consumer code was found to not always handle interrupts well, and to even + * deadlock in certain situations. + * + * <p>Implementation Note: This code is written to be reusable in later versions of the KafkaConsumer. + * Because Kafka is not maintaining binary compatibility, we use a "call bridge" as an indirection + * to the KafkaConsumer calls that change signature. + */ +public class KafkaConsumerThread extends Thread { + + /** Logger for this consumer */ + final Logger log; + + /** The handover of data and exceptions between the consumer thread and the task thread */ + private final Handover handover; + + /** The next offsets that the main thread should commit */ + private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit; + + /** The configuration for the Kafka consumer */ + private final Properties kafkaProperties; + + /** The partitions that this consumer reads from */ + private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions; + + /** We get this from the outside to publish metrics. **/ + private final MetricGroup kafkaMetricGroup; + + /** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken */ + private final KafkaConsumerCallBridge consumerCallBridge; + + /** The maximum number of milliseconds to wait for a fetch batch */ + private final long pollTimeout; + + /** Flag whether to add Kafka's metrics to the Flink metrics */ + private final boolean useMetrics; + + /** Reference to the Kafka consumer, once it is created */ + private volatile KafkaConsumer<byte[], byte[]> consumer; + + /** Flag to mark the main work loop as alive */ + private volatile boolean running; + + /** Flag tracking whether the latest commit request has completed */ + private volatile boolean commitInProgress; + + + public KafkaConsumerThread( + Logger log, + Handover handover, + Properties kafkaProperties, + KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions, + MetricGroup kafkaMetricGroup, + KafkaConsumerCallBridge consumerCallBridge, + String threadName, + long pollTimeout, + boolean useMetrics) { + + super(threadName); + setDaemon(true); + + this.log = checkNotNull(log); + this.handover = checkNotNull(handover); + this.kafkaProperties = checkNotNull(kafkaProperties); + this.subscribedPartitions = checkNotNull(subscribedPartitions); + this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup); + this.consumerCallBridge = checkNotNull(consumerCallBridge); + this.pollTimeout = pollTimeout; + this.useMetrics = useMetrics; + + this.nextOffsetsToCommit = new AtomicReference<>(); + this.running = true; + } + + // ------------------------------------------------------------------------ + + @Override + public void run() { + // early exit check + if (!running) { + return; + } + + // this is the means to talk to FlinkKafkaConsumer's main thread + final Handover handover = this.handover; + + // This method initializes the KafkaConsumer and guarantees it is torn down properly. + // This is important, because the consumer has multi-threading issues, + // including concurrent 'close()' calls. + final KafkaConsumer<byte[], byte[]> consumer; + try { + consumer = new KafkaConsumer<>(kafkaProperties); + } + catch (Throwable t) { + handover.reportError(t); + return; + } + + // from here on, the consumer is guaranteed to be closed properly + try { + // The callback invoked by Kafka once an offset commit is complete + final OffsetCommitCallback offsetCommitCallback = new CommitCallback(); + + // tell the consumer which partitions to work with + consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitions)); + + // register Kafka's very own metrics in Flink's metric reporters + if (useMetrics) { + // register Kafka metrics to Flink + Map<MetricName, ? extends Metric> metrics = consumer.metrics(); + if (metrics == null) { + // MapR's Kafka implementation returns null here. + log.info("Consumer implementation does not support metrics"); + } else { + // we have Kafka metrics, register them + for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) { + kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue())); + } + } + } + + // early exit check + if (!running) { + return; + } + + // seek the consumer to the initial offsets + for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) { + if (partition.isOffsetDefined()) { + log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; " + + "seeking the consumer to position {}", + partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1); + + consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1); + } + else { + // for partitions that do not have offsets restored from a checkpoint/savepoint, + // we need to define our internal offset state for them using the initial offsets retrieved from Kafka + // by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint + + long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle()); + + log.info("Partition {} has no initial offset; the consumer has position {}, " + + "so the initial offset will be set to {}", + partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1); + + // the fetched offset represents the next record to process, so we need to subtract it by 1 + partition.setOffset(fetchedOffset - 1); + } + } + + // from now on, external operations may call the consumer + this.consumer = consumer; + + // the latest bulk of records. may carry across the loop if the thread is woken up + // from blocking on the handover + ConsumerRecords<byte[], byte[]> records = null; + + // main fetch loop + while (running) { + + // check if there is something to commit + if (!commitInProgress) { + // get and reset the work-to-be committed, so we don't repeatedly commit the same + final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null); + + if (toCommit != null) { + log.debug("Sending async offset commit request to Kafka broker"); + + // also record that a commit is already in progress + // the order here matters! first set the flag, then send the commit command. + commitInProgress = true; + consumer.commitAsync(toCommit, offsetCommitCallback); + } + } + + // get the next batch of records, unless we did not manage to hand the old batch over + if (records == null) { + try { + records = consumer.poll(pollTimeout); + } + catch (WakeupException we) { + continue; + } + } + + try { + handover.produce(records); + records = null; + } + catch (Handover.WakeupException e) { + // fall through the loop + } + } + // end main fetch loop + } + catch (Throwable t) { + // let the main thread know and exit + // it may be that this exception comes because the main thread closed the handover, in + // which case the below reporting is irrelevant, but does not hurt either + handover.reportError(t); + } + finally { + try { + consumer.close(); --- End diff -- We need to call `handover.close()` in the `shutdown()` method, otherwise it will not properly wake up the thread. We could rely on someone else calling it, but that would not make the KafkaConsumerThread's logic self-contained (it would rely on implicit behavior of another class), which I would like to avoid. > Kafka Consumer (0.9/0.10) threading model leads problematic cancellation > behavior > --------------------------------------------------------------------------------- > > Key: FLINK-5048 > URL: https://issues.apache.org/jira/browse/FLINK-5048 > Project: Flink > Issue Type: Bug > Components: Kafka Connector > Affects Versions: 1.1.3 > Reporter: Stephan Ewen > Assignee: Stephan Ewen > Fix For: 1.2.0 > > > The {{FLinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that > operates the KafkaConsumer. That thread is shielded from interrupts, because > the Kafka Consumer has not been handling thread interrupts well. > Since that thread is also the thread that emits records, it may block in the > network stack (backpressure) or in chained operators. The later case leads to > situations where cancellations get very slow unless that thread would be > interrupted (which it cannot be). > I propose to change the thread model as follows: > - A spawned consumer thread pull from the KafkaConsumer and pushes its > pulled batch of records into a blocking queue (size one) > - The main thread of the task will pull the record batches from the > blocking queue and emit the records. > This allows actually for some additional I/O overlay while limiting the > additional memory consumption - only two batches are ever held, one being > fetched and one being emitted. -- This message was sent by Atlassian JIRA (v6.3.4#6332)