[jira] [Commented] (KAFKA-7211) MM should handle timeouts in commitSync

2018-09-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16603591#comment-16603591
 ] 

ASF GitHub Bot commented on KAFKA-7211:
---

lindong28 closed pull request #5492: KAFKA-7211: MM should handle 
TimeoutException in commitSync
URL: https://github.com/apache/kafka/pull/5492
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index d7e09e4efdb..1ddcedbd487 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -33,17 +33,18 @@ import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.errors.{TimeoutException, WakeupException}
 import org.apache.kafka.common.record.RecordBatch
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
+import scala.util.{Failure, Success, Try}
 import scala.util.control.ControlThrowable
 
 /**
  * The mirror maker has the following architecture:
- * - There are N mirror maker thread shares one ZookeeperConsumerConnector and each owns a Kafka stream.
+ * - There are N mirror maker thread, each of which is equipped with a separate KafkaConsumer instance.
  * - All the mirror maker threads share one producer.
  * - Each mirror maker thread periodically flushes the producer and then commits all offsets.
  *
@@ -69,6 +70,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   private var offsetCommitIntervalMs = 0
   private var abortOnSendFailure: Boolean = true
   @volatile private var exitingOnSendFailure: Boolean = false
+  private var lastSuccessfulCommitTime = -1L
+  private val time = Time.SYSTEM
 
   // If a message send failed after retries are exhausted. The offset of the messages will also be removed from
   // the unacked offset list to avoid offset commit being stuck on that offset. In this case, the offset of that
@@ -267,24 +270,45 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     consumers.map(consumer => new ConsumerWrapper(consumer, customRebalanceListener, whitelist))
   }
 
-  def commitOffsets(consumerWrapper: ConsumerWrapper) {
+  def commitOffsets(consumerWrapper: ConsumerWrapper): Unit = {
     if (!exitingOnSendFailure) {
-      trace("Committing offsets.")
-      try {
-        consumerWrapper.commit()
-      } catch {
-        case e: WakeupException =>
-          // we only call wakeup() once to close the consumer,
-          // so if we catch it in commit we can safely retry
-          // and re-throw to break the loop
+      var retry = 0
+      var retryNeeded = true
+      while (retryNeeded) {
+        trace("Committing offsets.")
+        try {
           consumerWrapper.commit()
-          throw e
+          lastSuccessfulCommitTime = time.milliseconds
+          retryNeeded = false
+        } catch {
+          case e: WakeupException =>
+            // we only call wakeup() once to close the consumer,
+            // so if we catch it in commit we can safely retry
+            // and re-throw to break the loop
+            commitOffsets(consumerWrapper)
+            throw e
+
+          case _: TimeoutException =>
+            Try(consumerWrapper.consumer.listTopics) match {
+              case Success(visibleTopics) =>
+                consumerWrapper.offsets.retain((tp, _) => visibleTopics.containsKey(tp.topic))
+              case Failure(e) =>
+                warn("Failed to list all authorized topics after committing offsets timed out: ", e)
+            }
 
-        case _: CommitFailedException =>
-          warn("Failed to commit offsets because the consumer group has rebalanced and assigned partitions to " +
-            "another instance. If you see this regularly, it could indicate that you need to either increase " +
-            s"the consumer's ${ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG} or reduce the number of records " +
-            s"handled on each iteration with ${ConsumerConfig.MAX_POLL_RECORDS_CONFIG}")
+            retry += 1
+            warn("Failed to commit offsets because the offset commit request processing can not be completed in time. " +
+              s"If you see this regularly, 

[jira] [Commented] (KAFKA-7211) MM should handle timeouts in commitSync

2018-08-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577844#comment-16577844
 ] 

ASF GitHub Bot commented on KAFKA-7211:
---

huxihx opened a new pull request #5492: KAFKA-7211: MM should handle 
TimeoutException in commitSync
URL: https://github.com/apache/kafka/pull/5492
 
 
   Now that KIP-266 has been introduced, MirrorMaker should handle the
   TimeoutException thrown in commitSync(). In addition, MM should only commit
   offsets for existing topics.
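
The behaviour described above can be illustrated with a small, Kafka-free sketch. It is not the code from this PR: `Tp`, `commitWithRetry`, `maxRetries`, and the `commit` / `listTopics` callbacks are hypothetical names chosen for the example, `java.util.concurrent.TimeoutException` merely stands in for Kafka's `org.apache.kafka.common.errors.TimeoutException`, and the retry cap is an assumption rather than something stated in the PR.

```scala
import java.util.concurrent.TimeoutException
import scala.collection.mutable

object CommitRetrySketch {
  // Hypothetical stand-in for Kafka's TopicPartition.
  final case class Tp(topic: String, partition: Int)

  // Calls `commit` until it succeeds or it has timed out more than `maxRetries` times.
  // After each timeout, offsets whose topic is missing from `listTopics()` are dropped.
  // Returns true if some attempt succeeded.
  def commitWithRetry(offsets: mutable.Map[Tp, Long],
                      commit: scala.collection.Map[Tp, Long] => Unit,
                      listTopics: () => Set[String],
                      maxRetries: Int): Boolean = {
    var timeouts = 0
    var committed = false
    while (!committed && timeouts <= maxRetries) {
      try {
        commit(offsets)
        committed = true
      } catch {
        case _: TimeoutException =>
          timeouts += 1
          // Keep only offsets for topics that are still visible.
          val visible = listTopics()
          val stale = offsets.keys.filterNot(tp => visible.contains(tp.topic)).toList
          offsets --= stale
      }
    }
    committed
  }

  def main(args: Array[String]): Unit = {
    val offsets = mutable.Map(Tp("live", 0) -> 42L, Tp("deleted", 0) -> 7L)
    // Simulated commit: times out while an offset for the deleted topic is present.
    val commit: scala.collection.Map[Tp, Long] => Unit = m =>
      if (m.keys.exists(_.topic == "deleted")) throw new TimeoutException("simulated timeout")
    val ok = commitWithRetry(offsets, commit, () => Set("live"), maxRetries = 3)
    println(s"committed=$ok, remaining topics=${offsets.keys.map(_.topic).mkString(",")}")
  }
}
```

In the demo the first attempt times out, the offset for the deleted topic is pruned, and the second attempt succeeds. Passing the commit and topic-listing steps as functions keeps the sketch runnable without a broker; a real implementation would call the consumer directly.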
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> MM should handle timeouts in commitSync
> ---
>
>                 Key: KAFKA-7211
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7211
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: huxihx
>            Priority: Major
>
> Now that we have KIP-266, the user can override `default.api.timeout.ms` for 
> the consumer so that commitSync does not block indefinitely. MM needs to be 
> updated to handle TimeoutException. We may also need some logic to handle 
> deleted topics. If MM attempts to commit an offset for a deleted topic, the 
> call will time out and we should probably check if the topic exists and remove 
> the offset if it doesn't.
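
For illustration, the `default.api.timeout.ms` override mentioned in the description might be set on the consumer properties as sketched below. The object and method names are hypothetical, and the 30000 ms value in the usage line is an arbitrary example, not a recommendation from this issue.

```scala
import java.util.Properties

object ConsumerTimeoutConfigSketch {
  // Builds consumer properties carrying only the timeout override.
  // Other settings (bootstrap servers, group id, ...) are omitted from this sketch.
  def consumerProps(apiTimeoutMs: Long): Properties = {
    val props = new Properties()
    props.setProperty("default.api.timeout.ms", apiTimeoutMs.toString)
    props
  }

  def main(args: Array[String]): Unit = {
    val props = consumerProps(30000L) // arbitrary example value
    println(props.getProperty("default.api.timeout.ms"))
  }
}
```

With such an override in place, a blocked `commitSync` would surface as a `TimeoutException` after the configured interval rather than blocking indefinitely, which is the case MM needs to handle.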



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)