[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store
[ https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106558#comment-17106558 ]

Matthias J. Sax commented on KAFKA-3184:

I am personally not sure how useful "local checkpointing" is at all. Note that for persistent stores, the idea is to allow holding state that is larger than main memory. It's not really related to fault-tolerance or similar (it only has the nice side effect that rolling restarts are quick; for this case, maybe dumping the in-memory store to disk during shutdown might be helpful, but this would not be regular checkpointing). Also, to avoid overly long state recovery times, users can configure standbys. For scale-out, the same issue arises for in-memory and persistent stores, and it's addressed via KIP-441. So we should not conflate orthogonal issues.

Having a remote checkpoint mechanism would be a nice-to-have feature, but it raises a lot of complex issues we need to address:

(1) Where to actually put the store? Kafka Streams is a library and should not have other dependencies by default; thus, remote checkpointing must be an opt-in feature only.
(2) How can we do incremental checkpointing? (If it's not incremental, it's too heavyweight and we don't need to build it at all.)
(3) How do we "compact" the checkpoint increments in the remote location? This is the hardest issue we need to solve.

There was also the idea to actually re-use standbys for quick recovery: instead of checkpointing to remote storage, one just configures standbys. On recovery, instead of reading the changelog, we copy RocksDB SST files directly from the standby to the active (or, to be more precise, the standby would become the active anyway...). This approach avoids the dependency on an external system and also solves the "how to compact" issue, as RocksDB does it for us. Of course, it's more expensive (in dollars) to keep the state in the app compared to pushing it to cheap external storage.

Just some food for thought.

> Add Checkpoint for In-memory State Store
>
> Key: KAFKA-3184
> URL: https://issues.apache.org/jira/browse/KAFKA-3184
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Nikolay Izhikov
> Priority: Major
> Labels: user-experience
>
> Currently Kafka Streams does not make a checkpoint of the persistent state
> store upon committing, which would be expensive since it is "stopping the
> world" and writes to disk: for example, RocksDB would require you to copy the
> file directory to make a copy naively.
> However, for in-memory stores checkpointing may be doable in an asynchronous
> manner, hence it can be done quickly. And the benefit of having intermediate
> checkpoints is to avoid restoring from scratch if standby tasks are not
> present.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
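Questions (2) and (3) in the comment above can be made concrete with a toy model. The sketch below is only an illustration under stated assumptions: the class `IncrementalCheckpointSketch` and all of its methods are hypothetical and not part of Kafka Streams, the "remote location" is modelled as plain in-memory maps, and a null value is assumed to mark a deleted key. Each increment carries only the keys changed since the previous checkpoint, and compaction folds the increments into a single base image.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model; none of these names exist in Kafka Streams.
class IncrementalCheckpointSketch {
    // Full image of the store as of the last compaction.
    private Map<String, String> base = new TreeMap<>();
    // Increments written since the last compaction; a null value marks a delete (assumption).
    private final List<Map<String, String>> increments = new ArrayList<>();

    // Record only the keys that changed since the previous checkpoint.
    void addIncrement(Map<String, String> changedSinceLastCheckpoint) {
        increments.add(new HashMap<>(changedSinceLastCheckpoint));
    }

    int pendingIncrements() {
        return increments.size();
    }

    // Fold all increments into the base image, oldest first, then drop them.
    void compact() {
        base = fold();
        increments.clear();
    }

    // State a restoring instance would see: base plus any uncompacted increments.
    Map<String, String> restore() {
        return fold();
    }

    private Map<String, String> fold() {
        Map<String, String> merged = new TreeMap<>(base);
        for (Map<String, String> increment : increments) {
            for (Map.Entry<String, String> e : increment.entrySet()) {
                if (e.getValue() == null) {
                    merged.remove(e.getKey());   // delete marker
                } else {
                    merged.put(e.getKey(), e.getValue());
                }
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        IncrementalCheckpointSketch checkpoints = new IncrementalCheckpointSketch();
        Map<String, String> first = new HashMap<>();
        first.put("a", "1");
        first.put("b", "2");
        checkpoints.addIncrement(first);
        Map<String, String> second = new HashMap<>();
        second.put("a", null);   // "a" was deleted after the first checkpoint
        second.put("c", "3");
        checkpoints.addIncrement(second);
        checkpoints.compact();
        System.out.println(checkpoints.restore());
    }
}
```

Without compaction, restore cost grows with the number of increments, which is presumably why the comment calls it the hardest part. In the standby-based alternative, RocksDB's own compaction plays this role.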
[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store
[ https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106046#comment-17106046 ]

Nikolay Izhikov commented on KAFKA-3184:

[~guozhang] Thanks for the feedback and the detailed explanation. I'm very interested and will create a separate ticket for the Checkpointing API.
[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store
[ https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17105816#comment-17105816 ]

Guozhang Wang commented on KAFKA-3184:

Hello [~nizhikov], for many state-light applications it is not worth having persistent stores; but with in-memory stores, since we do not have any persistent checkpoints, upon rolling upgrades or scaling events we always have to re-bootstrap the whole state from the beginning, and that limits the usefulness of in-memory stores. So when I created this ticket about 4 years ago, my main motivation was to make in-memory stores more attractive for scenarios where your state is relatively small.

Now, with a lot of rebalance improvements we've done, including KIP-441, I think just allowing local checkpointing for in-memory state stores may no longer be that interesting. Instead, I think what [~vvcephei] was considering is to provide a general checkpointing API for state stores in Streams (not only for in-memory but also for persistent stores), where the checkpoint location can be either local disks or remote storage. Here the design scope is primarily on 1) the API design for both checkpointing and loading checkpoints into the local state stores, and 2) the mechanism of the checkpointing, e.g. whether it should be async, whether it should be executed on separate threads, etc.

I think this is as of today a more appealing feature to add, and if you are interested, we should just create a new JIRA for it rather than piggy-backing on 3184.
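To make the two design questions above easier to discuss, here is one possible shape such an API could take. It is purely a sketch: `Checkpoint`, `StateCheckpointer`, and `InMemoryBackedCheckpointer` are hypothetical names, not existing or proposed Kafka Streams interfaces, and the backend simply keeps the image in a field where a real one would write to local disk or remote storage. It shows a write path that is asynchronous and runs on a separate thread, plus a load path for restoring.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical API shape; nothing here exists in Kafka Streams.
final class Checkpoint {
    final Map<String, String> image;   // store contents
    final long changelogOffset;        // changelog offset the image corresponds to

    Checkpoint(Map<String, String> image, long changelogOffset) {
        this.image = image;
        this.changelogOffset = changelogOffset;
    }
}

interface StateCheckpointer extends AutoCloseable {
    // Write side: returns immediately; the future completes once the checkpoint is stored.
    CompletableFuture<Void> checkpointAsync(Map<String, String> storeContents, long changelogOffset);

    // Read side: the latest complete checkpoint, if any.
    Optional<Checkpoint> loadLatest();

    @Override
    void close();
}

// Stand-in backend that "persists" to a field (assumption made to keep the sketch self-contained).
final class InMemoryBackedCheckpointer implements StateCheckpointer {
    private final ExecutorService writer = Executors.newSingleThreadExecutor();
    private volatile Checkpoint latest;

    @Override
    public CompletableFuture<Void> checkpointAsync(Map<String, String> storeContents, long changelogOffset) {
        // Copy on the caller's thread so later writes to the store cannot leak into the image.
        Map<String, String> copy = new TreeMap<>(storeContents);
        return CompletableFuture.runAsync(() -> latest = new Checkpoint(copy, changelogOffset), writer);
    }

    @Override
    public Optional<Checkpoint> loadLatest() {
        return Optional.ofNullable(latest);
    }

    @Override
    public void close() {
        writer.shutdown();
    }

    public static void main(String[] args) {
        try (InMemoryBackedCheckpointer checkpointer = new InMemoryBackedCheckpointer()) {
            Map<String, String> store = new HashMap<>();
            store.put("user-1", "3 clicks");
            checkpointer.checkpointAsync(store, 100L).join();
            checkpointer.loadLatest().ifPresent(
                c -> System.out.println("offset " + c.changelogOffset + " -> " + c.image));
        }
    }
}
```

Returning a future leaves the sync-versus-async decision to the caller: a caller that needs the checkpoint before committing can wait on it, while others can continue processing.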
[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store
[ https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099287#comment-17099287 ]

Nikolay Izhikov commented on KAFKA-3184:

[~guozhang]

> we need to make the checkpointed offsets value to be consistent with the
> checkpoint image on disk itself

Can you please clarify this requirement? Right now, I call {{FileChannel.force}} on each checkpoint, so all data is forced to disk. Is that enough?
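One possible reading of the quoted requirement: forcing the data to disk makes it durable, but a crash between writing the image and writing the offset could still leave the two out of step. A common way to rule that out is to store the offset inside the same file as the image, write to a temporary file, force it, and then atomically rename it over the previous snapshot. The sketch below assumes exactly that; `AtomicSnapshotFile`, its file layout, and the `.tmp` suffix are hypothetical and are not taken from the PR.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; the file layout is an assumption, not the format used in the PR.
class AtomicSnapshotFile {
    static final class Snapshot {
        final long offset;
        final Map<String, String> image;

        Snapshot(long offset, Map<String, String> image) {
            this.offset = offset;
            this.image = image;
        }
    }

    // Layout: [offset:long][entryCount:int] then (key, value) pairs in modified UTF-8.
    static void write(Path target, long offset, Map<String, String> image) {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeLong(offset);
                out.writeInt(image.size());
                for (Map.Entry<String, String> e : image.entrySet()) {
                    out.writeUTF(e.getKey());
                    out.writeUTF(e.getValue());
                }
            }
            try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.CREATE,
                    StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
                ByteBuffer buffer = ByteBuffer.wrap(bytes.toByteArray());
                while (buffer.hasRemaining()) {
                    channel.write(buffer);
                }
                channel.force(true);   // content and metadata reach the device before the rename
            }
            // Readers see either the old snapshot or the new one, never a mix of the two.
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Snapshot read(Path target) {
        try (DataInputStream in = new DataInputStream(Files.newInputStream(target))) {
            long offset = in.readLong();
            int count = in.readInt();
            Map<String, String> image = new TreeMap<>();
            for (int i = 0; i < count; i++) {
                String key = in.readUTF();
                image.put(key, in.readUTF());
            }
            return new Snapshot(offset, image);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempDirectory("snapshot-sketch").resolve("store.snapshot");
        write(file, 42L, Map.of("key", "value"));
        Snapshot snapshot = read(file);
        System.out.println("offset=" + snapshot.offset + " image=" + snapshot.image);
    }
}
```

Two caveats: whether `ATOMIC_MOVE` replaces an existing target is implementation-specific according to the `Files.move` documentation, and the sketch does not sync the parent directory after the rename, which some file systems need for full crash safety.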
[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store
[ https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17097581#comment-17097581 ]

Matthias J. Sax commented on KAFKA-3184:

Works for me. But we can also discuss on the PR?
[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store
[ https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17097430#comment-17097430 ]

Nikolay Izhikov commented on KAFKA-3184:

[~mjsax] Can we use ".store" extension and "store file" naming?
[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store
[ https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096965#comment-17096965 ]

Matthias J. Sax commented on KAFKA-3184:

[~guozhang] From a naming perspective, we already have "checkpoint files" that only store some offset metadata. It might be good to resolve this naming clash to avoid confusion in future discussions. Not sure atm how to resolve it, though. Thoughts?
[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store
[ https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096718#comment-17096718 ]

Nikolay Izhikov commented on KAFKA-3184:

Hello, [~guozhang]

I've prepared a PR with the simplest implementation of an in-memory state store checkpointer. If persistent mode is enabled, then:

* the checkpoint thread is started on KeyValueStore#init.
* on every InMemoryKeyValueStore#COUNT_FLUSH_TO_STORE-th flush execution, a copy of InMemoryKeyValueStore#map is passed to the checkpoint thread.
* the checkpoint thread persists data every time it sees a new instance of InMemoryKeyValueStore#map.
* persisted data is loaded on KeyValueStore#init.

Can you please take a look?
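The flow in the list above can be sketched roughly as follows. This is not the code from the PR: `CheckpointingStoreSketch` is a hypothetical stand-in for the store, the flush interval is a constructor argument instead of the `COUNT_FLUSH_TO_STORE` constant, and "persisting" just stores the copy in a field so the example stays self-contained. Loading on init is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an in-memory store with a background checkpoint thread.
class CheckpointingStoreSketch {
    private final Map<String, String> map = new HashMap<>();
    private final int flushesPerCheckpoint;
    // Started with the store, mirroring "checkpoint thread started on init".
    private final ExecutorService checkpointThread = Executors.newSingleThreadExecutor();
    private int flushCount = 0;
    private volatile Map<String, String> lastPersisted = new HashMap<>();

    CheckpointingStoreSketch(int flushesPerCheckpoint) {
        this.flushesPerCheckpoint = flushesPerCheckpoint;
    }

    void put(String key, String value) {
        map.put(key, value);
    }

    // Called by the processing thread; only every N-th call triggers a checkpoint.
    void flush() {
        flushCount++;
        if (flushCount % flushesPerCheckpoint != 0) {
            return;
        }
        // Copy on the processing thread so the checkpoint thread sees a stable image.
        Map<String, String> copy = new HashMap<>(map);
        checkpointThread.execute(() -> persist(copy));
    }

    // Stand-in for writing the copy to a file (assumption).
    private void persist(Map<String, String> copy) {
        lastPersisted = copy;
    }

    Map<String, String> lastPersisted() {
        return lastPersisted;
    }

    // Stop accepting checkpoints and wait briefly for queued ones to finish.
    void close() {
        checkpointThread.shutdown();
        try {
            checkpointThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        CheckpointingStoreSketch store = new CheckpointingStoreSketch(2);
        store.put("a", "1");
        store.flush();           // first flush: no checkpoint yet
        store.put("b", "2");
        store.flush();           // second flush: copy handed to the checkpoint thread
        store.close();
        System.out.println(store.lastPersisted());
    }
}
```

The copy is the main cost on the processing thread; it is proportional to the store size, which fits the ticket's focus on relatively small state.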
[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store
[ https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16736113#comment-16736113 ]

Guozhang Wang commented on KAFKA-3184:

[~Yohan123] I think you are right: currently `persistent()` is used to determine whether we should record checkpoint offsets, since for non-persistent stores no data is flushed to persistent storage and therefore we can only restore from the beginning every time upon resuming. With this JIRA, in-memory stores would be "persisted" to storage as well; the only difference is that the data is not written until the "snapshotting" point is reached (e.g. after N commits). Hence in this case the persistent() flag is not that useful anyway, and we may consider deprecating it.
[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store
[ https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16722380#comment-16722380 ]

Richard Yu commented on KAFKA-3184:

As Ted pointed out, persistent() in {{InMemoryKeyValueStore}} would have to return true with this patch. So then would other stores which formerly returned false for persistent() also be committed?
[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store
[ https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701422#comment-16701422 ]

Matthias J. Sax commented on KAFKA-3184:

Just a side comment: RocksDB allows one to "pin/freeze" the current state to protect it from compaction. This is a lightweight metadata operation. After a version is pinned (not sure atm what the correct RocksDB term for this operation is), the immutable SST files can be copied in the background to complete a checkpoint.
[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store
[ https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16344081#comment-16344081 ]

Ted Yu commented on KAFKA-3184:

bq. only do the checkpointing every N flush calls

This seems to be better than checkpointing on each commit. However, the checkpoint may take non-trivial time to complete, so async checkpointing is preferred.

If the previous checkpoint has not finished and N flush calls have passed, do we just delay the next checkpoint?
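The thread does not settle the question above. One possible policy, sketched here purely as an illustration, is to delay: if a checkpoint is due but the previous one is still running, skip it and try again on the next flush. `CheckpointScheduler` and its methods are hypothetical names; the sketch only decides when to start a checkpoint and leaves the actual writing to the caller.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical scheduling policy: at most one checkpoint in flight, overdue ones are delayed.
class CheckpointScheduler {
    private final int flushesPerCheckpoint;
    private final AtomicBoolean inFlight = new AtomicBoolean(false);
    private int flushesSinceLastCheckpoint = 0;   // touched only by the processing thread

    CheckpointScheduler(int flushesPerCheckpoint) {
        this.flushesPerCheckpoint = flushesPerCheckpoint;
    }

    // Called by the processing thread on every flush.
    // Returns true if the caller should start an asynchronous checkpoint now.
    boolean onFlush() {
        flushesSinceLastCheckpoint++;
        if (flushesSinceLastCheckpoint < flushesPerCheckpoint) {
            return false;   // not due yet
        }
        if (!inFlight.compareAndSet(false, true)) {
            return false;   // due, but the previous checkpoint is still running: delay
        }
        flushesSinceLastCheckpoint = 0;
        return true;
    }

    // Called by the checkpoint thread once the checkpoint has been written.
    void onCheckpointFinished() {
        inFlight.set(false);
    }

    public static void main(String[] args) {
        CheckpointScheduler scheduler = new CheckpointScheduler(2);
        for (int flush = 1; flush <= 5; flush++) {
            boolean start = scheduler.onFlush();
            System.out.println("flush " + flush + ": start checkpoint = " + start);
            if (flush == 4) {
                scheduler.onCheckpointFinished();   // the slow checkpoint completes here
            }
        }
    }
}
```

With this policy a slow checkpoint never causes a backlog: the counter keeps growing while one is in flight, so the next checkpoint starts on the first flush after the previous one finishes.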
[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store
[ https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16331348#comment-16331348 ]

Guozhang Wang commented on KAFKA-3184:

[~yuzhih...@gmail.com] sorry for the late reply! Your understanding is basically right, and here are my thoughts about the flushing:

1. It should not be expensive and stop the world, since flush may be called on each commit. I was thinking that it could either be async (but we need to make the checkpointed offsets value consistent with the checkpoint image on disk itself), or we only do the checkpointing every N flush calls.

2. As for {{persistent()}}, currently it is only used in {{ProcessorStateManager#checkpoint}} and {{StoreChangelogReader#restoredOffsets}}; with in-memory state stores being checkpointed periodically, I think we can just deprecate this flag and let these two callers always checkpoint / save restored offsets.
[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store
[ https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16300417#comment-16300417 ]

Ted Yu commented on KAFKA-3184:

From InMemoryKeyValueStore:

{code}
    public void flush() {
        // do-nothing since it is in-memory
    }
{code}

For this task, I assume the in-memory state store needs to implement flush() (and return true in {{persistent()}}). If so, we need to decide the format which flush uses.