Nikita-Shupletsov commented on code in PR #20749:
URL: https://github.com/apache/kafka/pull/20749#discussion_r2699551327


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -221,44 +228,23 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
                             .map(t -> new TopicPartition(t, id.partition()))
                             .collect(Collectors.toSet());
                     final ProcessorStateManager stateManager = ProcessorStateManager.createStartupTaskStateManager(
-                        id,
-                        eosEnabled,
-                        logContext,
-                        this,
-                        subTopology.storeToChangelogTopic(),
-                        inputPartitions,
-                        stateUpdaterEnabled
+                            id,
+                            eosEnabled,
+                            logContext,
+                            this,
+                            subTopology.storeToChangelogTopic(),
+                            inputPartitions,
+                            stateUpdaterEnabled
                     );
-
-                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
-                        id,
-                        config,
-                        stateManager,
-                        streamsMetrics,
-                        dummyCache
-                    );
-
-                    final Task task = new StandbyTask(
-                        id,
-                        inputPartitions,
-                        subTopology,
-                        topologyMetadata.taskConfig(id),
-                        streamsMetrics,
-                        stateManager,
-                        this,
-                        dummyCache,
-                        context
-                    );
-
-                    try {
-                        task.initializeIfNeeded();
-
-                        tasksForLocalState.put(id, task);
-                    } catch (final TaskCorruptedException e) {
-                        // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
-                        task.suspend();
-                        task.closeDirty();
+                    final StartupContext initContext = new StartupContext(id, config, stateManager);
+                    // TODO: we need to pass a proper logPrefix
+                    StateManagerUtil.registerStateStores(log, "", subTopology, stateManager, this, initContext);
+                    for (final StateStore stateStore : subTopology.stateStores()) {
+                        if (!stateStore.isOpen()) {

Review Comment:
   @eduwercamacaro 
   Thanks!
   Just to confirm: the idea is that the owner of a store is responsible for 
initializing it properly, right? So if Kafka Streams (KS) hasn't called 
preInit, the owner will need to do that themselves during init.
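
   A rough sketch of how I read that contract, in case it helps. All names here (`SketchStore`, `SelfInitializingStore`, and the no-arg `preInit()`/`init()` signatures) are made up for illustration and are not the actual API in this PR; the only point is that `init` falls back to running `preInit` itself when nobody has called it yet.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a state store. This is NOT the real
// Kafka Streams StateStore interface; signatures are assumptions.
interface SketchStore {
    void preInit();   // assumed: early setup, normally driven by Kafka Streams at startup
    void init();      // assumed: regular initialization when the store is handed to a task
    boolean isOpen();
}

class SelfInitializingStore implements SketchStore {
    private boolean preInitialized = false;
    private boolean open = false;

    // Records the order of lifecycle steps so the behavior can be inspected.
    final List<String> calls = new ArrayList<>();

    @Override
    public void preInit() {
        if (preInitialized) {
            return; // idempotent: a second call does nothing
        }
        calls.add("preInit");
        preInitialized = true;
    }

    @Override
    public void init() {
        // The store owns its initialization: if the framework never
        // called preInit, the store does it here before continuing.
        if (!preInitialized) {
            preInit();
        }
        calls.add("init");
        open = true;
    }

    @Override
    public boolean isOpen() {
        return open;
    }
}
```

   With this shape, a store that was pre-initialized at startup and one that was not both end up open after `init`, and `preInit` runs once in either case.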



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
