eduwercamacaro commented on code in PR #20749:
URL: https://github.com/apache/kafka/pull/20749#discussion_r2682836819


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -230,44 +238,23 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
                             .map(t -> new TopicPartition(t, id.partition()))
                             .collect(Collectors.toSet());
                     final ProcessorStateManager stateManager = ProcessorStateManager.createStartupTaskStateManager(
-                        id,
-                        eosEnabled,
-                        logContext,
-                        this,
-                        subTopology.storeToChangelogTopic(),
-                        inputPartitions,
-                        stateUpdaterEnabled
+                            id,
+                            eosEnabled,
+                            logContext,
+                            this,
+                            subTopology.storeToChangelogTopic(),
+                            inputPartitions,
+                            stateUpdaterEnabled
                     );
-
-                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
-                        id,
-                        config,
-                        stateManager,
-                        streamsMetrics,
-                        dummyCache
-                    );
-
-                    final Task task = new StandbyTask(
-                        id,
-                        inputPartitions,
-                        subTopology,
-                        topologyMetadata.taskConfig(id),
-                        streamsMetrics,
-                        stateManager,
-                        this,
-                        dummyCache,
-                        context
-                    );
-
-                    try {
-                        task.initializeIfNeeded();
-
-                        tasksForLocalState.put(id, task);
-                    } catch (final TaskCorruptedException e) {
-                        // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
-                        task.suspend();
-                        task.closeDirty();
+                    final StartupContext initContext = new StartupContext(id, config, stateManager);
+                    // TODO: we need to pass a proper logPrefix

Review Comment:
   I think we need to use a different format for the logPrefix, since this
method is not invoked by a StreamThread. I just pushed
[this commit](https://github.com/apache/kafka/pull/20749/commits/f8febe71461adae7288943382a27ba9f91436a46);
let me know what you think about it.
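
For illustration only, a prefix that does not depend on a thread name could be built from the task id alone. This is a minimal sketch, not the format used in the linked commit: the class name `LogPrefixSketch`, the method `startupTaskPrefix`, and the `startup-task [%s] ` format string are all hypothetical.

```java
public final class LogPrefixSketch {

    private LogPrefixSketch() { }

    // Hypothetical format: builds a prefix from the task id only, so it does not
    // need the name of a StreamThread. The real format may differ.
    static String startupTaskPrefix(final String taskId) {
        return String.format("startup-task [%s] ", taskId);
    }

    public static void main(final String[] args) {
        // "0_1" stands in for a task id (subtopology 0, partition 1).
        System.out.println(startupTaskPrefix("0_1") + "Initializing state");
    }
}
```

A string like this could then be handed to whatever expects the logPrefix, assuming the receiving side only needs a plain `String`.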



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
