coltmcnealy-lh commented on code in PR #20833:
URL: https://github.com/apache/kafka/pull/20833#discussion_r2525646040


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1458,6 +1458,14 @@ protected boolean maybeUpdateSubscriptionVersion(final int receivedAssignmentMet
 
     @Override
     public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
+        final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
+            .values()
+            .stream()
+            .filter(t -> t.commitNeeded())

Review Comment:
   @mjsax and @lianetm thanks for the comments here. I did test this in our 
soak test, and the `commit()` did not fail. In fact, it helped: deploying the 
fix allowed the soak to go from an unrecoverable state to finishing restoration 
and being healthy again.
   
   My reading of `StreamThread.java` led me to the same conclusion that Lianet 
came to, which is that the exception would bubble up to the call to `poll()`.
   
   The reasoning for this is that `pollPhase()` is inside 
`runOnceWithProcessingThreads()` which is inside all of the exception handling 
in `runLoop()`.
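   
   To make that nesting concrete, here is a minimal, self-contained sketch. It 
is not the actual `StreamThread` code: the method bodies, the `trace` list, 
and `CommitFailedSketchException` are hypothetical stand-ins, and it assumes 
(per the reading above) that an exception thrown from the assignment callback 
surfaces out of `poll()`.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the call nesting described above; NOT the real StreamThread.
final class PollPropagationSketch {

    // Hypothetical stand-in for whatever a failed commit would throw.
    static final class CommitFailedSketchException extends RuntimeException {
        CommitFailedSketchException(final String message) {
            super(message);
        }
    }

    // Stand-in for the assignor callback that commits before accepting the assignment.
    static void onAssignment(final List<String> trace, final boolean commitFails) {
        trace.add("onAssignment");
        if (commitFails) {
            throw new CommitFailedSketchException("commit failed during rebalance");
        }
    }

    // Stand-in for the consumer's poll(): the callback runs on the polling
    // thread, so anything it throws leaves through this frame (assumption).
    static void poll(final List<String> trace, final boolean commitFails) {
        trace.add("poll");
        onAssignment(trace, commitFails);
    }

    static void pollPhase(final List<String> trace, final boolean commitFails) {
        trace.add("pollPhase");
        poll(trace, commitFails);
    }

    static void runOnceWithProcessingThreads(final List<String> trace, final boolean commitFails) {
        trace.add("runOnceWithProcessingThreads");
        pollPhase(trace, commitFails);
    }

    // The outermost frame owns the exception handling, so it sees the failure
    // no matter how deeply nested the throwing call was.
    static List<String> runLoop(final boolean commitFails) {
        final List<String> trace = new ArrayList<>();
        trace.add("runLoop");
        try {
            runOnceWithProcessingThreads(trace, commitFails);
            trace.add("iteration completed");
        } catch (final CommitFailedSketchException e) {
            trace.add("runLoop caught: " + e.getMessage());
        }
        return trace;
    }

    public static void main(final String[] args) {
        runLoop(true).forEach(System.out::println);
    }
}
```

   With `commitFails` set to `true`, the last trace entry is 
`runLoop caught: commit failed during rebalance`; nothing between 
`onAssignment` and `runLoop` intercepts the exception in this model.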
   
   I see what Matthias is saying: without further exception handling, a failed 
commit here will crash the `StreamThread`. But if a commit fails, we have to 
close all tasks dirty anyway, which is already extremely disruptive under EOS. 
Does losing and recreating a thread (a couple hundred milliseconds, perhaps) 
matter compared to wiping hundreds of GB of RocksDB state?


