Repository: samza
Updated Branches:
  refs/heads/master 1e97fb2e4 -> 9e8107ea7


SAMZA-864: allow job to continue when checkpoint partition validation fails


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9e8107ea
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9e8107ea
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9e8107ea

Branch: refs/heads/master
Commit: 9e8107ea7b710b4572fe18c5fd37bc0d0be47ad1
Parents: 1e97fb2
Author: Boris Shkolnik <bor...@apache.org>
Authored: Thu Feb 18 15:42:26 2016 -0800
Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>
Committed: Fri Feb 19 10:06:31 2016 -0800

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |  7 +++
 .../org/apache/samza/config/JobConfig.scala     |  8 ++++
 .../kafka/KafkaCheckpointManager.scala          |  3 +-
 .../kafka/KafkaCheckpointManagerFactory.scala   |  1 +
 .../migration/KafkaCheckpointMigration.scala    |  3 +-
 .../scala/org/apache/samza/util/KafkaUtil.scala | 16 +++++--
 .../kafka/TestKafkaCheckpointManager.scala      | 47 +++++++++++++++++---
 7 files changed, 73 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9e8107ea/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html 
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 6705530..175437c 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -231,6 +231,13 @@
                         </dl>
                     </td>
                 </tr>
+                <tr>
+                    <td class="property" 
id="job-checkpoint-validation-enabled">job.checkpoint.<br>validation.enabled</td>
+                    <td class="default">true</td>
+                    <td class="description">
+                        This setting controls whether the job should fail (true) or 
just warn (false) when validation of the checkpoint topic's partition count fails. 
<br/> <b>CAUTION</b>: this configuration needs to be used with care. It should 
only be used as a workaround after the checkpoint topic has been auto-created with 
the wrong number of partitions by mistake.
+                    </td>
+                </tr>
 
                 <tr>
                     <th colspan="3" class="section" id="task"><a 
href="../api/overview.html">Task configuration</a></th>

http://git-wip-us.apache.org/repos/asf/samza/blob/9e8107ea/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 1a8adae..4f3e9a2 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -43,6 +43,12 @@ object JobConfig {
   val JOB_REPLICATION_FACTOR = "job.coordinator.replication.factor"
   val JOB_SEGMENT_BYTES = "job.coordinator.segment.bytes"
   val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory"
+  // The number of partitions in the checkpoint stream should be 1. But sometimes,
+  // if a stream was created (automatically) with the wrong number of 
partitions (the default number of partitions
+  // for new streams), there is no easy fix for the user (topic deletion or 
reducing the number of partitions
+  // is not yet supported, and auto-creation of topics cannot always be 
easily turned off).
+  // So we add a setting that allows the job to continue even though the 
number of partitions is not 1.
+  val JOB_FAIL_CHECKPOINT_VALIDATION = "job.checkpoint.validation.enabled"
 
   implicit def Config2Job(config: Config) = new JobConfig(config)
 }
@@ -72,6 +78,8 @@ class JobConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
 
   def getJobId = getOption(JobConfig.JOB_ID)
 
+  def failOnCheckpointValidation = { 
getBoolean(JobConfig.JOB_FAIL_CHECKPOINT_VALIDATION, true) }
+
   def getConfigRewriters = getOption(JobConfig.CONFIG_REWRITERS)
 
   def getConfigRewriterClass(name: String) = 
getOption(JobConfig.CONFIG_REWRITER_CLASS format name)

http://git-wip-us.apache.org/repos/asf/samza/blob/9e8107ea/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 787de1f..ea10cae 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -58,6 +58,7 @@ class KafkaCheckpointManager(
                               connectProducer: () => Producer[Array[Byte], 
Array[Byte]],
                               val connectZk: () => ZkClient,
                               systemStreamPartitionGrouperFactoryString: 
String,
+                              failOnCheckpointValidation: Boolean,
                               val retryBackoff: ExponentialSleepStrategy = new 
ExponentialSleepStrategy,
                               serde: CheckpointSerde = new CheckpointSerde,
                               checkpointTopicProperties: Properties = new 
Properties) extends CheckpointManager with Logging {
@@ -275,7 +276,7 @@ class KafkaCheckpointManager(
 
   def start {
     kafkaUtil.createTopic(checkpointTopic, 1, replicationFactor, 
checkpointTopicProperties)
-    kafkaUtil.validateTopicPartitionCount(checkpointTopic, systemName, 
metadataStore, 1)
+    kafkaUtil.validateTopicPartitionCount(checkpointTopic, systemName, 
metadataStore, 1, failOnCheckpointValidation)
   }
 
   def register(taskName: TaskName) {

http://git-wip-us.apache.org/repos/asf/samza/blob/9e8107ea/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 7db8940..4e97376 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -96,6 +96,7 @@ class KafkaCheckpointManagerFactory extends 
CheckpointManagerFactory with Loggin
       connectProducer,
       connectZk,
       config.getSystemStreamPartitionGrouperFactory,      // To find out the 
SSPGrouperFactory class so it can be included/verified in the key
+      config.failOnCheckpointValidation,
       checkpointTopicProperties = getCheckpointTopicProperties(config))
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/9e8107ea/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
index c6b1fe4..5e8cc65 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
@@ -104,7 +104,8 @@ class KafkaCheckpointMigration extends MigrationPlan with 
Logging {
         checkpointTopicName,
         getCheckpointSystemName(config),
         getTopicMetadataStore(config),
-        1)
+        1,
+        config.failOnCheckpointValidation)
 
       if (migrationVerification(coordinatorSystemConsumer)) {
         info("Migration %s was already performed, doing nothing" format 
migrationKey)

http://git-wip-us.apache.org/repos/asf/samza/blob/9e8107ea/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala 
b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index f4311d1..a25ba62 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -137,11 +137,13 @@ class KafkaUtil(val retryBackoff: 
ExponentialSleepStrategy = new ExponentialSlee
    * @param systemName  Kafka system to use
    * @param metadataStore Topic Metadata store
    * @param expectedPartitionCount  Expected number of partitions
+   * @param failOnValidation If true, a partition count mismatch throws a KafkaUtilException; if false, it is only logged as a warning
    */
   def validateTopicPartitionCount(topicName: String,
                                   systemName: String,
                                   metadataStore: TopicMetadataStore,
-                                  expectedPartitionCount: Int) {
+                                  expectedPartitionCount: Int,
+                                  failOnValidation: Boolean = true) {
     info("Validating topic %s. Expecting partition count: %d" format 
(topicName, expectedPartitionCount))
     retryBackoff.run(
       loop => {
@@ -150,9 +152,15 @@ class KafkaUtil(val retryBackoff: ExponentialSleepStrategy 
= new ExponentialSlee
         KafkaUtil.maybeThrowException(topicMetadata.errorCode)
 
         val partitionCount = topicMetadata.partitionsMetadata.length
-        if (partitionCount != expectedPartitionCount) {
-          throw new KafkaUtilException("Validation failed for topic %s because 
partition count %s did not " +
-            "match expected partition count of %d." format(topicName, 
partitionCount, expectedPartitionCount))
+        if (partitionCount != expectedPartitionCount)
+        {
+          val msg = "Validation failed for topic %s because partition count %s 
did not " +
+                  "match expected partition count of %d." format(topicName, 
partitionCount, expectedPartitionCount)
+          if (failOnValidation) {
+            throw new KafkaUtilException(msg)
+          } else {
+            warn(msg + " Ignoring the failure.")
+          }
         }
 
         info("Successfully validated topic %s." format topicName)

http://git-wip-us.apache.org/repos/asf/samza/blob/9e8107ea/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index af4051b..e6815da 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -28,7 +28,7 @@ import kafka.zk.EmbeddedZookeeper
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.clients.producer.{KafkaProducer, Producer, 
ProducerConfig, ProducerRecord}
 import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.config.{KafkaProducerConfig, MapConfig}
+import org.apache.samza.config.{JobConfig, KafkaProducerConfig, MapConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
 import org.apache.samza.serializers.CheckpointSerde
@@ -81,6 +81,7 @@ class TestKafkaCheckpointManager {
   var server2: KafkaServer = null
   var server3: KafkaServer = null
   var metadataStore: TopicMetadataStore = null
+  var failOnTopicValidation = true
 
   val systemStreamPartitionGrouperFactoryString = 
classOf[GroupByPartitionFactory].getCanonicalName
 
@@ -125,13 +126,13 @@ class TestKafkaCheckpointManager {
   }
 
 
-  private def createCheckpointTopic(cpTopic: String = checkpointTopic) = {
+  private def createCheckpointTopic(cpTopic: String = checkpointTopic, 
partNum: Int = 1) = {
     val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
     try {
       AdminUtils.createTopic(
         zkClient,
-        checkpointTopic,
-        1,
+        cpTopic,
+        partNum,
         1,
         checkpointTopicConfig)
     } catch {
@@ -202,9 +203,38 @@ class TestKafkaCheckpointManager {
     }
   }
 
-  private def getKafkaCheckpointManager = new KafkaCheckpointManager(
+  @Test
+  def testFailOnTopicValidation {
+    // first case - default case, we should fail on validation
+    failOnTopicValidation = true
+    val checkpointTopic8 = checkpointTopic + "8";
+    val kcm = getKafkaCheckpointManagerWithParam(checkpointTopic8)
+    val taskName = new TaskName(partition.toString)
+    kcm.register(taskName)
+    createCheckpointTopic(checkpointTopic8, 8) // create topic with the wrong 
number of partitions
+    try {
+      kcm.start
+      fail("Expected a KafkaUtilException for invalid number of partitions in 
the topic.")
+    }catch {
+      case e: KafkaUtilException => None
+    }
+    kcm.stop
+
+    // second case - same topic, but with failOnTopicValidation = false the 
partition count mismatch must not throw
+    failOnTopicValidation = false
+    val kcm1 = getKafkaCheckpointManagerWithParam((checkpointTopic8))
+    kcm1.register(taskName)
+    try {
+      kcm1.start
+    }catch {
+      case e: KafkaUtilException => fail("Did not expect a KafkaUtilException 
for invalid number of partitions in the topic.")
+    }
+    kcm1.stop
+  }
+
+  private def getKafkaCheckpointManagerWithParam(cpTopic: String) = new 
KafkaCheckpointManager(
     clientId = "some-client-id",
-    checkpointTopic = checkpointTopic,
+    checkpointTopic = cpTopic,
     systemName = "kafka",
     replicationFactor = 3,
     socketTimeout = 30000,
@@ -214,8 +244,12 @@ class TestKafkaCheckpointManager {
     connectProducer = () => new 
KafkaProducer(producerConfig.getProducerProperties),
     connectZk = () => new ZkClient(zkConnect, 60000, 60000, 
ZKStringSerializer),
     systemStreamPartitionGrouperFactoryString = 
systemStreamPartitionGrouperFactoryString,
+    failOnCheckpointValidation = failOnTopicValidation,
     checkpointTopicProperties = 
KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new 
MapConfig(Map[String, String]())))
 
+  // CheckpointManager for the test's default checkpoint topic
+  private def getKafkaCheckpointManager = 
getKafkaCheckpointManagerWithParam(checkpointTopic)
+
   // inject serde. Kafka exceptions will be thrown when serde.fromBytes is 
called
   private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = 
new KafkaCheckpointManager(
     clientId = "some-client-id-invalid-serde",
@@ -229,6 +263,7 @@ class TestKafkaCheckpointManager {
     connectProducer = () => new 
KafkaProducer(producerConfig.getProducerProperties),
     connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
     systemStreamPartitionGrouperFactoryString = 
systemStreamPartitionGrouperFactoryString,
+    failOnCheckpointValidation = failOnTopicValidation,
     serde = new InvalideSerde(exception),
     checkpointTopicProperties = 
KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new 
MapConfig(Map[String, String]())))
 

Reply via email to