guozhangwang commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443849945



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -267,7 +283,17 @@ public void close() {
 
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) {

Review comment:
   1. If we are calling from `closeDirty`, then exceptions from
   `recordCollector.close` would be swallowed and logged.

   2. If we are calling from `closeAndRecycleState`, then the caller in
   TaskManager would catch every `RuntimeException` and move the task to
   toCloseDirty.

   3. The only case that needs care is the call from `closeClean`, as
   Matthias pointed out in the example above.

   So, looking at it from that side, maybe we should still always call
   `checkForException` in `RecordCollector.close` so that the error gets
   logged, while keeping this hacky error-message handling inside
   `RecordCollector`.
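
A minimal sketch of the three call paths above, assuming simplified stand-ins rather than the real Kafka Streams classes: `CloseSketch`, `Collector`, `recycleOrMarkDirty`, and the `"task-0"` id are hypothetical names, and the error-message matching from the diff is left out because the hunk does not show what that branch does.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseSketch {

    // Hypothetical stand-in for RecordCollectorImpl: it only remembers a send failure.
    static class Collector {
        RuntimeException sendException; // assumed to be set by a producer callback (not shown)

        void checkForException() {
            if (sendException != null) {
                throw sendException;
            }
        }

        // The suggestion in the comment: close() always surfaces a pending send error.
        void close() {
            checkForException();
        }
    }

    // Case 1: a dirty close swallows and logs whatever close() throws.
    static void closeDirty(Collector collector, List<String> log) {
        try {
            collector.close();
        } catch (RuntimeException e) {
            log.add("swallowed on dirty close: " + e.getMessage());
        }
    }

    // Case 2: the caller catches the RuntimeException and queues the task for a dirty close.
    static boolean recycleOrMarkDirty(Collector collector, List<String> toCloseDirty) {
        try {
            collector.close(); // stands in for closeAndRecycleState
            return true;
        } catch (RuntimeException e) {
            toCloseDirty.add("task-0"); // hypothetical task id
            return false;
        }
    }

    // Case 3: a clean close lets the exception propagate, so this path needs care.
    static void closeClean(Collector collector) {
        collector.close();
    }

    public static void main(String[] args) {
        Collector collector = new Collector();
        collector.sendException = new RuntimeException("send failed");

        List<String> log = new ArrayList<>();
        closeDirty(collector, log);
        System.out.println("log entries after closeDirty: " + log.size());

        List<String> toCloseDirty = new ArrayList<>();
        boolean recycled = recycleOrMarkDirty(collector, toCloseDirty);
        System.out.println("recycled: " + recycled + ", queued for dirty close: " + toCloseDirty);

        try {
            closeClean(collector);
        } catch (RuntimeException e) {
            System.out.println("closeClean propagated: " + e.getMessage());
        }
    }
}
```

In this sketch only `closeClean` lets the send error escape to its caller, which is why it is the one path singled out above.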





