[
https://issues.apache.org/jira/browse/KAFKA-7211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16603591#comment-16603591
]
ASF GitHub Bot commented on KAFKA-7211:
---
lindong28 closed pull request #5492: KAFKA-7211: MM should handle
TimeoutException in commitSync
URL: https://github.com/apache/kafka/pull/5492
This is a PR merged from a forked repository.
Because GitHub hides the original diff on merge, it is displayed below for
the sake of provenance.
As this is a foreign pull request (from a fork), the diff is supplied
below, since GitHub would not otherwise display it:
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala
b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index d7e09e4efdb..1ddcedbd487 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -33,17 +33,18 @@ import
org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord, RecordMetadata}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer,
ByteArraySerializer}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.errors.{TimeoutException, WakeupException}
import org.apache.kafka.common.record.RecordBatch
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
+import scala.util.{Failure, Success, Try}
import scala.util.control.ControlThrowable
/**
* The mirror maker has the following architecture:
- * - There are N mirror maker thread shares one ZookeeperConsumerConnector and
each owns a Kafka stream.
+ * - There are N mirror maker thread, each of which is equipped with a
separate KafkaConsumer instance.
* - All the mirror maker threads share one producer.
* - Each mirror maker thread periodically flushes the producer and then
commits all offsets.
*
@@ -69,6 +70,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
private var offsetCommitIntervalMs = 0
private var abortOnSendFailure: Boolean = true
@volatile private var exitingOnSendFailure: Boolean = false
+ private var lastSuccessfulCommitTime = -1L
+ private val time = Time.SYSTEM
// If a message send failed after retries are exhausted. The offset of the
messages will also be removed from
// the unacked offset list to avoid offset commit being stuck on that
offset. In this case, the offset of that
@@ -267,24 +270,45 @@ object MirrorMaker extends Logging with KafkaMetricsGroup
{
consumers.map(consumer => new ConsumerWrapper(consumer,
customRebalanceListener, whitelist))
}
- def commitOffsets(consumerWrapper: ConsumerWrapper) {
+ def commitOffsets(consumerWrapper: ConsumerWrapper): Unit = {
if (!exitingOnSendFailure) {
- trace("Committing offsets.")
- try {
-consumerWrapper.commit()
- } catch {
-case e: WakeupException =>
- // we only call wakeup() once to close the consumer,
- // so if we catch it in commit we can safely retry
- // and re-throw to break the loop
+ var retry = 0
+ var retryNeeded = true
+ while (retryNeeded) {
+trace("Committing offsets.")
+try {
consumerWrapper.commit()
- throw e
+ lastSuccessfulCommitTime = time.milliseconds
+ retryNeeded = false
+} catch {
+ case e: WakeupException =>
+// we only call wakeup() once to close the consumer,
+// so if we catch it in commit we can safely retry
+// and re-throw to break the loop
+commitOffsets(consumerWrapper)
+throw e
+
+ case _: TimeoutException =>
+Try(consumerWrapper.consumer.listTopics) match {
+ case Success(visibleTopics) =>
+consumerWrapper.offsets.retain((tp, _) =>
visibleTopics.containsKey(tp.topic))
+ case Failure(e) =>
+warn("Failed to list all authorized topics after committing
offsets timed out: ", e)
+}
-case _: CommitFailedException =>
- warn("Failed to commit offsets because the consumer group has
rebalanced and assigned partitions to " +
-"another instance. If you see this regularly, it could indicate
that you need to either increase " +
-s"the consumer's ${ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG} or
reduce the number of records " +
-s"handled on each iteration with
${ConsumerConfig.MAX_POLL_RECORDS_CONFIG}")
+retry += 1
+warn("Failed to commit offsets because the offset commit request
processing can not be completed in time. " +
+ s"If you see this regularly,