Repository: samza Updated Branches: refs/heads/master 1e97fb2e4 -> 9e8107ea7
SAMZA-864: allow job to continue when checkpoint partition validation fails Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9e8107ea Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9e8107ea Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9e8107ea Branch: refs/heads/master Commit: 9e8107ea7b710b4572fe18c5fd37bc0d0be47ad1 Parents: 1e97fb2 Author: Boris Shkolnik <bor...@apache.org> Authored: Thu Feb 18 15:42:26 2016 -0800 Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Committed: Fri Feb 19 10:06:31 2016 -0800 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 7 +++ .../org/apache/samza/config/JobConfig.scala | 8 ++++ .../kafka/KafkaCheckpointManager.scala | 3 +- .../kafka/KafkaCheckpointManagerFactory.scala | 1 + .../migration/KafkaCheckpointMigration.scala | 3 +- .../scala/org/apache/samza/util/KafkaUtil.scala | 16 +++++-- .../kafka/TestKafkaCheckpointManager.scala | 47 +++++++++++++++++--- 7 files changed, 73 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/9e8107ea/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 6705530..175437c 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -231,6 +231,13 @@ </dl> </td> </tr> + <tr> + <td class="property" id="job-checkpoint-validation-enabled">job.checkpoint.<br>validation.enabled</td> + <td class="default">true</td> + <td class="description"> + This setting controls if the job should fail(true) or just warn(false) in case the 
validation of checkpoint partition number fails. <br/> <b>CAUTION</b>: this configuration needs to be used with care. It should only be used as a work-around after the checkpoint has been auto-created with the wrong number of partitions by mistake. + </td> + </tr> <tr> <th colspan="3" class="section" id="task"><a href="../api/overview.html">Task configuration</a></th> http://git-wip-us.apache.org/repos/asf/samza/blob/9e8107ea/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index 1a8adae..4f3e9a2 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -43,6 +43,12 @@ object JobConfig { val JOB_REPLICATION_FACTOR = "job.coordinator.replication.factor" val JOB_SEGMENT_BYTES = "job.coordinator.segment.bytes" val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory" + // number of partitions in the checkpoint stream should be 1. But sometimes, + // if a stream was created (automatically) with the wrong number of partitions (default number of partitions + // for new streams), there is no easy fix for the user (topic deletion or reducing the number of partitions + // is not yet supported, and auto-creation of the topics cannot always be easily turned off). + // So we add a setting that allows for the job to continue even though the number of partitions is not 1. 
+ val JOB_FAIL_CHECKPOINT_VALIDATION = "job.checkpoint.validation.enabled" implicit def Config2Job(config: Config) = new JobConfig(config) } @@ -72,6 +78,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging { def getJobId = getOption(JobConfig.JOB_ID) + def failOnCheckpointValidation = { getBoolean(JobConfig.JOB_FAIL_CHECKPOINT_VALIDATION, true) } + def getConfigRewriters = getOption(JobConfig.CONFIG_REWRITERS) def getConfigRewriterClass(name: String) = getOption(JobConfig.CONFIG_REWRITER_CLASS format name) http://git-wip-us.apache.org/repos/asf/samza/blob/9e8107ea/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index 787de1f..ea10cae 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -58,6 +58,7 @@ class KafkaCheckpointManager( connectProducer: () => Producer[Array[Byte], Array[Byte]], val connectZk: () => ZkClient, systemStreamPartitionGrouperFactoryString: String, + failOnCheckpointValidation: Boolean, val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, serde: CheckpointSerde = new CheckpointSerde, checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging { @@ -275,7 +276,7 @@ class KafkaCheckpointManager( def start { kafkaUtil.createTopic(checkpointTopic, 1, replicationFactor, checkpointTopicProperties) - kafkaUtil.validateTopicPartitionCount(checkpointTopic, systemName, metadataStore, 1) + kafkaUtil.validateTopicPartitionCount(checkpointTopic, systemName, metadataStore, 1, failOnCheckpointValidation) } def 
register(taskName: TaskName) { http://git-wip-us.apache.org/repos/asf/samza/blob/9e8107ea/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala index 7db8940..4e97376 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -96,6 +96,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin connectProducer, connectZk, config.getSystemStreamPartitionGrouperFactory, // To find out the SSPGrouperFactory class so it can be included/verified in the key + config.failOnCheckpointValidation, checkpointTopicProperties = getCheckpointTopicProperties(config)) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/9e8107ea/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala b/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala index c6b1fe4..5e8cc65 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala @@ -104,7 +104,8 @@ class KafkaCheckpointMigration extends MigrationPlan with Logging { checkpointTopicName, getCheckpointSystemName(config), getTopicMetadataStore(config), - 1) + 1, + config.failOnCheckpointValidation) if (migrationVerification(coordinatorSystemConsumer)) { 
info("Migration %s was already performed, doing nothing" format migrationKey) http://git-wip-us.apache.org/repos/asf/samza/blob/9e8107ea/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala index f4311d1..a25ba62 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala @@ -137,11 +137,13 @@ class KafkaUtil(val retryBackoff: ExponentialSleepStrategy = new ExponentialSlee * @param systemName Kafka system to use * @param metadataStore Topic Metadata store * @param expectedPartitionCount Expected number of partitions + * @param failOnValidation If true - fail the job if the validation fails */ def validateTopicPartitionCount(topicName: String, systemName: String, metadataStore: TopicMetadataStore, - expectedPartitionCount: Int) { + expectedPartitionCount: Int, + failOnValidation: Boolean = true) { info("Validating topic %s. Expecting partition count: %d" format (topicName, expectedPartitionCount)) retryBackoff.run( loop => { @@ -150,9 +152,15 @@ class KafkaUtil(val retryBackoff: ExponentialSleepStrategy = new ExponentialSlee KafkaUtil.maybeThrowException(topicMetadata.errorCode) val partitionCount = topicMetadata.partitionsMetadata.length - if (partitionCount != expectedPartitionCount) { - throw new KafkaUtilException("Validation failed for topic %s because partition count %s did not " + - "match expected partition count of %d." format(topicName, partitionCount, expectedPartitionCount)) + if (partitionCount != expectedPartitionCount) + { + val msg = "Validation failed for topic %s because partition count %s did not " + + "match expected partition count of %d." 
format(topicName, partitionCount, expectedPartitionCount) + if (failOnValidation) { + throw new KafkaUtilException(msg) + } else { + warn(msg + " Ignoring the failure.") + } } info("Successfully validated topic %s." format topicName) http://git-wip-us.apache.org/repos/asf/samza/blob/9e8107ea/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index af4051b..e6815da 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -28,7 +28,7 @@ import kafka.zk.EmbeddedZookeeper import org.I0Itec.zkclient.ZkClient import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord} import org.apache.samza.checkpoint.Checkpoint -import org.apache.samza.config.{KafkaProducerConfig, MapConfig} +import org.apache.samza.config.{JobConfig, KafkaProducerConfig, MapConfig} import org.apache.samza.container.TaskName import org.apache.samza.container.grouper.stream.GroupByPartitionFactory import org.apache.samza.serializers.CheckpointSerde @@ -81,6 +81,7 @@ class TestKafkaCheckpointManager { var server2: KafkaServer = null var server3: KafkaServer = null var metadataStore: TopicMetadataStore = null + var failOnTopicValidation = true val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName @@ -125,13 +126,13 @@ class TestKafkaCheckpointManager { } - private def createCheckpointTopic(cpTopic: String = checkpointTopic) = { + private def createCheckpointTopic(cpTopic: String = checkpointTopic, partNum: Int = 1) = { val zkClient = new ZkClient(zkConnect, 6000, 
6000, ZKStringSerializer) try { AdminUtils.createTopic( zkClient, - checkpointTopic, - 1, + cpTopic, + partNum, 1, checkpointTopicConfig) } catch { @@ -202,9 +203,38 @@ class TestKafkaCheckpointManager { } } - private def getKafkaCheckpointManager = new KafkaCheckpointManager( + @Test + def testFailOnTopicValidation { + // first case - default case, we should fail on validation + failOnTopicValidation = true + val checkpointTopic8 = checkpointTopic + "8"; + val kcm = getKafkaCheckpointManagerWithParam(checkpointTopic8) + val taskName = new TaskName(partition.toString) + kcm.register(taskName) + createCheckpointTopic(checkpointTopic8, 8) // create topic with the wrong number of partitions + try { + kcm.start + fail("Expected a KafkaUtilException for invalid number of partitions in the topic.") + }catch { + case e: KafkaUtilException => None + } + kcm.stop + + // same validation but ignore the validation error (pass 'false' to validate..) + failOnTopicValidation = false + val kcm1 = getKafkaCheckpointManagerWithParam((checkpointTopic8)) + kcm1.register(taskName) + try { + kcm1.start + }catch { + case e: KafkaUtilException => fail("Did not expect a KafkaUtilException for invalid number of partitions in the topic.") + } + kcm1.stop + } + + private def getKafkaCheckpointManagerWithParam(cpTopic: String) = new KafkaCheckpointManager( clientId = "some-client-id", - checkpointTopic = checkpointTopic, + checkpointTopic = cpTopic, systemName = "kafka", replicationFactor = 3, socketTimeout = 30000, @@ -214,8 +244,12 @@ class TestKafkaCheckpointManager { connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties), connectZk = () => new ZkClient(zkConnect, 60000, 60000, ZKStringSerializer), systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString, + failOnCheckpointValidation = failOnTopicValidation, checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]()))) + // 
CheckpointManager with a specific checkpoint topic + private def getKafkaCheckpointManager = getKafkaCheckpointManagerWithParam(checkpointTopic) + // inject serde. Kafka exceptions will be thrown when serde.fromBytes is called private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = new KafkaCheckpointManager( clientId = "some-client-id-invalid-serde", @@ -229,6 +263,7 @@ class TestKafkaCheckpointManager { connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties), connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer), systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString, + failOnCheckpointValidation = failOnTopicValidation, serde = new InvalideSerde(exception), checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))