Hi,

Is your checkpoint topic log-compacted? Compaction may help reduce the size of the log.
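As a sketch, you can check the topic's current settings and turn compaction on with the standard Kafka CLI tools (the topic name is taken from your log below; the ZooKeeper address is a placeholder you'd substitute with your own):

```shell
# Inspect the checkpoint topic's current configuration
# (look for cleanup.policy in the output).
bin/kafka-topics.sh --zookeeper localhost:2181 --describe \
  --topic __samza_checkpoint_ver_1_for_xxx-remote_1

# Enable log compaction so that only the latest checkpoint per
# task key is retained, shrinking what startup has to read.
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics \
  --entity-name __samza_checkpoint_ver_1_for_xxx-remote_1 \
  --add-config cleanup.policy=compact
```

Note that compaction only discards superseded entries over time; it won't immediately truncate the existing 18 M messages, but it should keep the topic from growing unbounded going forward.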
On Sat, May 7, 2016 at 2:35 AM, Liu Bo <diabl...@gmail.com> wrote:

> Hi group,
>
> I've run into a problem while doing maintenance on our Samza job. The job
> has been running for a while and has processed nearly 350 million records.
>
> We have 40 partitions with a 10 s checkpoint commit interval, which has led
> to a checkpoint topic with 18,308,702 messages. A 10 s interval is probably
> too short; I've just increased it.
>
> On startup the job hangs for 20 minutes while getting the checkpointed
> offset. I think each task is trying to read all of the checkpoint messages,
> and Kafka consumed all 100 MB of server bandwidth during that time. Here's
> the log from one of the machines:
>
> 2016-05-07 16:21:29 KafkaCheckpointManager [INFO] Reading checkpoint for taskName Partition 18
>
> 2016-05-07 16:21:29 KafkaCheckpointManager [INFO] No TaskName to checkpoint mapping provided. Reading for first time.
>
> 2016-05-07 16:21:29 KafkaCheckpointManager [INFO] Connecting to leader 10.111.0.132:9093 for topic __samza_checkpoint_ver_1_for_xxx-remote_1 and to *fetch all checkpoint messages*.
>
> 2016-05-07 16:21:29 KafkaCheckpointManager [INFO] Got offset 0 for topic __samza_checkpoint_ver_1_for_xxx-remote_1 and partition 0. Attempting to fetch messages for checkpoint log.
>
> 2016-05-07 *16:21:29* KafkaCheckpointManager [INFO] Get latest offset 18308702 for topic __samza_checkpoint_ver_1_for_xxx-remote_1 and partition 0.
>
> 2016-05-07 *16:43:42* KafkaCheckpointManager [INFO] Got checkpoint state for taskName Partition 18: Checkpoint [offsets={SystemStreamPartition [message, xxx_nlp, 18]=9350574}]
>
> 2016-05-07 16:43:42 OffsetManager [INFO] Checkpointed offset is currently 9350574 for SystemStreamPartition [message, xxx_nlp, 18]
>
> The log says that I'm missing the TaskName-to-checkpoint mapping; could
> this be the cause?
> Here's my checkpoint config:
>
> job.name=xxx_remote
> systems.message.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> task.checkpoint.system=message
> task.commit.ms=10000
> task.checkpoint.replication.factor=2
>
> My restart procedure is:
>
> 1. Update the Samza package on HDFS.
> 2. yarn kill the job.
> 3. Start the job using the Samza run-job script.
>
> I have done this several times, and only this time caused the problem, as
> far as I can remember. Is there anything wrong with the steps or the
> config? I suspect I started the new job before the previous one could shut
> down safely.
>
> Your help will be much appreciated.
>
> BTW: I think the checkpoint manager could start from the end of the
> checkpoint topic and "look backward" for the offset of the current
> partition, instead of reading all of the messages.
>
> --
> All the best
>
> Liu Bo

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University