vvcephei commented on a change in pull request #8962: URL: https://github.com/apache/kafka/pull/8962#discussion_r450376026
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, if (oldTask.isActive()) { final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id()); newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions); + cleanUpTaskProducer(oldTask, taskCloseExceptions); } else { oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already Review comment: Hi all, thanks for the discussion. To start at the beginning, yes, I was advocating for throwing an IllegalStateException at the earliest possible moment when we can detect the illegal state. Right here in the code, we are making an invalid assumption. Namely, that if a task to be recycled is active, then it has already been suspended and committed, and if it is a standby, then it still needs to be suspended and committed. Why should this be true? Because some other code deep inside another set of nested conditionals thirty lines above looks like it does that right now? Experience says that this situation will not survive refactoring. We cannot test every branch, so we need to make assertions about the state being valid when it's in doubt. We could make an argument that if this assumption becomes incorrect, than we'll throw an exception later on before any state becomes corrupted, which would be good. We could make a stronger argument that the exception we throw later on will be perfectly crystal clear about the cause and therefore we won't spend weeks poking around a flaky test or a user bug report trying to figure out what happened. But both of those arguments would depend on even further assumptions about stuff that may or may not happen elsewhere in the code base. The best thing to do at all times is validate potentially dangerous assumptions. This looks very much like a potentially dangerous assumption. I'm asking that we validate it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org