Hi,

Is your checkpoint topic log-compacted? That should dramatically reduce
the size of the log, since compaction keeps only the latest checkpoint
per task name.
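
If the topic was created without compaction, you can enable it on the
existing topic. A minimal sketch, assuming Kafka's kafka-configs.sh tool
and a hypothetical ZooKeeper address zk1:2181 (substitute your own):

  # enable log compaction on the existing checkpoint topic
  bin/kafka-configs.sh --zookeeper zk1:2181 --alter \
    --entity-type topics \
    --entity-name __samza_checkpoint_ver_1_for_xxx-remote_1 \
    --add-config cleanup.policy=compact

Once compaction has run, a restart only has to read the retained
messages (roughly one per task) rather than all 18 million.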

On Sat, May 7, 2016 at 2:35 AM, Liu Bo <diabl...@gmail.com> wrote:

> Hi group
>
> I've run into a problem while doing maintenance on our Samza job. The job
> has been running for a while and has processed nearly 350 million records.
>
> We have 40 partitions with a 10s checkpoint commit interval, which has led
> to a checkpoint topic with 18308702 messages. A 10s interval is probably
> too short, so I've just increased it.
>
> On restart, the job hangs for 20 minutes while fetching the checkpointed
> offsets. I think each container is trying to read all of the checkpoint
> messages, and Kafka was using the full 100MB of server bandwidth at that
> time. Here's the log from one of the machines:
>
> 2016-05-07 16:21:29 KafkaCheckpointManager [INFO] Reading checkpoint
> for taskName Partition 18
>
> 2016-05-07 16:21:29 KafkaCheckpointManager [INFO] No TaskName to
> checkpoint mapping provided.  Reading for first time.
>
> 2016-05-07 16:21:29 KafkaCheckpointManager [INFO] Connecting to leader
> 10.111.0.132:9093 for topic __samza_checkpoint_ver_1_for_xxx-remote_1
> and to *fetch all checkpoint messages*.
>
> 2016-05-07 16:21:29 KafkaCheckpointManager [INFO] Got offset 0 for
> topic __samza_checkpoint_ver_1_for_xxx-remote_1 and partition 0.
> Attempting to fetch messages for checkpoint log.
>
> 2016-05-07 *16:21:29* KafkaCheckpointManager [INFO] Get latest offset
> 18308702 for topic __samza_checkpoint_ver_1_for_xxx-remote_1 and
> partition 0.
>
> 2016-05-07 *16:43:42* KafkaCheckpointManager [INFO] Got checkpoint
> state for taskName Partition 18: Checkpoint
> [offsets={SystemStreamPartition [message, xxx_nlp, 18]=9350574}]
>
> 2016-05-07 16:43:42 OffsetManager [INFO] Checkpointed offset is
> currently 9350574 for SystemStreamPartition [message, xxx_nlp, 18]
>
>
> The log says there is no TaskName-to-checkpoint mapping; could this be the
> cause? Here's my checkpoint config:
>
> job.name=xxx_remote
>
> systems.message.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> task.checkpoint.system=message
> task.commit.ms=10000
> task.checkpoint.replication.factor=2
>
>
> My restart procedure is:
>
> 1. update the samza package at hdfs
> 2. yarn kill the job
> 3. start the job using samza run-job scripts
>
>
> I have done this several times, and as far as I can remember only this
> time has caused the problem. Is there anything wrong with the steps or the
> config? I suspect that I started the job before the previous one could
> shut down safely.
>
> Your help will be much appreciated.
>
> BTW: I think the checkpoint manager could start from the end of the
> checkpoint topic and "look backward" for the offset of the current
> partition instead of reading all of the messages.
>
> --
> All the best
>
> Liu Bo
>



-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
