[ https://issues.apache.org/jira/browse/KAFKA-7211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16603591#comment-16603591 ]

ASF GitHub Bot commented on KAFKA-7211:
---------------------------------------

lindong28 closed pull request #5492: KAFKA-7211: MM should handle TimeoutException in commitSync
URL: https://github.com/apache/kafka/pull/5492

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index d7e09e4efdb..1ddcedbd487 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -33,17 +33,18 @@ import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.errors.{TimeoutException, WakeupException}
 import org.apache.kafka.common.record.RecordBatch
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
+import scala.util.{Failure, Success, Try}
 import scala.util.control.ControlThrowable
 
 /**
  * The mirror maker has the following architecture:
- * - There are N mirror maker thread shares one ZookeeperConsumerConnector and each owns a Kafka stream.
+ * - There are N mirror maker thread, each of which is equipped with a separate KafkaConsumer instance.
  * - All the mirror maker threads share one producer.
  * - Each mirror maker thread periodically flushes the producer and then commits all offsets.
  *
@@ -69,6 +70,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   private var offsetCommitIntervalMs = 0
   private var abortOnSendFailure: Boolean = true
   @volatile private var exitingOnSendFailure: Boolean = false
+  private var lastSuccessfulCommitTime = -1L
+  private val time = Time.SYSTEM
 
   // If a message send failed after retries are exhausted. The offset of the messages will also be removed from
   // the unacked offset list to avoid offset commit being stuck on that offset. In this case, the offset of that
@@ -267,24 +270,45 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     consumers.map(consumer => new ConsumerWrapper(consumer, customRebalanceListener, whitelist))
   }
 
-  def commitOffsets(consumerWrapper: ConsumerWrapper) {
+  def commitOffsets(consumerWrapper: ConsumerWrapper): Unit = {
     if (!exitingOnSendFailure) {
-      trace("Committing offsets.")
-      try {
-        consumerWrapper.commit()
-      } catch {
-        case e: WakeupException =>
-          // we only call wakeup() once to close the consumer,
-          // so if we catch it in commit we can safely retry
-          // and re-throw to break the loop
+      var retry = 0
+      var retryNeeded = true
+      while (retryNeeded) {
+        trace("Committing offsets.")
+        try {
           consumerWrapper.commit()
-          throw e
+          lastSuccessfulCommitTime = time.milliseconds
+          retryNeeded = false
+        } catch {
+          case e: WakeupException =>
+            // we only call wakeup() once to close the consumer,
+            // so if we catch it in commit we can safely retry
+            // and re-throw to break the loop
+            commitOffsets(consumerWrapper)
+            throw e
+
+          case _: TimeoutException =>
+            Try(consumerWrapper.consumer.listTopics) match {
+              case Success(visibleTopics) =>
+                consumerWrapper.offsets.retain((tp, _) => visibleTopics.containsKey(tp.topic))
+              case Failure(e) =>
+                warn("Failed to list all authorized topics after committing offsets timed out: ", e)
+            }
 
-        case _: CommitFailedException =>
-          warn("Failed to commit offsets because the consumer group has rebalanced and assigned partitions to " +
-            "another instance. If you see this regularly, it could indicate that you need to either increase " +
-            s"the consumer's ${ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG} or reduce the number of records " +
-            s"handled on each iteration with ${ConsumerConfig.MAX_POLL_RECORDS_CONFIG}")
+            retry += 1
+            warn("Failed to commit offsets because the offset commit request processing can not be completed in time. " +
+              s"If you see this regularly, it could indicate that you need to increase the consumer's ${ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG} " +
+              s"Last successful offset commit timestamp=$lastSuccessfulCommitTime, retry count=$retry")
+            Thread.sleep(100)
+
+          case _: CommitFailedException =>
+            retryNeeded = false
+            warn("Failed to commit offsets because the consumer group has rebalanced and assigned partitions to " +
+              "another instance. If you see this regularly, it could indicate that you need to either increase " +
+              s"the consumer's ${ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG} or reduce the number of records " +
+              s"handled on each iteration with ${ConsumerConfig.MAX_POLL_RECORDS_CONFIG}")
+        }
       }
     } else {
       info("Exiting on send failure, skip committing offsets.")
@@ -422,14 +446,15 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   }
 
   // Visible for testing
-  private[tools] class ConsumerWrapper(consumer: Consumer[Array[Byte], Array[Byte]],
+  private[tools] class ConsumerWrapper(private[tools] val consumer: Consumer[Array[Byte], Array[Byte]],
                                        customRebalanceListener: Option[ConsumerRebalanceListener],
                                        whitelistOpt: Option[String]) {
     val regex = whitelistOpt.getOrElse(throw new IllegalArgumentException("New consumer only supports whitelist."))
     var recordIter: java.util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = null
 
     // We manually maintain the consumed offsets for historical reasons and it could be simplified
-    private val offsets = new HashMap[TopicPartition, Long]()
+    // Visible for testing
+    private[tools] val offsets = new HashMap[TopicPartition, Long]()
 
     def init() {
       debug("Initiating consumer")
@@ -473,7 +498,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
 
     def commit() {
-      consumer.commitSync(offsets.map { case (tp, offset) =>  (tp, new OffsetAndMetadata(offset, ""))}.asJava)
+      consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset)) }.asJava)
       offsets.clear()
     }
   }
diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
index 0a178195bef..7212b3b351e 100644
--- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
@@ -24,15 +24,45 @@ import kafka.tools.MirrorMaker.{ConsumerWrapper, MirrorMakerProducer}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
 import org.junit.Test
+import org.junit.Assert._
 
 class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
 
   override def generateConfigs: Seq[KafkaConfig] =
     TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, new Properties()))
 
+  @Test(expected = classOf[TimeoutException])
+  def testCommitOffsetsThrowTimeoutException(): Unit = {
+    val consumerProps = new Properties
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    consumerProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "1")
+    val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, new ByteArrayDeserializer)
+    val mirrorMakerConsumer = new ConsumerWrapper(consumer, None, whitelistOpt = Some("any"))
+    mirrorMakerConsumer.offsets.put(new TopicPartition("test", 0), 0L)
+    mirrorMakerConsumer.commit()
+  }
+
+  @Test
+  def testCommitOffsetsRemoveNonExistentTopics(): Unit = {
+    val consumerProps = new Properties
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    consumerProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "2000")
+    val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, new ByteArrayDeserializer)
+    val mirrorMakerConsumer = new ConsumerWrapper(consumer, None, whitelistOpt = Some("any"))
+    mirrorMakerConsumer.offsets.put(new TopicPartition("nonexistent-topic1", 0), 0L)
+    mirrorMakerConsumer.offsets.put(new TopicPartition("nonexistent-topic2", 0), 0L)
+    MirrorMaker.commitOffsets(mirrorMakerConsumer)
+    assertTrue("Offsets for non-existent topics should be removed", mirrorMakerConsumer.offsets.isEmpty)
+  }
+
   @Test
   def testCommaSeparatedRegex(): Unit = {
     val topic = "new-topic"
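
For readers skimming the diff, here is a compact, standalone distillation of the retry loop the patch adds to commitOffsets. It is a sketch, not the patch itself: CommitRetrySketch and commitWithRetry are illustrative names, the bare consumer/offsets parameters stand in for the ConsumerWrapper members, and the WakeupException and CommitFailedException branches are omitted for brevity.

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException

object CommitRetrySketch {
  def commitWithRetry(consumer: Consumer[Array[Byte], Array[Byte]],
                      offsets: mutable.HashMap[TopicPartition, Long]): Unit = {
    var retryNeeded = true
    while (retryNeeded) {
      try {
        consumer.commitSync(offsets.map { case (tp, offset) =>
          (tp, new OffsetAndMetadata(offset))
        }.asJava)
        offsets.clear()
        retryNeeded = false // commit succeeded, exit the loop
      } catch {
        case _: TimeoutException =>
          // The commit timed out; one likely cause is that a topic in the
          // offset map was deleted. Drop offsets for topics that are no
          // longer visible, then retry the commit.
          Try(consumer.listTopics) match {
            case Success(visibleTopics) =>
              offsets.retain((tp, _) => visibleTopics.containsKey(tp.topic))
            case Failure(e) =>
              System.err.println(s"Could not list topics after commit timeout: $e")
          }
          Thread.sleep(100) // brief backoff before retrying
      }
    }
  }
}

The design point is that commitSync can now fail fast (per KIP-266), so the caller has to distinguish a transient timeout (just retry) from a commit that can never succeed because its topic was deleted (prune the offsets, then retry).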


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> MM should handle timeouts in commitSync
> ---------------------------------------
>
>                 Key: KAFKA-7211
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7211
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: huxihx
>            Priority: Major
>             Fix For: 2.1.0
>
>
> Now that we have KIP-266, the user can override `default.api.timeout.ms` for 
> the consumer so that commitSync does not block indefinitely. MM needs to be 
> updated to handle TimeoutException. We may also need some logic to handle 
> deleted topics. If MM attempts to commit an offset for a deleted topic, the 
> call will time out and we should probably check if the topic exists and remove 
> the offset if it doesn't.
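
For context on the override mentioned above, here is a minimal sketch of bounding commitSync with default.api.timeout.ms (KIP-266). The broker address, group id, and 30-second timeout are placeholder values, not taken from the patch.

import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object Kip266TimeoutSketch extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "mirror-maker-group")      // placeholder
  // KIP-266: cap blocking consumer calls, commitSync included, at 30 seconds.
  props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "30000")

  val consumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
  try {
    consumer.commitSync() // now throws TimeoutException instead of blocking indefinitely
  } catch {
    case _: TimeoutException =>
      System.err.println("Offset commit timed out; see the deleted-topic handling in the patch above.")
  } finally {
    consumer.close()
  }
}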



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
