Repository: samza
Updated Branches:
  refs/heads/master 89dc18e96 -> 6492826e1

SAMZA-1589: Reduce failure retry duration in KafkaCheckpointManager.writeCheckpoint

Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>

Reviewers: Jagadish <jagad...@apache.org>

Closes #438 from shanthoosh/SAMZA-1589

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6492826e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6492826e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6492826e

Branch: refs/heads/master
Commit: 6492826e15ffbe701d5a6a0cc4a4b2a06299c718
Parents: 89dc18e
Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>
Authored: Thu Mar 8 18:00:15 2018 -0800
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Thu Mar 8 18:00:15 2018 -0800

----------------------------------------------------------------------
 .../samza/checkpoint/kafka/KafkaCheckpointManager.scala     | 9 +++++----
 .../samza/checkpoint/kafka/TestKafkaCheckpointManager.scala | 2 +-
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/6492826e/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 2dd9569..b090136 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -20,6 +20,7 @@
 package org.apache.samza.checkpoint.kafka
 import java.util.Collections
+import java.util.concurrent.TimeUnit
 import com.google.common.base.Preconditions
 import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
@@ -53,8 +54,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
                              checkpointMsgSerde: Serde[Checkpoint] = new CheckpointSerde,
                              checkpointKeySerde: Serde[KafkaCheckpointLogKey] = new KafkaCheckpointLogKeySerde) extends CheckpointManager with Logging {
-  // Retry duration is approximately 83 minutes.
-  var MaxRetriesOnFailure = 50
+  var MaxRetryDurationMs = TimeUnit.MINUTES.toMillis(15);
   info(s"Creating KafkaCheckpointManager for checkpointTopic:$checkpointTopic, systemName:$checkpointSystem " +
     s"validateCheckpoints:$validateCheckpoint")
@@ -158,6 +158,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
     val envelope = new OutgoingMessageEnvelope(checkpointSsp, keyBytes, msgBytes)
     val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
+    val startTime = System.currentTimeMillis()
     retryBackoff.run(
       loop => {
         systemProducer.send(taskName.getTaskName, envelope)
@@ -167,8 +168,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
       },
       (exception, loop) => {
-        if (loop.sleepCount >= MaxRetriesOnFailure) {
-          error(s"Exhausted $MaxRetriesOnFailure retries when writing checkpoint: $checkpoint for task: $taskName.")
+        if ((System.currentTimeMillis() - startTime) >= MaxRetryDurationMs) {
+          error(s"Exhausted $MaxRetryDurationMs milliseconds when writing checkpoint: $checkpoint for task: $taskName.")
           throw new SamzaException(s"Exception when writing checkpoint: $checkpoint for task: $taskName.", exception)
         } else {
           warn(s"Retrying failed checkpoint write to key: $key, checkpoint: $checkpoint for task: $taskName", exception)

http://git-wip-us.apache.org/repos/asf/samza/blob/6492826e/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 5586a1a..03b0d2c 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -115,7 +115,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     val props = new org.apache.samza.config.KafkaConfig(config).getCheckpointTopicProperties()
     val spec = new KafkaStreamSpec("id", checkpointTopic, checkpointSystemName, 1, 1, false, props)
     val checkPointManager = new KafkaCheckpointManager(spec, new MockSystemFactory, false, config, new NoOpMetricsRegistry)
-    checkPointManager.MaxRetriesOnFailure = 1
+    checkPointManager.MaxRetryDurationMs = 1
     checkPointManager.register(taskName)
     checkPointManager.start