[ 
https://issues.apache.org/jira/browse/SAMZA-289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14030803#comment-14030803
 ] 

Jakob Homan commented on SAMZA-289:
-----------------------------------

+1

> Job fails with invalid checkpoint topic partition count
> -------------------------------------------------------
>
>                 Key: SAMZA-289
>                 URL: https://issues.apache.org/jira/browse/SAMZA-289
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>            Priority: Blocker
>             Fix For: 0.7.0
>
>         Attachments: SAMZA-289.0.patch
>
>
> We've been seeing failures off and on lately with messages like this:
> {noformat}
> org.apache.samza.checkpoint.kafka.KafkaCheckpointException: Checkpoint topic 
> validation failed for topic 
> __samza_checkpoint_samza-perf-store-all-calls_i001 because partition count 8 
> did not match expected partition count 64.
> {noformat}
> This causes the entire job to fail. It is triggered the first time a Samza 
> job is run in a cluster, and only on jobs with an input stream that has more 
> than the default partition count (num.partitions in the Kafka broker).
> I believe there is a race condition in KafkaCheckpointManager. Right now we 
> only run KafkaCheckpointManager.createTopic in the container that owns the 
> first partition:
> {code}
>   def start {
>     if (partitions.contains(new Partition(0))) {
>       createTopic
>     }
>     validateTopic
>   }
> {code}
> If a container starts before the container with partition 0, then the 
> container without partition 0 will just run validateTopic. This triggers a 
> call to get TopicMetadata from Kafka. If the checkpoint topic doesn't exist, 
> I believe the broker will say so, but it will also create the topic in the 
> background. When it does this, num.partitions (the default partition count) 
> will be used to define how many partitions the new checkpoint topic has.
> If a Samza job's task.inputs list contains a stream with a non-default 
> number of partitions (e.g. num.partitions=8, but task.inputs has a stream 
> with 16 partitions), then this race condition can produce a checkpoint topic 
> with 8 partitions, and the validation will fail.
> I think the simplest fix is just to strip the if statement from 
> KafkaCheckpointManager.start, and have all containers try to create the 
> checkpoint topic. This will eliminate the race condition, since all 
> containers will try to create the checkpoint topic with the correct number 
> of partitions.
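
The race described above, and the effect of the proposed fix, can be illustrated with a small self-contained sketch. This is not the actual KafkaCheckpointManager code or the contents of the attached patch. FakeBroker, CheckpointManagerSketch, RaceDemo and the alwaysCreate flag are hypothetical names invented for illustration, and the broker behavior modeled here (a metadata lookup auto-creates a missing topic with the default partition count, and creating an existing topic is a no-op) is an assumption taken from the description, not verified against Kafka.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical in-memory stand-in for a Kafka broker with topic auto-creation.
class FakeBroker(defaultPartitions: Int) {
  private val topics = mutable.Map.empty[String, Int]

  // Explicit creation. Assumption: creating an existing topic is a no-op.
  def createTopic(name: String, partitions: Int): Unit = synchronized {
    if (!topics.contains(name)) topics(name) = partitions
  }

  // Metadata lookup. Assumption: a missing topic is auto-created with the
  // broker default (num.partitions), as the description suspects.
  def partitionCount(name: String): Int = synchronized {
    topics.getOrElseUpdate(name, defaultPartitions)
  }
}

// Hypothetical model of one container's checkpoint manager start-up.
class CheckpointManagerSketch(
    broker: FakeBroker,
    topic: String,
    expectedPartitions: Int,
    ownsPartitionZero: Boolean,
    alwaysCreate: Boolean) {

  def start(): Unit = {
    // alwaysCreate = false models the current behavior (only the container
    // owning partition 0 creates the topic); true models the proposed fix.
    if (alwaysCreate || ownsPartitionZero) {
      broker.createTopic(topic, expectedPartitions)
    }
    val actual = broker.partitionCount(topic)
    if (actual != expectedPartitions) {
      throw new IllegalStateException(
        s"partition count $actual did not match expected partition count $expectedPartitions")
    }
  }
}

object RaceDemo {
  // Starts a container that does NOT own partition 0 before any other
  // container, against a broker whose default partition count is 8.
  def firstContainerSucceeds(alwaysCreate: Boolean): Boolean = {
    val broker = new FakeBroker(defaultPartitions = 8)
    val container = new CheckpointManagerSketch(
      broker, "checkpoint-topic", expectedPartitions = 16,
      ownsPartitionZero = false, alwaysCreate = alwaysCreate)
    Try(container.start()).isSuccess
  }

  def main(args: Array[String]): Unit = {
    println(firstContainerSucceeds(alwaysCreate = false)) // false: validation fails
    println(firstContainerSucceeds(alwaysCreate = true))  // true: topic has 16 partitions
  }
}
```

In this model the failure only appears when a container without partition 0 validates first, which matches the intermittent nature of the reported error. A real implementation would also need to handle whatever the broker returns when the topic already exists.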



--
This message was sent by Atlassian JIRA
(v6.2#6252)
