[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store

2020-05-13 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17106558#comment-17106558
 ] 

Matthias J. Sax commented on KAFKA-3184:


I am personally not sure how useful "local checkpointing" is at all. Note that 
for persistent stores, the idea is to allow holding state that is larger than 
main memory. It's not really related to fault-tolerance or similar (it only has 
the nice side effect that rolling restarts are quick; for this case, maybe 
dumping the in-memory store to disk during shutdown might be helpful, but this 
would not be regular checkpointing). Also, to avoid overly long state recovery 
times, users can configure standbys.

For scale-out, the same issue arises for in-memory and persistent stores, and 
it's addressed via KIP-441. So we should not conflate orthogonal issues.

Having a remote checkpoint mechanism would be a nice-to-have feature, but it 
raises a lot of complex issues we need to address: (1) Where do we actually put 
the store? Kafka Streams is a library and should not have other dependencies by 
default; thus, remote checkpointing must be an opt-in feature only. (2) How can 
we do incremental checkpointing? (If it's not incremental, it's too heavyweight 
and we don't need to build it at all.) (3) How do we "compact" the checkpoint 
increments in the remote location? This is the hardest issue we need to solve.

There was also the idea to re-use standbys for quick recovery: instead 
of checkpointing to remote storage, one just configures standbys. On recovery, 
instead of reading the changelog, we copy RocksDB SST files directly from the 
standby to the active (or, to be more precise, the standby would become the 
active anyway...). This approach avoids the dependency on an external system and 
also solves the "how to compact" issue, as RocksDB does it for us. Of course, 
it's more expensive (in dollars) to keep the state in the app compared to 
pushing it to cheap external storage.

Just some food for thought.

> Add Checkpoint for In-memory State Store
> 
>
> Key: KAFKA-3184
> URL: https://issues.apache.org/jira/browse/KAFKA-3184
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Nikolay Izhikov
> Priority: Major
> Labels: user-experience
>
> Currently Kafka Streams does not make a checkpoint of the persistent state 
> store upon committing, which would be expensive since it "stops the 
> world" and writes to disk: for example, RocksDB would naively require you to 
> copy the whole file directory to make a copy. 
> However, for in-memory stores, checkpointing may be doable in an asynchronous 
> manner and hence can be done quickly. The benefit of having intermediate 
> checkpoints is to avoid restoring from scratch if standby tasks are not 
> present.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store

2020-05-13 Thread Nikolay Izhikov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17106046#comment-17106046
 ] 

Nikolay Izhikov commented on KAFKA-3184:


[~guozhang] Thanks for the feedback and detailed explanation. 
I'm very interested and will create a separate ticket for the checkpointing API.



[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store

2020-05-12 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17105816#comment-17105816
 ] 

Guozhang Wang commented on KAFKA-3184:
--

Hello [~nizhikov], for many state-light applications it is not worth having 
persistent stores; but with in-memory stores, since we do not have any 
persistent checkpoints, upon rolling upgrades or scaling events we always have 
to re-bootstrap the whole state from the beginning, and that limits the 
usefulness of in-memory stores. So when I created this ticket about 4 years 
ago, my main motivation was to make in-memory stores more attractive for 
certain scenarios where the state is relatively small. Now, with the many 
rebalance improvements we've made, including KIP-441, I think just allowing 
in-memory state stores to be checkpointed locally may no longer be that interesting.

Instead, I think what [~vvcephei] was considering is to provide a general 
checkpointing API for state stores in Streams (not only for in-memory but also 
for persistent stores), where the checkpoint location can be either local disk 
or remote storage. Here the design scope is primarily 1) the API design 
for both checkpointing and loading checkpoints into the local state 
stores, and 2) the checkpointing mechanism, e.g. whether it should be async, 
whether it should be executed on separate threads, etc. I think this is, as of 
today, a more appealing feature to add, and if you are interested, we should 
just create a new JIRA for it rather than piggy-backing on KAFKA-3184.



[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store

2020-05-04 Thread Nikolay Izhikov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099287#comment-17099287
 ] 

Nikolay Izhikov commented on KAFKA-3184:


[~guozhang]

> we need to make the checkpointed offsets value to be consistent with the 
> checkpoint image on disk itself

Can you please clarify this requirement?
Right now, I call {{FileChannel.force}} on each checkpoint,
so all data is forced to disk.
Is that enough?
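One way to read the requirement quoted above: {{FileChannel.force}} makes the written bytes durable, but on its own it does not guarantee that the checkpointed changelog offset and the store image on disk describe the same point in time (for example, if they live in separate files, or a crash interrupts a write). A minimal sketch, assuming a hypothetical single-file format that stores the offset together with the snapshot, writes to a temporary file, forces it, and only then atomically renames it over the previous snapshot. The class name, the file layout and the String keys/values are illustrative assumptions, not the PR's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

// Hypothetical snapshot format: [offset:long][entryCount:int] then entryCount x (key:UTF, value:UTF).
final class SnapshotFile {
    final long changelogOffset;
    final Map<String, String> entries;

    SnapshotFile(long changelogOffset, Map<String, String> entries) {
        this.changelogOffset = changelogOffset;
        this.entries = entries;
    }

    static void write(Path target, long changelogOffset, Map<String, String> entries) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(changelogOffset); // offset and image share one file, so they cannot diverge
            out.writeInt(entries.size());
            for (Map.Entry<String, String> e : entries.entrySet()) {
                out.writeUTF(e.getKey());   // writeUTF caps each string at 64 KB; fine for a sketch
                out.writeUTF(e.getValue());
            }
        }
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buf = ByteBuffer.wrap(bytes.toByteArray());
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
            ch.force(true); // durable before it becomes visible under the real name
        }
        // Readers see either the previous complete snapshot or the new one, never a half-written file.
        // (Syncing the parent directory after the rename is not portable from Java and is omitted.)
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
    }

    static SnapshotFile read(Path target) throws IOException {
        try (DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(Files.readAllBytes(target)))) {
            long offset = in.readLong();
            int count = in.readInt();
            Map<String, String> entries = new HashMap<>();
            for (int i = 0; i < count; i++) {
                String key = in.readUTF();
                entries.put(key, in.readUTF());
            }
            return new SnapshotFile(offset, entries);
        }
    }
}
```

On restore, the store would load {{entries}} and resume changelog restoration from {{changelogOffset}}; because both come from the same renamed file, they always match.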




[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store

2020-05-01 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17097581#comment-17097581
 ] 

Matthias J. Sax commented on KAFKA-3184:


Works for me. But we could also discuss it on the PR?



[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store

2020-05-01 Thread Nikolay Izhikov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17097430#comment-17097430
 ] 

Nikolay Izhikov commented on KAFKA-3184:


[~mjsax]

Can we use a ".store" extension and "store file" naming?



[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store

2020-04-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17096965#comment-17096965
 ] 

Matthias J. Sax commented on KAFKA-3184:


[~guozhang]: from a naming perspective, we already have "checkpoint files" 
that only store some offset metadata. It might be good to resolve this naming 
clash to avoid confusion in future discussions. I'm not sure atm how to resolve 
it, though. Thoughts?



[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store

2020-04-30 Thread Nikolay Izhikov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17096718#comment-17096718
 ] 

Nikolay Izhikov commented on KAFKA-3184:


Hello, [~guozhang]

I've prepared a PR with the simplest implementation of an in-memory state store 
checkpointer.

If persistent mode is enabled, then:
  * a checkpoint thread is started on KeyValueStore#init;
  * every InMemoryKeyValueStore#COUNT_FLUSH_TO_STORE flushes, a copy of 
the InMemoryKeyValueStore#map is passed to the checkpoint thread;
  * the checkpoint thread persists data every time it sees a new instance of 
InMemoryKeyValueStore#map;
  * persisted data is loaded on KeyValueStore#init.

Could you please take a look?
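The hand-off described in the list above (the stream thread passes a copy of the map every COUNT_FLUSH_TO_STORE flushes, and the checkpoint thread persists whichever copy is newest) can be sketched as a single-slot, latest-wins mailbox. This is a minimal illustration, not the PR's code: the class name, the String key/value types and the pluggable persist callback are assumptions. A snapshot that arrives while an older one is still waiting replaces it, so a slow disk never builds a backlog of stale copies.

```java
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: a background thread persists the newest snapshot handed to it.
final class AsyncCheckpointer implements AutoCloseable {
    private final Consumer<Map<String, String>> persist; // e.g. writes a snapshot file
    private final Thread worker;
    private Map<String, String> pending; // newest snapshot not yet picked up; guarded by "this"
    private boolean closed;              // guarded by "this"

    AsyncCheckpointer(Consumer<Map<String, String>> persist) {
        this.persist = persist;
        this.worker = new Thread(this::runLoop, "in-memory-store-checkpointer");
        this.worker.setDaemon(true);
        this.worker.start(); // would be started when the store is initialized
    }

    // Called by the stream thread every N flushes. Copying is the only work done on the
    // stream thread; the copy is immutable, so the worker can read it without locking.
    synchronized void submit(Map<String, String> liveMap) {
        pending = Map.copyOf(liveMap); // replaces an older snapshot the worker has not taken yet
        notifyAll();
    }

    private void runLoop() {
        while (true) {
            Map<String, String> snapshot;
            synchronized (this) {
                while (pending == null && !closed) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                if (pending == null) {
                    return; // closed and nothing left to write
                }
                snapshot = pending;
                pending = null;
            }
            persist.accept(snapshot); // slow I/O happens outside the lock
        }
    }

    // Writes any snapshot still pending, then stops the worker.
    @Override
    public void close() throws InterruptedException {
        synchronized (this) {
            closed = true;
            notifyAll();
        }
        worker.join();
    }
}
```

Because intermediate snapshots may be skipped, only the newest submitted snapshot is guaranteed to be written before close() returns.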



[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store

2019-01-07 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16736113#comment-16736113
 ] 

Guozhang Wang commented on KAFKA-3184:
--

[~Yohan123] I think you are right: currently `persistent()` is used to 
determine whether we should record checkpoint offsets, since for non-persistent 
stores no data is flushed to persistent storage, and therefore we can 
only restore from the beginning every time upon resuming. With this JIRA, in-memory 
stores would be "persisted" to storage as well; the only difference is that they 
are not written until the "snapshotting" point is reached (e.g. after N 
commits). Hence, in this case the persistent() flag is not that useful anymore, 
and we may consider deprecating it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store

2018-12-15 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722380#comment-16722380
 ] 

Richard Yu commented on KAFKA-3184:
---

As Ted pointed out, persistent() in {{InMemoryKeyValueStore}} would have to 
return true with this patch. So would other stores that formerly returned 
false for persistent() then also be committed?



[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store

2018-11-27 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701422#comment-16701422
 ] 

Matthias J. Sax commented on KAFKA-3184:


Just a side comment: RocksDB allows you to "pin/freeze" the current state to 
protect it from compaction. This is a lightweight metadata operation. After a 
version is pinned (not sure atm what the correct RocksDB term for this operation 
is), the immutable SST files can be copied in the background to complete a 
checkpoint.



[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store

2018-01-29 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344081#comment-16344081
 ] 

Ted Yu commented on KAFKA-3184:
---

bq. only do the checkpointing every N flush calls

This seems better than checkpointing on each commit.
However, the checkpoint may take non-trivial time to complete,
so async checkpointing is preferred.
If the previous checkpoint has not finished when another N flush calls have 
passed, do we just delay the next checkpoint?
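A minimal sketch of one possible answer to the question above, assuming a hypothetical policy object consulted on every flush: once N flushes have passed, a new async checkpoint starts only if the previous one has finished; otherwise the decision is deferred to the next flush rather than dropped. Checkpoints therefore never overlap, and a delayed one starts on the first flush after the in-flight one completes. The class and method names are illustrative, not an existing Kafka Streams API.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical policy consulted on every flush() of an in-memory store.
final class CheckpointTrigger {
    private final int everyNFlushes;
    private int flushesSinceLastCheckpoint = 0;                      // stream thread only
    private final AtomicBoolean inFlight = new AtomicBoolean(false); // shared with checkpoint thread

    CheckpointTrigger(int everyNFlushes) {
        if (everyNFlushes < 1) {
            throw new IllegalArgumentException("everyNFlushes must be >= 1");
        }
        this.everyNFlushes = everyNFlushes;
    }

    // Returns true if the caller should start an async checkpoint now.
    boolean onFlush() {
        flushesSinceLastCheckpoint++;
        if (flushesSinceLastCheckpoint < everyNFlushes) {
            return false;
        }
        // N flushes have passed. If the previous checkpoint is still running, delay:
        // the counter is not reset, so the next flush asks again.
        if (!inFlight.compareAndSet(false, true)) {
            return false;
        }
        flushesSinceLastCheckpoint = 0;
        return true;
    }

    // Called by the checkpoint thread once the snapshot is durably written.
    void onCheckpointComplete() {
        inFlight.set(false);
    }
}
```

With N = 3, flushes 1 and 2 return false, flush 3 returns true, and every later flush returns false until onCheckpointComplete() is called; the next flush after that starts the delayed checkpoint.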



[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store

2018-01-18 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16331348#comment-16331348
 ] 

Guozhang Wang commented on KAFKA-3184:
--

[~yuzhih...@gmail.com] sorry for the late reply!

Your understanding is basically right, and here are my thoughts about the 
flushing:

1. It should not be expensive or stop the world, since flush may be called 
on each commit. I was thinking that it could either be async (but we need to 
make the checkpointed offset values consistent with the checkpoint image on 
disk itself), or we only do the checkpointing every N flush calls.

2. As for {{persistent()}}, currently it is only used in 
{{ProcessorStateManager#checkpoint}} and 
{{StoreChangelogReader#restoredOffsets}}; with in-memory state stores being 
checkpointed periodically, I think we can just deprecate this flag and let 
these two callers always checkpoint / save restored offsets.
