Nikita-Shupletsov commented on code in PR #20749:
URL: https://github.com/apache/kafka/pull/20749#discussion_r2453535667


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -221,44 +228,23 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
                             .map(t -> new TopicPartition(t, id.partition()))
                             .collect(Collectors.toSet());
                     final ProcessorStateManager stateManager = ProcessorStateManager.createStartupTaskStateManager(
-                        id,
-                        eosEnabled,
-                        logContext,
-                        this,
-                        subTopology.storeToChangelogTopic(),
-                        inputPartitions,
-                        stateUpdaterEnabled
+                            id,
+                            eosEnabled,
+                            logContext,
+                            this,
+                            subTopology.storeToChangelogTopic(),
+                            inputPartitions,
+                            stateUpdaterEnabled
                     );
-
-                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
-                        id,
-                        config,
-                        stateManager,
-                        streamsMetrics,
-                        dummyCache
-                    );
-
-                    final Task task = new StandbyTask(
-                        id,
-                        inputPartitions,
-                        subTopology,
-                        topologyMetadata.taskConfig(id),
-                        streamsMetrics,
-                        stateManager,
-                        this,
-                        dummyCache,
-                        context
-                    );
-
-                    try {
-                        task.initializeIfNeeded();
-
-                        tasksForLocalState.put(id, task);
-                    } catch (final TaskCorruptedException e) {
-                        // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
-                        task.suspend();
-                        task.closeDirty();
+                    final StartupContext initContext = new StartupContext(id, config, stateManager);
+                    // TODO: we need to pass a proper logPrefix
+                    StateManagerUtil.registerStateStores(log, "", subTopology, stateManager, this, initContext);
+                    for (final StateStore stateStore : subTopology.stateStores()) {
+                        if (!stateStore.isOpen()) {

Review Comment:
   I assume the rest of the stores will be updated later? Right now only one
   store has been updated: RocksDBStore. I opened a random store
   (InMemoryKeyValueStore), and it sets its open field to true in the init
   method, so I think it will start failing here.
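
   To illustrate the concern, here is a minimal sketch. It does not use the
   real Kafka Streams classes: `SketchStore`, `EagerOpenStore`, and
   `DeferredOpenStore` are hypothetical stand-ins, and it assumes that
   "updated" means a store whose init no longer flips the open flag. Under that
   assumption, a store that still opens in init reports `isOpen() == true`
   right after registration, so a `!stateStore.isOpen()` check sees a different
   state than it does for an updated store.

```java
// Hypothetical stand-ins only; none of these types exist in Kafka Streams.
class OpenFlagSketch {

    // Models just the two StateStore methods discussed in this comment.
    interface SketchStore {
        void init();
        boolean isOpen();
    }

    // Assumed behavior of a not-yet-updated store: init sets the open flag.
    static final class EagerOpenStore implements SketchStore {
        private boolean open = false;

        @Override
        public void init() {
            open = true;
        }

        @Override
        public boolean isOpen() {
            return open;
        }
    }

    // Assumed behavior of an updated store: init registers only,
    // and the store is opened in a separate, later step.
    static final class DeferredOpenStore implements SketchStore {
        private boolean open = false;

        @Override
        public void init() {
            // Intentionally does not open the store.
        }

        void open() {
            open = true;
        }

        @Override
        public boolean isOpen() {
            return open;
        }
    }

    // Mimics "register the store, then inspect isOpen()".
    static boolean isOpenAfterInit(final SketchStore store) {
        store.init();
        return store.isOpen();
    }

    public static void main(final String[] args) {
        System.out.println("eager store open after init: "
            + isOpenAfterInit(new EagerOpenStore()));
        System.out.println("deferred store open after init: "
            + isOpenAfterInit(new DeferredOpenStore()));
    }
}
```

   If the new code path relies on stores being closed after registration, every
   store that behaves like `EagerOpenStore` would need the same change.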



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
