vvcephei commented on a change in pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#discussion_r450376026



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 if (oldTask.isActive()) {
                     final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
                     newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
+                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
                 } else {
                     oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already

Review comment:
       Hi all, thanks for the discussion.
   
   To start at the beginning, yes, I was advocating for throwing an 
IllegalStateException at the earliest possible moment when we can detect the 
illegal state. Right here in the code, we are making an invalid assumption. 
Namely, that if a task to be recycled is active, then it has already been 
suspended and committed, and if it is a standby, then it still needs to be 
suspended and committed. Why should this be true? Because some other code deep 
inside another set of nested conditionals thirty lines above looks like it does 
that right now? Experience says that this situation will not survive 
refactoring. We cannot test every branch, so we need to make assertions about 
the state being valid when it's in doubt.
   
   We could make an argument that if this assumption becomes incorrect, then 
we'll throw an exception later on before any state becomes corrupted, which 
would be good. We could make a stronger argument that the exception we throw 
later on will be perfectly crystal clear about the cause and therefore we won't 
spend weeks poking around a flaky test or a user bug report trying to figure 
out what happened. But both of those arguments would depend on even further 
assumptions about stuff that may or may not happen elsewhere in the code base. 
The best thing to do at all times is validate potentially dangerous 
assumptions. This looks very much like a potentially dangerous assumption. I'm 
asking that we validate it.
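   
   To make the request concrete, here is a rough sketch of the kind of check I mean. It is not the actual TaskManager code: `FakeTask`, `State`, `commitNeeded`, and `prepareForRecycle` are hypothetical stand-ins invented for illustration, and the real check would have to use whatever the Task interface actually exposes. The point is only that the active branch verifies its precondition instead of trusting code elsewhere to have established it.

```java
// Illustrative sketch only. All names here are hypothetical stand-ins,
// not the real Kafka Streams Task or TaskManager API.
class RecycleStateCheck {

    // Hypothetical lifecycle states for the stand-in task.
    public enum State { RUNNING, SUSPENDED }

    // Hypothetical minimal task with just enough state to show the check.
    public static final class FakeTask {
        public final String id;
        public final boolean active;
        public State state;
        public boolean commitNeeded;

        public FakeTask(final String id, final boolean active,
                        final State state, final boolean commitNeeded) {
            this.id = id;
            this.active = active;
            this.state = state;
            this.commitNeeded = commitNeeded;
        }

        public void suspend() {
            state = State.SUSPENDED;
        }
    }

    // Validate the assumption at the point where we rely on it.
    public static void prepareForRecycle(final FakeTask oldTask) {
        if (oldTask.active) {
            // Assumption under test: actives were suspended and committed earlier.
            if (oldTask.state != State.SUSPENDED || oldTask.commitNeeded) {
                throw new IllegalStateException(
                    "Active task " + oldTask.id
                        + " should have been suspended and committed before recycling,"
                        + " but was in state " + oldTask.state
                        + " with commitNeeded=" + oldTask.commitNeeded);
            }
        } else {
            // Standbys are suspended here, as in the existing code path.
            oldTask.suspend();
        }
    }

    public static void main(final String[] args) {
        final FakeTask standby = new FakeTask("0_1", false, State.RUNNING, false);
        prepareForRecycle(standby);
        System.out.println("standby state after recycle prep: " + standby.state);

        final FakeTask badActive = new FakeTask("0_2", true, State.RUNNING, true);
        try {
            prepareForRecycle(badActive);
        } catch (final IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

   With a check like this, a future refactoring that stops suspending actives beforehand fails right at the recycle site with a message naming the task, rather than somewhere downstream.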




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

