guozhangwang commented on a change in pull request #8900: URL: https://github.com/apache/kafka/pull/8900#discussion_r443849945
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java ########## @@ -267,7 +283,17 @@ public void close() { private void checkForException() { if (sendException != null) { - throw sendException; + if (sendException.getCause() instanceof KafkaException + && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) { Review comment: 1. If we are calling from `closeDirty`, then exceptions from `recordCollector.close` would be swallowed and logged. 2. If we are calling from `closeAndRecycleState`, then the upper caller from TaskManager would capture all RE and move the task to toCloseDirty. 3. The only thing that needs to be careful is calling from `closeClean` as Matthias pointed as an example above. So looking from that side, to keep the error logged down maybe we should still always checkForException in `RecordCollector.close` while at the same time keep this hacky error message handling inside RC. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org