mjsax commented on a change in pull request #9997:
URL: https://github.com/apache/kafka/pull/9997#discussion_r569634714



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -714,19 +720,34 @@ public boolean process(final long wallClockTime) {
             if (recordInfo.queue().size() == maxBufferedSize) {
                 mainConsumer.resume(singleton(partition));
             }
-        } catch (final StreamsException e) {
-            throw e;
+
+            record = null;
+        } catch (final TimeoutException timeoutException) {
+            if (!eosEnabled) {
+                throw timeoutException;

Review comment:
   > But it looks like this PR is not retrying just commit, but is retrying the entire processing of a record. Is that a wrong conclusion?
   >
   > Retrying "process a record" is definitely not "safe" or "harmless" it will absolutely result in producing incorrect results, therefore we should not do it.
   
   Well, overall I agree, but if we crash the thread with at-least-once and we 
rebalance, the other thread will reprocess based on the last committed offset. 
The net result is that a rebalance might reprocess even _more_ records, whereas 
if we handle the timeout, we only try to reprocess a single record.
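
   To make the comparison concrete, here is a rough sketch of the counting argument. It is not code from this PR: `ReprocessSketch` and its methods are hypothetical names, and it assumes the committed offset is the next offset to be consumed, so everything from the committed offset up to and including the failed record is handled again after a rebalance.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation; none of these names exist in Kafka Streams.
final class ReprocessSketch {

    /**
     * Offsets handled a second time if the thread crashes while processing
     * failedOffset and the new owner resumes from lastCommittedOffset.
     * Assumes lastCommittedOffset <= failedOffset.
     */
    static List<Long> reprocessedAfterRebalance(final long lastCommittedOffset,
                                                final long failedOffset) {
        final List<Long> offsets = new ArrayList<>();
        for (long offset = lastCommittedOffset; offset <= failedOffset; offset++) {
            offsets.add(offset);
        }
        return offsets;
    }

    /**
     * Offsets handled a second time if the task keeps the failed record
     * and retries it in place instead of crashing.
     */
    static List<Long> reprocessedAfterLocalRetry(final long failedOffset) {
        return List.of(failedOffset);
    }
}
```

   With a last committed offset of 100 and a timeout on the record at offset 105, `reprocessedAfterRebalance(100L, 105L)` contains six offsets (100 through 105), while `reprocessedAfterLocalRetry(105L)` contains only offset 105.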





