Chris Riccomini created SAMZA-289:
-------------------------------------

             Summary: Job fails with invalid checkpoint topic partition count
                 Key: SAMZA-289
                 URL: https://issues.apache.org/jira/browse/SAMZA-289
             Project: Samza
          Issue Type: Bug
          Components: kafka
    Affects Versions: 0.6.0
            Reporter: Chris Riccomini
            Assignee: Chris Riccomini
             Fix For: 0.7.0


We've been seeing failures off and on lately with messages like this:

{noformat}
org.apache.samza.checkpoint.kafka.KafkaCheckpointException: Checkpoint topic 
validation failed for topic __samza_checkpoint_samza-perf-store-all-calls_i001 
because partition count 8 did not match expected partition count 64.
{noformat}

This causes the entire job to fail. It is triggered only the first time a Samza 
job is run in a cluster (before its checkpoint topic exists), and only on jobs 
with an input stream that has more partitions than the default partition count 
(num.partitions in the Kafka broker).

I believe there is a race condition in KafkaCheckpointManager. Right now we 
only run KafkaCheckpointManager.createTopic in the container that owns the 
first partition:

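{code}
  def start {
    // Only the container that owns partition 0 creates the checkpoint topic...
    if (partitions.contains(new Partition(0))) {
      createTopic
    }

    // ...but every container validates it.
    validateTopic
  }
{code}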

If a container that does not own partition 0 starts before the container that 
does, it will only run validateTopic. This triggers a TopicMetadata request to 
Kafka. If the checkpoint topic doesn't exist, I believe the broker will report 
that, but it will also create the topic in the background (when 
auto.create.topics.enable is on, which is the broker default). When it does 
this, it uses num.partitions (the default partition count) as the new 
checkpoint topic's partition count.

If a Samza job's task.inputs list contains a stream with a non-default number 
of partitions (e.g. num.partitions=8, but task.inputs has a stream with 16 
partitions), this race condition can create a checkpoint topic with 8 
partitions instead of 16, and validation will then fail.
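To make the race concrete, here is a minimal Scala model of it, assuming a broker that auto-creates missing topics on metadata requests. FakeBroker and CurrentStart are hypothetical stand-ins written for this illustration, not Samza or Kafka classes; CurrentStart.start mirrors the createTopic/validateTopic logic quoted above.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a Kafka broker that auto-creates topics
// on metadata requests. Not a Kafka or Samza API.
class FakeBroker(defaultPartitions: Int) {
  private val topics = mutable.Map.empty[String, Int]

  // Explicit creation; leaves an existing topic untouched.
  def createTopic(topic: String, partitions: Int): Unit =
    if (!topics.contains(topic)) topics(topic) = partitions

  // Metadata lookup; a missing topic is auto-created with the broker default
  // (the num.partitions behavior described above).
  def partitionCount(topic: String): Int =
    topics.getOrElseUpdate(topic, defaultPartitions)
}

object CurrentStart {
  // Models the current start logic: only the container that owns
  // partition 0 creates the topic, but every container validates it.
  def start(broker: FakeBroker, topic: String, ownsPartitionZero: Boolean, expected: Int): Unit = {
    if (ownsPartitionZero) broker.createTopic(topic, expected)
    val actual = broker.partitionCount(topic)
    if (actual != expected)
      throw new IllegalStateException(
        s"Checkpoint topic validation failed for topic $topic because " +
          s"partition count $actual did not match expected partition count $expected.")
  }
}
```

With a FakeBroker whose default is 8 partitions and an expected count of 16, starting a container that does not own partition 0 first auto-creates an 8-partition topic and fails validation. The partition 0 owner then fails too, because its createTopic finds the topic already present. Starting the partition 0 owner first avoids the failure, which is consistent with the problem showing up only off and on.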

I think the simplest fix is to remove the if statement from 
KafkaCheckpointManager.start and have every container try to create the 
checkpoint topic before validating it. This eliminates the race condition, 
since whichever container starts first will create the checkpoint topic with 
the correct number of partitions rather than triggering auto-creation with the 
default count.
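A sketch of the proposed fix against the same kind of hypothetical in-memory broker (FakeBroker and FixedStart are illustrative names, not Samza classes). It assumes topic creation is idempotent, i.e. creating a topic that already exists leaves it unchanged. In KafkaCheckpointManager the change amounts to calling createTopic unconditionally before validateTopic.

```scala
import scala.collection.mutable

// Hypothetical in-memory broker; not a Kafka API. Metadata lookups
// auto-create missing topics with the default partition count.
class FakeBroker(defaultPartitions: Int) {
  private val topics = mutable.Map.empty[String, Int]

  // Explicit creation; a no-op if the topic already exists.
  def createTopic(topic: String, partitions: Int): Unit =
    if (!topics.contains(topic)) topics(topic) = partitions

  def partitionCount(topic: String): Int =
    topics.getOrElseUpdate(topic, defaultPartitions)
}

object FixedStart {
  // Proposed start: every container creates the topic before validating,
  // so this container's validation never triggers auto-creation.
  def start(broker: FakeBroker, topic: String, expected: Int): Unit = {
    broker.createTopic(topic, expected)
    val actual = broker.partitionCount(topic)
    if (actual != expected)
      throw new IllegalStateException(
        s"partition count $actual did not match expected partition count $expected")
  }
}
```

In this model every container validates successfully no matter which one starts first, because the first request that touches the checkpoint topic is always an explicit create with the expected partition count.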



--
This message was sent by Atlassian JIRA
(v6.2#6252)