bbejeck commented on code in PR #20749:
URL: https://github.com/apache/kafka/pull/20749#discussion_r2770932903


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -237,35 +252,18 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
                         inputPartitions
                     );
 
-                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
-                        id,
-                        config,
-                        stateManager,
-                        streamsMetrics,
-                        dummyCache
-                    );
-
-                    final Task task = new StandbyTask(
-                        id,
-                        inputPartitions,
-                        subTopology,
-                        topologyMetadata.taskConfig(id),
-                        streamsMetrics,
-                        stateManager,
-                        this,
-                        dummyCache,
-                        context
-                    );
-
+                    final StartupContext initContext = new StartupContext(id, config, temporaryStateManager, metricsImpl, cache);
                     try {
-                        task.initializeIfNeeded();
-
-                        tasksForLocalState.put(id, task);
-                    } catch (final TaskCorruptedException e) {
-                        // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
-                        task.suspend();
-                        task.closeDirty();
+                        StateManagerUtil.registerStateStores(log, threadLogPrefix, subTopology, temporaryStateManager, this, initContext);
+                    } catch (final TaskCorruptedException tce) {
+                        log.warn("Failed to register startup state stores for task {}: {}", id, tce.getMessage());

Review Comment:
   `registerStateStores` can also throw a `StreamsException`, a `LockException`,
or a `ProcessorStateException`. Should we handle these exceptions as well? If
not, please add a comment explaining the reasoning.
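
To make the concern concrete, here is a minimal sketch of how a narrow `catch` behaves. It assumes that `TaskCorruptedException`, `LockException`, and `ProcessorStateException` are all subclasses of `StreamsException`; the classes below are hypothetical stand-ins declared locally, not the Kafka classes, and `register` is an illustrative helper rather than the real `registerStateStores`.

```java
public class CatchScopeSketch {

    // Hypothetical stand-ins that mirror the assumed exception hierarchy.
    static class StreamsException extends RuntimeException {
        StreamsException(final String message) {
            super(message);
        }
    }

    static class TaskCorruptedException extends StreamsException {
        TaskCorruptedException(final String message) {
            super(message);
        }
    }

    static class LockException extends StreamsException {
        LockException(final String message) {
            super(message);
        }
    }

    static class ProcessorStateException extends StreamsException {
        ProcessorStateException(final String message) {
            super(message);
        }
    }

    /**
     * Illustrative helper: the inner catch has the same shape as the one in
     * the diff, the outer catch stands in for whatever sits further up the stack.
     */
    static String register(final RuntimeException toThrow) {
        try {
            try {
                if (toThrow != null) {
                    throw toThrow;
                }
                return "registered";
            } catch (final TaskCorruptedException tce) {
                // Only this subtype is handled locally.
                return "handled";
            }
        } catch (final StreamsException e) {
            // Sibling subtypes skip the inner catch and land here.
            return "escaped: " + e.getClass().getSimpleName();
        }
    }

    public static void main(final String[] args) {
        System.out.println(register(null));
        System.out.println(register(new TaskCorruptedException("corrupt")));
        System.out.println(register(new LockException("locked")));
        System.out.println(register(new ProcessorStateException("state")));
    }
}
```

Under that assumption, only the corrupted-task case is logged and swallowed; the other three leave the method, so the code should either widen the catch or document why propagation is intended.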



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -237,35 +252,18 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
                         inputPartitions
                     );
 
-                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
-                        id,
-                        config,
-                        stateManager,
-                        streamsMetrics,
-                        dummyCache
-                    );
-
-                    final Task task = new StandbyTask(
-                        id,
-                        inputPartitions,
-                        subTopology,
-                        topologyMetadata.taskConfig(id),
-                        streamsMetrics,
-                        stateManager,
-                        this,
-                        dummyCache,
-                        context
-                    );
-
+                    final StartupContext initContext = new StartupContext(id, config, temporaryStateManager, metricsImpl, cache);
                     try {
-                        task.initializeIfNeeded();
-
-                        tasksForLocalState.put(id, task);
-                    } catch (final TaskCorruptedException e) {
-                        // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
-                        task.suspend();
-                        task.closeDirty();
+                        StateManagerUtil.registerStateStores(log, threadLogPrefix, subTopology, temporaryStateManager, this, initContext);
+                    } catch (final TaskCorruptedException tce) {
+                        log.warn("Failed to register startup state stores for task {}: {}", id, tce.getMessage());
+                    } finally {
+                        // Make sure the state manager writes the local checkpoint file before closing the stores
+                        // This will be replaced in the future when removing the checkpoint file dependency.
+                        temporaryStateManager.checkpoint();

Review Comment:
   Same as above: `temporaryStateManager.checkpoint()` can throw a
`ProcessorStateException`, but we don't handle it here, which means `close()` is
never called. Do we consider this a fatal exception and allow it to bubble up
and kill the main thread? If not, I'm guessing we'll need to catch this
exception and do something. Either way, let's add a couple of comments on the
handling.
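
A small self-contained sketch of the control flow in question. `FakeStateManager` is a hypothetical stand-in (not the Kafka state manager), and `IllegalStateException` stands in for `ProcessorStateException`. It contrasts the flat `finally` from the diff with a nested `try`/`finally` that still runs `close()`; whether the failure should then propagate is the open question above.

```java
import java.util.ArrayList;
import java.util.List;

public class FinallyCleanupSketch {

    /** Hypothetical stand-in that records which calls were made. */
    static class FakeStateManager {
        final List<String> calls = new ArrayList<>();
        final boolean failCheckpoint;

        FakeStateManager(final boolean failCheckpoint) {
            this.failCheckpoint = failCheckpoint;
        }

        void checkpoint() {
            calls.add("checkpoint");
            if (failCheckpoint) {
                throw new IllegalStateException("checkpoint failed");
            }
        }

        void close() {
            calls.add("close");
        }
    }

    /** Same shape as the diff: checkpoint() and close() back to back in one finally. */
    static List<String> flatFinally(final FakeStateManager manager) {
        try {
            try {
                // Store registration would happen here.
            } finally {
                manager.checkpoint();
                manager.close(); // skipped when checkpoint() throws
            }
        } catch (final IllegalStateException e) {
            manager.calls.add("propagated");
        }
        return manager.calls;
    }

    /** Alternative shape: close() sits in its own finally, so it runs even if checkpoint() throws. */
    static List<String> nestedFinally(final FakeStateManager manager) {
        try {
            try {
                // Store registration would happen here.
            } finally {
                try {
                    manager.checkpoint();
                } finally {
                    manager.close();
                }
            }
        } catch (final IllegalStateException e) {
            manager.calls.add("propagated");
        }
        return manager.calls;
    }

    public static void main(final String[] args) {
        System.out.println(flatFinally(new FakeStateManager(true)));   // [checkpoint, propagated]
        System.out.println(nestedFinally(new FakeStateManager(true))); // [checkpoint, close, propagated]
    }
}
```

In both shapes the exception still reaches the caller; the nested form only guarantees that the cleanup step is attempted first.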



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -237,35 +252,18 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
                         inputPartitions
                     );
 
-                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
-                        id,
-                        config,
-                        stateManager,
-                        streamsMetrics,
-                        dummyCache
-                    );
-
-                    final Task task = new StandbyTask(
-                        id,
-                        inputPartitions,
-                        subTopology,
-                        topologyMetadata.taskConfig(id),
-                        streamsMetrics,
-                        stateManager,
-                        this,
-                        dummyCache,
-                        context
-                    );
-
+                    final StartupContext initContext = new StartupContext(id, config, temporaryStateManager, metricsImpl, cache);
                     try {
-                        task.initializeIfNeeded();
-
-                        tasksForLocalState.put(id, task);
-                    } catch (final TaskCorruptedException e) {
-                        // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
-                        task.suspend();
-                        task.closeDirty();
+                        StateManagerUtil.registerStateStores(log, threadLogPrefix, subTopology, temporaryStateManager, this, initContext);
+                    } catch (final TaskCorruptedException tce) {
+                        log.warn("Failed to register startup state stores for task {}: {}", id, tce.getMessage());
+                    } finally {
+                        // Make sure the state manager writes the local checkpoint file before closing the stores
+                        // This will be replaced in the future when removing the checkpoint file dependency.
+                        temporaryStateManager.checkpoint();
+                        temporaryStateManager.close();

Review Comment:
   I think we also need to add a call to `StateDirectory.unlock` here.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -237,35 +252,18 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
                         inputPartitions
                     );
 
-                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
-                        id,
-                        config,
-                        stateManager,
-                        streamsMetrics,
-                        dummyCache
-                    );
-
-                    final Task task = new StandbyTask(
-                        id,
-                        inputPartitions,
-                        subTopology,
-                        topologyMetadata.taskConfig(id),
-                        streamsMetrics,
-                        stateManager,
-                        this,
-                        dummyCache,
-                        context
-                    );
-
+                    final StartupContext initContext = new StartupContext(id, config, temporaryStateManager, metricsImpl, cache);
                     try {
-                        task.initializeIfNeeded();
-
-                        tasksForLocalState.put(id, task);
-                    } catch (final TaskCorruptedException e) {
-                        // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
-                        task.suspend();
-                        task.closeDirty();
+                        StateManagerUtil.registerStateStores(log, threadLogPrefix, subTopology, temporaryStateManager, this, initContext);
+                    } catch (final TaskCorruptedException tce) {
+                        log.warn("Failed to register startup state stores for task {}: {}", id, tce.getMessage());
+                    } finally {
+                        // Make sure the state manager writes the local checkpoint file before closing the stores
+                        // This will be replaced in the future when removing the checkpoint file dependency.
+                        temporaryStateManager.checkpoint();
+                        temporaryStateManager.close();
                     }
+                    tasksForLocalState.put(id, new StartupState(id, subTopology, temporaryStateManager));

Review Comment:
   `temporaryStateManager` is closed above (line 264), but it's still passed
here to `StartupState`, and it's never accessed from `StartupState`. With the
new work, is `StartupState` required, or can we get away with a `Set<TaskId>`?
Or is it going to be needed for subsequent work?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.