Chris Riccomini created SAMZA-289:
-------------------------------------
Summary: Job fails with invalid checkpoint topic partition count
Key: SAMZA-289
URL: https://issues.apache.org/jira/browse/SAMZA-289
Project: Samza
Issue Type: Bug
Components: kafka
Affects Versions: 0.6.0
Reporter: Chris Riccomini
Assignee: Chris Riccomini
Fix For: 0.7.0
We've been seeing failures off and on lately with messages like this:
{noformat}
org.apache.samza.checkpoint.kafka.KafkaCheckpointException: Checkpoint topic
validation failed for topic __samza_checkpoint_samza-perf-store-all-calls_i001
because partition count 8 did not match expected partition count 64.
{noformat}
This causes the entire job to fail. It only happens the first time a Samza job
runs in a cluster (before its checkpoint topic exists), and only for jobs with an
input stream that has more partitions than the broker's default (num.partitions).
I believe there is a race condition in KafkaCheckpointManager. Right now we
only run KafkaCheckpointManager.createTopic in the container that owns the
first partition (partition 0):
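{code}
def start {
  // Only the container that owns partition 0 creates the checkpoint topic.
  if (partitions.contains(new Partition(0))) {
    createTopic
  }

  // Every container validates the checkpoint topic's partition count.
  validateTopic
}
{code}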
If a container without partition 0 starts before the container that owns
partition 0, it will only run validateTopic. This triggers a TopicMetadata
request to Kafka. If the checkpoint topic doesn't exist yet, I believe the
broker will report that, but will also create the topic in the background (when
auto.create.topics.enable is on, which is the default). When it does this, it
uses num.partitions (the default partition count) as the new checkpoint topic's
partition count.
If a Samza job's task.inputs list contains a stream with a non-default number
of partitions (e.g. num.partitions=8, but task.inputs has a stream with 16
partitions), this race condition can create a checkpoint topic with only 8
partitions, and validation will fail.
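To make the ordering problem concrete, here is a minimal single-threaded Scala
sketch of the sequence described above. MockBroker, fetchMetadata, run and the
topic name are hypothetical stand-ins, not Samza or Kafka APIs. The mock broker
assumes auto-creation on a metadata request, and containers start one after
another in a chosen order rather than concurrently.

```scala
import scala.collection.mutable

object CheckpointRaceSketch {
  // Hypothetical in-memory stand-in for a Kafka broker (not a real Kafka API).
  // It models auto.create.topics.enable=true: a metadata request for an unknown
  // topic reports it as missing but creates it with num.partitions.
  final class MockBroker(numPartitions: Int) {
    private val topics = mutable.Map.empty[String, Int]

    // Explicit creation; a no-op if the topic already exists.
    def createTopic(topic: String, partitions: Int): Unit =
      if (!topics.contains(topic)) topics(topic) = partitions

    // Returns the current partition count, auto-creating unknown topics.
    def fetchMetadata(topic: String): Option[Int] = {
      val current = topics.get(topic)
      if (current.isEmpty) topics(topic) = numPartitions
      current
    }
  }

  val Topic = "__samza_checkpoint_example" // hypothetical topic name

  // Retry until metadata reports the topic, then compare partition counts.
  def validateTopic(broker: MockBroker, expected: Int): Either[String, Unit] = {
    val actual = Iterator.continually(broker.fetchMetadata(Topic)).collectFirst { case Some(n) => n }.get
    if (actual == expected) Right(())
    else Left(s"partition count $actual did not match expected partition count $expected")
  }

  // Mirrors the current start logic: only the owner of partition 0 creates the topic.
  def start(broker: MockBroker, owned: Set[Int], expected: Int): Either[String, Unit] = {
    if (owned.contains(0)) broker.createTopic(Topic, expected)
    validateTopic(broker, expected)
  }

  // Starts containers (each given by its owned partitions) in order on a fresh broker.
  def run(numPartitions: Int, expected: Int, order: Seq[Set[Int]]): Seq[Either[String, Unit]] = {
    val broker = new MockBroker(numPartitions)
    order.map(owned => start(broker, owned, expected))
  }
}
```

With num.partitions=8 and an expected count of 16, starting the container that
owns partitions 8 to 15 first leaves the topic with 8 partitions, so both
containers fail validation; starting the partition 0 owner first passes.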
I think the simplest fix is to remove the if statement from
KafkaCheckpointManager.start and have all containers try to create the
checkpoint topic. This eliminates the race condition, since every container
tries to create the checkpoint topic with the correct number of partitions
before it validates the topic.
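A sketch of the proposed fix against a hypothetical in-memory broker (again
not a real Kafka or Samza API; MockBroker, fetchMetadata and run are
illustrative names). Every container calls createTopic before validateTopic, so
whichever container runs first creates the topic with the expected count, and
the broker's auto-creation path is never reached.

```scala
import scala.collection.mutable

object CheckpointFixSketch {
  // Hypothetical in-memory stand-in for a Kafka broker (not a real Kafka API).
  // Metadata requests for unknown topics auto-create them with num.partitions.
  final class MockBroker(numPartitions: Int) {
    private val topics = mutable.Map.empty[String, Int]

    // Explicit creation; a no-op if the topic already exists.
    def createTopic(topic: String, partitions: Int): Unit =
      if (!topics.contains(topic)) topics(topic) = partitions

    // Returns the current partition count, auto-creating unknown topics.
    def fetchMetadata(topic: String): Option[Int] = {
      val current = topics.get(topic)
      if (current.isEmpty) topics(topic) = numPartitions
      current
    }
  }

  val Topic = "__samza_checkpoint_example" // hypothetical topic name

  // Retry until metadata reports the topic, then compare partition counts.
  def validateTopic(broker: MockBroker, expected: Int): Either[String, Unit] = {
    val actual = Iterator.continually(broker.fetchMetadata(Topic)).collectFirst { case Some(n) => n }.get
    if (actual == expected) Right(())
    else Left(s"partition count $actual did not match expected partition count $expected")
  }

  // Proposed start logic: no partition 0 check, every container creates first.
  def start(broker: MockBroker, expected: Int): Either[String, Unit] = {
    broker.createTopic(Topic, expected)
    validateTopic(broker, expected)
  }

  // Starts the given number of containers one after another on a fresh broker.
  def run(numPartitions: Int, expected: Int, containers: Int): Seq[Either[String, Unit]] = {
    val broker = new MockBroker(numPartitions)
    (1 to containers).map(_ => start(broker, expected))
  }
}
```

One assumption worth noting: the mock's createTopic is a no-op when the topic
already exists. Since every container now calls it, the real createTopic would
likewise need to tolerate the topic having already been created by another
container.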
--
This message was sent by Atlassian JIRA
(v6.2#6252)