mjsax commented on code in PR #18752:
URL: https://github.com/apache/kafka/pull/18752#discussion_r1976509566
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java:
##########
@@ -252,6 +252,10 @@ public void shouldCheckpointOffsetsWhenStateIsFlushed() {
@Test
public void shouldNotCheckpointIfNotReceivedEnoughRecords() {
globalStateTask.initialize();
+ // Reset after initialization since checkpointing should happen during initialization, KAFKA-18168
Review Comment:
Or maybe even remove the comment entirely?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java:
##########
@@ -96,6 +96,7 @@ public Map<TopicPartition, Long> initialize() {
}
initTopology();
processorContext.initialize();
+ this.flushState();
Review Comment:
```suggestion
flushState();
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java:
##########
@@ -252,6 +252,10 @@ public void shouldCheckpointOffsetsWhenStateIsFlushed() {
@Test
public void shouldNotCheckpointIfNotReceivedEnoughRecords() {
globalStateTask.initialize();
+ // Reset after initialization since checkpointing should happen during initialization, KAFKA-18168
Review Comment:
```suggestion
// Reset after initialization since checkpointing should happen during initialization
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java:
##########
@@ -333,4 +343,21 @@ public void shouldWipeGlobalStateDirectory() throws Exception {
globalStateTask.close(true);
assertFalse(stateMgr.baseDir().exists());
}
+
+ @Test
+ public void shouldCheckpointDuringInitialization() {
+ globalStateTask.initialize();
+
+ assertTrue(stateMgr.checkpointWritten);
+ assertTrue(stateMgr.flushed);
+ }
+
+ @Test
+ public void shouldCheckpointDuringClose() throws Exception {
+ globalStateTask.initialize();
+ globalStateTask.close(false);
Review Comment:
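It seems we need to reset both flags after `initialize()` but before `close()`? Otherwise the flags are already set by the checkpoint written during `initialize()`, and the test cannot tell whether `close()` checkpoints at all.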
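To illustrate the point, here is a minimal sketch using hypothetical stubs (`StubStateManager`, `StubGlobalTask`) in place of the real test fixtures. It assumes `flushState()` both flushes and checkpoints, and that `initialize()` and `close()` each call it, as in this PR. It is not the actual test code.

```java
// Hypothetical stand-in for the test's state manager; only the two flags matter here.
class StubStateManager {
    boolean flushed = false;
    boolean checkpointWritten = false;

    void flush() {
        flushed = true;
    }

    void checkpoint() {
        checkpointWritten = true;
    }
}

// Hypothetical stand-in for GlobalStateUpdateTask (assumed behavior, not the real class).
class StubGlobalTask {
    private final StubStateManager stateMgr;

    StubGlobalTask(final StubStateManager stateMgr) {
        this.stateMgr = stateMgr;
    }

    void flushState() {
        stateMgr.flush();
        stateMgr.checkpoint();
    }

    void initialize() {
        flushState();
    }

    void close(final boolean wipeStateStore) {
        flushState();
    }
}

class CheckpointDuringCloseSketch {
    // Returns true only if close() itself flushed and checkpointed.
    static boolean closeAloneCheckpoints() {
        final StubStateManager stateMgr = new StubStateManager();
        final StubGlobalTask task = new StubGlobalTask(stateMgr);

        task.initialize();

        // Reset both flags so the checkpoint from initialize() cannot mask close().
        stateMgr.checkpointWritten = false;
        stateMgr.flushed = false;

        task.close(false);
        return stateMgr.flushed && stateMgr.checkpointWritten;
    }

    public static void main(final String[] args) {
        System.out.println(closeAloneCheckpoints());
    }
}
```

Without the two reset lines, the check would pass even if `close()` did nothing, because `initialize()` has already set both flags.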
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java:
##########
@@ -138,6 +139,7 @@ public void flushState() {
}
public void close(final boolean wipeStateStore) throws IOException {
+ this.flushState();
Review Comment:
```suggestion
flushState();
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java:
##########
@@ -138,6 +139,7 @@ public void flushState() {
}
public void close(final boolean wipeStateStore) throws IOException {
+ this.flushState();
Review Comment:
It does not seem to make sense to flush the state if `wipeStateStore == true`?
It's not really a problem, as we are closing anyway, but it still feels
off.
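One possible shape, as a rough sketch only: skip the flush when the store is about to be wiped. `CloseSketch` and its fields are hypothetical and stand in for the real task; the actual `close()` does more (and declares `throws IOException`), and whether this guard is wanted is up to the PR author.

```java
// Hypothetical, simplified stand-in for GlobalStateUpdateTask; not the real Kafka class.
class CloseSketch {
    int flushCount = 0;     // how many times flushState() ran
    boolean closed = false; // stands in for closing the state manager
    boolean wiped = false;  // stands in for deleting the state directory

    void flushState() {
        flushCount++;
    }

    void close(final boolean wipeStateStore) {
        // Flush (and checkpoint) only when the state is kept; flushing data
        // that is deleted right afterwards does no useful work.
        if (!wipeStateStore) {
            flushState();
        }
        closed = true;
        if (wipeStateStore) {
            wiped = true;
        }
    }
}
```

With this guard, `close(false)` flushes once, while `close(true)` goes straight to closing and wiping.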
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java:
##########
@@ -252,6 +252,10 @@ public void shouldCheckpointOffsetsWhenStateIsFlushed() {
@Test
public void shouldNotCheckpointIfNotReceivedEnoughRecords() {
globalStateTask.initialize();
+ // Reset after initialization since checkpointing should happen during initialization, KAFKA-18168
+ stateMgr.checkpointWritten = false;
+ stateMgr.flushed = false;
Review Comment:
Nit: it might be slightly better to do this a little later in the test, i.e.,
just before we call `globalStateTask.maybeCheckpoint()`? The test logic might be
easier for humans to read as (1) reset flags, (2) checkpoint, (3) verify flags.
```
globalStateTask.update(record(topic1, 1, currentOffsetT1 + 9000L,
"foo".getBytes(), "foo".getBytes()));
time.sleep(flushInterval); // flush interval elapsed
stateMgr.checkpointWritten = false;
stateMgr.flushed = false;
globalStateTask.maybeCheckpoint();
assertEquals(offsets, stateMgr.changelogOffsets());
assertFalse(stateMgr.flushed);
assertFalse(stateMgr.checkpointWritten);
```
The same applies to the other tests.