Repository: incubator-samza
Updated Branches:
  refs/heads/master 8cd3ff12d -> c96ecc68b


SAMZA-44; change "state topic" phrasing in Kafka checkpoint manager to 
"checkpoint topic"


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/c96ecc68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/c96ecc68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/c96ecc68

Branch: refs/heads/master
Commit: c96ecc68bf67d919e8d42265acc70fd4b5ea0b51
Parents: 8cd3ff1
Author: Zhijie Shen <[email protected]>
Authored: Mon Mar 24 10:26:17 2014 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Mon Mar 24 10:26:17 2014 -0700

----------------------------------------------------------------------
 .../kafka/KafkaCheckpointManager.scala          | 66 ++++++++++----------
 .../kafka/KafkaCheckpointManagerFactory.scala   |  4 +-
 .../checkpoint/TestKafkaCheckpointManager.scala |  4 +-
 3 files changed, 37 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c96ecc68/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 64e882b..e98b50a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -56,7 +56,7 @@ import org.apache.samza.util.ExponentialSleepStrategy
  */
 class KafkaCheckpointManager(
   clientId: String,
-  stateTopic: String,
+  checkpointTopic: String,
   systemName: String,
   totalPartitions: Int,
   replicationFactor: Int,
@@ -72,7 +72,7 @@ class KafkaCheckpointManager(
   var partitions = Set[Partition]()
   var producer: Producer[Partition, Array[Byte]] = null
 
-  info("Creating KafkaCheckpointManager with: clientId=%s, stateTopic=%s, 
systemName=%s" format (clientId, stateTopic, systemName))
+  info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, 
systemName=%s" format (clientId, checkpointTopic, systemName))
 
   def writeCheckpoint(partition: Partition, checkpoint: Checkpoint) {
     retryBackoff.run(
@@ -80,7 +80,7 @@ class KafkaCheckpointManager(
         if (producer == null) {
           producer = connectProducer()
         }
-        producer.send(new KeyedMessage(stateTopic, null, partition, 
serde.toBytes(checkpoint)))
+        producer.send(new KeyedMessage(checkpointTopic, null, partition, 
serde.toBytes(checkpoint)))
         loop.done
       },
 
@@ -100,10 +100,10 @@ class KafkaCheckpointManager(
 
     val checkpoint = retryBackoff.run(
       loop => {
-        // Assume state topic exists with correct partitions, since it should 
be verified on start.
-        // Fetch the metadata for this state topic/partition pair.
-        val metadataMap = TopicMetadataCache.getTopicMetadata(Set(stateTopic), 
systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
-        val metadata = metadataMap(stateTopic)
+        // Assume checkpoint topic exists with correct partitions, since it 
should be verified on start.
+        // Fetch the metadata for this checkpoint topic/partition pair.
+        val metadataMap = 
TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: 
Set[String]) => metadataStore.getTopicInfo(topics))
+        val metadata = metadataMap(checkpointTopic)
         val partitionMetadata = metadata.partitionsMetadata
           .filter(_.partitionId == partition.getPartitionId)
           .headOption
@@ -111,9 +111,9 @@ class KafkaCheckpointManager(
         val partitionId = partitionMetadata.partitionId
         val leader = partitionMetadata
           .leader
-          .getOrElse(throw new SamzaException("No leader available for topic 
%s" format stateTopic))
+          .getOrElse(throw new SamzaException("No leader available for topic 
%s" format checkpointTopic))
 
-        info("Connecting to leader %s:%d for topic %s and partition %s to 
fetch last checkpoint message." format (leader.host, leader.port, stateTopic, 
partitionId))
+        info("Connecting to leader %s:%d for topic %s and partition %s to 
fetch last checkpoint message." format (leader.host, leader.port, 
checkpointTopic, partitionId))
 
         val consumer = new SimpleConsumer(
           leader.host,
@@ -122,11 +122,11 @@ class KafkaCheckpointManager(
           bufferSize,
           clientId)
         try {
-          val topicAndPartition = new TopicAndPartition(stateTopic, 
partitionId)
+          val topicAndPartition = new TopicAndPartition(checkpointTopic, 
partitionId)
           val offsetResponse = consumer.getOffsetsBefore(new 
OffsetRequest(Map(topicAndPartition -> 
PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))))
             .partitionErrorAndOffsets
             .get(topicAndPartition)
-            .getOrElse(throw new KafkaCheckpointException("Unable to find 
offset information for %s:%d" format (stateTopic, partitionId)))
+            .getOrElse(throw new KafkaCheckpointException("Unable to find 
offset information for %s:%d" format (checkpointTopic, partitionId)))
 
           // Fail or retry if there was an an issue with the offset request.
           ErrorMapping.maybeThrowException(offsetResponse.error)
@@ -134,40 +134,40 @@ class KafkaCheckpointManager(
           val offset = offsetResponse
             .offsets
             .headOption
-            .getOrElse(throw new KafkaCheckpointException("Got response, but 
no offsets defined for %s:%d" format (stateTopic, partitionId)))
+            .getOrElse(throw new KafkaCheckpointException("Got response, but 
no offsets defined for %s:%d" format (checkpointTopic, partitionId)))
 
-          info("Got offset %s for topic %s and partition %s. Attempting to 
fetch message." format (offset, stateTopic, partitionId))
+          info("Got offset %s for topic %s and partition %s. Attempting to 
fetch message." format (offset, checkpointTopic, partitionId))
 
           if (offset <= 0) {
-            info("Got offset 0 (no messages in state topic) for topic %s and 
partition %s, so returning null. If you expected the state topic to have 
messages, you're probably going to lose data." format (stateTopic, partition))
+            info("Got offset 0 (no messages in checkpoint topic) for topic %s 
and partition %s, so returning null. If you expected the checkpoint topic to 
have messages, you're probably going to lose data." format (checkpointTopic, 
partition))
             return null
           }
 
           val request = new FetchRequestBuilder()
             // Kafka returns 1 greater than the offset of the last message in
             // the topic, so subtract one to fetch the last message.
-            .addFetch(stateTopic, partitionId, offset - 1, fetchSize)
+            .addFetch(checkpointTopic, partitionId, offset - 1, fetchSize)
             .maxWait(500)
             .minBytes(1)
             .clientId(clientId)
             .build
           val messageSet = consumer.fetch(request)
           if (messageSet.hasError) {
-            warn("Got error code from broker for %s: %s" format (stateTopic, 
messageSet.errorCode(stateTopic, partitionId)))
-            val errorCode = messageSet.errorCode(stateTopic, partitionId)
+            warn("Got error code from broker for %s: %s" format 
(checkpointTopic, messageSet.errorCode(checkpointTopic, partitionId)))
+            val errorCode = messageSet.errorCode(checkpointTopic, partitionId)
             if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
-              warn("Got an offset out of range exception while getting last 
checkpoint for topic %s and partition %s, so returning a null offset to the 
KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." 
format (stateTopic, partitionId))
+              warn("Got an offset out of range exception while getting last 
checkpoint for topic %s and partition %s, so returning a null offset to the 
KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." 
format (checkpointTopic, partitionId))
               return null
             }
             ErrorMapping.maybeThrowException(errorCode)
           }
-          val messages = messageSet.messageSet(stateTopic, partitionId).toList
+          val messages = messageSet.messageSet(checkpointTopic, 
partitionId).toList
 
           if (messages.length != 1) {
             throw new KafkaCheckpointException("Something really unexpected 
happened. Got %s "
-              + "messages back when fetching from state checkpoint topic %s 
and partition %s. "
+              + "messages back when fetching from checkpoint topic %s and 
partition %s. "
               + "Expected one message. It would be unsafe to go on without the 
latest checkpoint, "
-              + "so failing." format (messages.length, stateTopic, partition))
+              + "so failing." format (messages.length, checkpointTopic, 
partition))
           }
 
           // Some back bending to go from message to checkpoint.
@@ -183,7 +183,7 @@ class KafkaCheckpointManager(
         exception match {
           case e: KafkaCheckpointException => throw e
           case e: Exception =>
-            warn("While trying to read last checkpoint for topic %s and 
partition %s: %s. Retrying." format (stateTopic, partition, e))
+            warn("While trying to read last checkpoint for topic %s and 
partition %s: %s. Retrying." format (checkpointTopic, partition, e))
             debug(e)
         }
       }
@@ -212,31 +212,31 @@ class KafkaCheckpointManager(
   }
 
   private def createTopic {
-    info("Attempting to create state topic %s with %s partitions." format 
(stateTopic, totalPartitions))
+    info("Attempting to create checkpoint topic %s with %s partitions." format 
(checkpointTopic, totalPartitions))
     retryBackoff.run(
       loop => {
         val zkClient = connectZk()
         try {
           AdminUtils.createTopic(
             zkClient,
-            stateTopic,
+            checkpointTopic,
             totalPartitions,
             replicationFactor)
         } finally {
           zkClient.close
         }
 
-        info("Created state topic %s." format stateTopic)
+        info("Created checkpoint topic %s." format checkpointTopic)
         loop.done
       },
 
       (exception, loop) => {
         exception match {
           case e: TopicExistsException =>
-            info("State topic %s already exists." format stateTopic)
+            info("Checkpoint topic %s already exists." format checkpointTopic)
             loop.done
           case e: Exception =>
-            warn("Failed to create topic %s: %s. Retrying." format 
(stateTopic, e))
+            warn("Failed to create topic %s: %s. Retrying." format 
(checkpointTopic, e))
             debug(e)
         }
       }
@@ -244,19 +244,19 @@ class KafkaCheckpointManager(
   }
 
   private def validateTopic {
-    info("Validating state topic %s." format stateTopic)
+    info("Validating checkpoint topic %s." format checkpointTopic)
     retryBackoff.run(
       loop => {
-        val topicMetadataMap = 
TopicMetadataCache.getTopicMetadata(Set(stateTopic), systemName, 
metadataStore.getTopicInfo)
-        val topicMetadata = topicMetadataMap(stateTopic)
+        val topicMetadataMap = 
TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, 
metadataStore.getTopicInfo)
+        val topicMetadata = topicMetadataMap(checkpointTopic)
         ErrorMapping.maybeThrowException(topicMetadata.errorCode)
 
         val partitionCount = topicMetadata.partitionsMetadata.length
         if (partitionCount != totalPartitions) {
-          throw new KafkaCheckpointException("State topic validation failed 
for topic %s because partition count %s did not match expected partition count 
%s." format (stateTopic, topicMetadata.partitionsMetadata.length, 
totalPartitions))
+          throw new KafkaCheckpointException("Checkpoint topic validation 
failed for topic %s because partition count %s did not match expected partition 
count %s." format (checkpointTopic, topicMetadata.partitionsMetadata.length, 
totalPartitions))
         }
 
-        info("Successfully validated state topic %s." format stateTopic)
+        info("Successfully validated checkpoint topic %s." format 
checkpointTopic)
         loop.done
       },
 
@@ -264,7 +264,7 @@ class KafkaCheckpointManager(
         exception match {
           case e: KafkaCheckpointException => throw e
           case e: Exception =>
-            warn("While trying to validate topic %s: %s. Retrying." format 
(stateTopic, e))
+            warn("While trying to validate topic %s: %s. Retrying." format 
(checkpointTopic, e))
             debug(e)
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c96ecc68/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 2197b01..d45c1e4 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -72,14 +72,14 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
     val brokersListString = Option(producerConfig.brokerList)
       .getOrElse(throw new SamzaException("No broker list defined in config 
for %s." format systemName))
     val metadataStore = new ClientUtilTopicMetadataStore(brokersListString, 
clientId)
-    val stateTopic = getTopic(jobName, jobId)
+    val checkpointTopic = getTopic(jobName, jobId)
     
     // This is a reasonably expensive operation and the TaskInstance already 
knows the answer. Should use that info.
     val totalPartitions = 
Util.getInputStreamPartitions(config).map(_.getPartition).toSet.size
 
     new KafkaCheckpointManager(
       clientId,
-      stateTopic,
+      checkpointTopic,
       systemName,
       totalPartitions,
       replicationFactor,

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c96ecc68/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala
index 3f5a609..f1a8f8a 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala
@@ -104,7 +104,7 @@ class TestKafkaCheckpointManager {
   import TestKafkaCheckpointManager._
 
   @Test
-  def 
testCheckpointShouldBeNullIfStateTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite
 {
+  def 
testCheckpointShouldBeNullIfcheckpointTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite
 {
     val kcm = getKafkaCheckpointManager
     kcm.register(partition)
     kcm.start
@@ -131,7 +131,7 @@ class TestKafkaCheckpointManager {
 
   private def getKafkaCheckpointManager = new KafkaCheckpointManager(
     clientId = "some-client-id",
-    stateTopic = "state-topic",
+    checkpointTopic = "checkpoint-topic",
     systemName = "kafka",
     totalPartitions = 1,
     replicationFactor = 3,

Reply via email to