Hi group

I've run into a problem while doing maintenance on our Samza job. The job
has been running for a while and has processed nearly 350 million records.

We have 40 partitions with a 10 s checkpoint commit interval, which has left
the checkpoint topic with 18,308,702 messages. A 10 s interval is probably
too short; I have just increased it.
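
As a rough back-of-envelope check: 40 tasks committing every 10 s is about 4
checkpoint messages per second, so 18,308,702 messages works out to roughly
18,308,702 / 4 ≈ 4.6 million seconds, i.e. around 50 days of continuous
committing. Raising the interval to 60 s, for example, would cut the growth
rate by a factor of 6 (that figure is just an illustration, not the value I
settled on).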

Job startup now hangs for about 20 minutes while fetching the checkpointed
offsets. I think each container of the job is trying to read all of the
checkpoint messages, and Kafka took up the entire 100 MB server bandwidth
during that time. Here's the log from one of the machines:

2016-05-07 16:21:29 KafkaCheckpointManager [INFO] Reading checkpoint
for taskName Partition 18

2016-05-07 16:21:29 KafkaCheckpointManager [INFO] No TaskName to
checkpoint mapping provided.  Reading for first time.

2016-05-07 16:21:29 KafkaCheckpointManager [INFO] Connecting to leader
10.111.0.132:9093 for topic __samza_checkpoint_ver_1_for_xxx-remote_1
and to *fetch all checkpoint messages*.

2016-05-07 16:21:29 KafkaCheckpointManager [INFO] Got offset 0 for
topic __samza_checkpoint_ver_1_for_xxx-remote_1 and partition 0.
Attempting to fetch messages for checkpoint log.

2016-05-07 *16:21:29* KafkaCheckpointManager [INFO] Get latest offset
18308702 for topic __samza_checkpoint_ver_1_for_xxx-remote_1 and
partition 0.

2016-05-07 *16:43:42* KafkaCheckpointManager [INFO] Got checkpoint
state for taskName Partition 18: Checkpoint
[offsets={SystemStreamPartition [message, xxx_nlp, 18]=9350574}]

2016-05-07 16:43:42 OffsetManager [INFO] Checkpointed offset is
currently 9350574 for SystemStreamPartition [message, xxx_nlp, 18]
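
(Note the timestamps above: the jump from 16:21:29 to 16:43:42 is roughly 22
minutes spent scanning all 18,308,702 checkpoint messages for a single
container, i.e. on the order of 14,000 messages per second.)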


The log says there is no TaskName-to-checkpoint mapping; could this be the
cause? Here's my checkpoint config:

job.name=xxx_remote

systems.message.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=message
task.commit.ms=10000
task.checkpoint.replication.factor=2
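
Since task.checkpoint.system=message is the same Kafka system that carries the
input stream (the SystemStreamPartitions in the log are also on system
"message"), I assume the checkpoint reads at startup are competing with normal
traffic on the same brokers, which would explain the bandwidth saturation
mentioned above.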


My restart procedure is:

1. Update the Samza package on HDFS
2. Kill the running job via YARN
3. Start the job again using the Samza run-job script


I have done this several times, and as far as I can remember only this time
has caused the problem. Is there anything wrong with the steps or the config?
I suspect I started the new job before the previous one had shut down cleanly.

Your help would be much appreciated.

BTW: I think the checkpoint manager could start from the end of the
checkpoint topic and "look backward" for the latest offset of the current
partition, instead of reading all of the messages.
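
To make the idea concrete, here is a rough sketch of what I mean. It is not
Samza code, just an illustration against the plain Kafka consumer API; the
bootstrap servers, topic name, tail size, group id, and string
(de)serialization are all placeholder assumptions (the only thing taken from
the log above is that the checkpoint topic has a single partition, 0):

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CheckpointTailReader {

    // Read only the last `tailSize` messages of the single-partition checkpoint
    // topic and keep the newest value per key (i.e. per task name).
    public static Map<String, String> latestCheckpoints(String bootstrapServers,
                                                        String checkpointTopic,
                                                        long tailSize) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "checkpoint-tail-reader");   // placeholder group id
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Map<String, String> latest = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition(checkpointTopic, 0);
            consumer.assign(Collections.singletonList(tp));

            long end = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
            long start = Math.max(0L, end - tailSize);      // start near the end, not at offset 0
            consumer.seek(tp, start);

            while (consumer.position(tp) < end) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    latest.put(record.key(), record.value()); // a newer record overwrites an older one
                }
            }
        }
        return latest;
    }
}

The obvious caveat is that a task whose newest checkpoint is older than the
tail would still require stepping further back, but in the common case only a
bounded tail of the topic would be read instead of all 18 million messages.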

-- 
All the best

Liu Bo
