Hi Chen Song,

There are two different concepts here: *checkpoint* and *changelog*. The
checkpoint records the offsets of the input messages, while the changelog
backs the kv-store. The code snippet you point to is for the checkpoint, not
for the changelog.

{quote}
1. When implementing our Samza task, does each call of process method
triggers a call to TaskInstance.commit?
{quote}

TaskInstance.commit triggers the *checkpoint*. It is not triggered by every
process call, but once every task.commit.ms (default is 60000 ms). The code
is here
<https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala#L149-166>.
Basically, the RunLoop class calls the commit method, but the commit
behavior is only triggered once per configured interval.
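
To make the interval gating concrete, here is a minimal Java sketch. It is
not Samza's RunLoop; the class IntervalCommitLoop, its methods, and the
injectable clock are hypothetical names made up for illustration. The only
thing taken from above is the idea that the loop asks for a commit often,
but a commit only happens once the configured interval (the role played by
task.commit.ms) has elapsed.

```java
import java.util.function.LongSupplier;

// Hypothetical illustration only; this is not Samza's actual RunLoop.
class IntervalCommitLoop {
    private final long commitIntervalMs; // plays the role of task.commit.ms
    private final LongSupplier clock;    // injectable clock, in milliseconds
    private long lastCommitMs;
    private int commitCount = 0;
    private int processedCount = 0;

    IntervalCommitLoop(long commitIntervalMs, LongSupplier clock) {
        this.commitIntervalMs = commitIntervalMs;
        this.clock = clock;
        this.lastCommitMs = clock.getAsLong();
    }

    // One loop iteration: handle a message, then ask for a commit.
    void processOne(String message) {
        processedCount++; // stands in for the task's process method
        maybeCommit();
    }

    // Called on every iteration, but only commits when the interval is up.
    private void maybeCommit() {
        long now = clock.getAsLong();
        if (now - lastCommitMs >= commitIntervalMs) {
            commitCount++; // stands in for writing the checkpoint
            lastCommitMs = now;
        }
    }

    int commitCount() { return commitCount; }

    int processedCount() { return processedCount; }
}
```

With a 60000 ms interval, processing any number of messages inside the same
minute produces no commit in this sketch; the first message handled after
the interval has elapsed produces exactly one.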

If you are talking about the *changelog*, it is not controlled by the commit
method. Instead, every put/delete calls the "send
<https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java#L51>"
method of the system producer (code is here
<https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala#L62-L66>).
How often the "send" really *sends* to the broker (e.g. Kafka) depends on
your producer's configuration. For example, in Kafka, you can have the
producer send a batch (setting async) or send one message at a time
(setting sync). In other words, Samza leaves it to the system to decide how
to handle the "send" call.
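
A rough Java sketch of that split between the store and the producer is
below. All names in it (ChangeProducer, BatchingProducer, LoggedKvStore) are
hypothetical and are not the Samza or Kafka classes; the batch size is an
arbitrary illustrative parameter. It only shows the shape of the idea: the
store forwards every put/delete to "send", and the producer alone decides
when the buffered changes actually go to the broker.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a system producer; not the Samza interface.
interface ChangeProducer {
    void send(String key, String value);
    void flush();
}

// Buffers changes and ships them to the (simulated) broker in batches.
class BatchingProducer implements ChangeProducer {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private int brokerRequests = 0;   // number of simulated broker round trips
    private int recordsDelivered = 0; // number of change records shipped

    BatchingProducer(int batchSize) { this.batchSize = batchSize; }

    @Override
    public void send(String key, String value) {
        buffer.add(key + "=" + value);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    @Override
    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        brokerRequests++;
        recordsDelivered += buffer.size();
        buffer.clear();
    }

    int brokerRequests() { return brokerRequests; }

    int recordsDelivered() { return recordsDelivered; }
}

// Store that forwards every put/delete to the producer's send method.
class LoggedKvStore {
    private final Map<String, String> local = new HashMap<>();
    private final ChangeProducer producer;

    LoggedKvStore(ChangeProducer producer) { this.producer = producer; }

    void put(String key, String value) {
        local.put(key, value);
        producer.send(key, value);
    }

    void delete(String key) {
        local.remove(key);
        producer.send(key, null); // a null value marks the delete
    }

    String get(String key) { return local.get(key); }
}
```

In this sketch, 250 puts with a batch size of 100 result in two broker
requests, plus a third once flush is called. Note that batching here changes
how many requests reach the broker, not how many change records are written:
all 250 records are still delivered.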


{quote}
2. Is there a way to buffer these commit activities in memory and flush
periodically? Our job is joining >1mm messages per second using a KV store
and we have a lot of concern for the changelog size, as in the worst case,
the change log will grow as fast as the input log.
{quote}

If you are talking about the checkpoint, you can change task.commit.ms.

If you are thinking of the changelog (kv-store), you can change the
producer's config so that it batches several changes before sending them
to the broker.

I think people in the community with more operational experience will be
able to tell you what the best practice is.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Wed, Jul 22, 2015 at 9:00 AM, Chen Song <chen.song...@gmail.com> wrote:

> We are trying to understand the order of commits when processing each
> message in a Samza job.
>
> T1: input offset commit
> T2: changelog commit
> T3: output commit
>
> By looking at the code snippet in
>
> https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171
> ,
> my understanding is that for each input message, Samza always send update
> message on changelog, send the output message and then commit the input
> offset. It makes sense to me at the high level in terms of at least once
> processing.
>
> Specifically, we have two dumb questions:
>
> 1. When implementing our Samza task, does each call of process method
> triggers a call to TaskInstance.commit?
> 2. Is there a way to buffer these commit activities in memory and flush
> periodically? Our job is joining >1mm messages per second using a KV store
> and we have a lot of concern for the changelog size, as in the worst case,
> the change log will grow as fast as the input log.
>
> Chen
>
> --
> Chen Song
>
