vvcephei commented on a change in pull request #9997:
URL: https://github.com/apache/kafka/pull/9997#discussion_r569575143



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -714,19 +720,34 @@ public boolean process(final long wallClockTime) {
             if (recordInfo.queue().size() == maxBufferedSize) {
                 mainConsumer.resume(singleton(partition));
             }
-        } catch (final StreamsException e) {
-            throw e;
+
+            record = null;
+        } catch (final TimeoutException timeoutException) {
+            if (!eosEnabled) {
+                throw timeoutException;

Review comment:
       Thanks, all, sorry for the delay. I meant "crash" as in, crash the 
thread. This is what happens today, right? We throw the TimeoutException, 
convert it to a StreamsException, and throw it up to the top level, killing the 
thread?
   
   What I am thinking is that it's silly to just crash a thread when it has 
gotten a timeout in `poll()` or in checking `partitions()` because we can 
harmlessly retry those operations. Along those lines, retrying `commit` should 
be safe as well.
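
   To illustrate the "harmless retry" idea, here is a minimal, Kafka-free sketch of retrying an idempotent operation on timeout. `RetryOnTimeout`, `TimeoutRetriable`, and the simulated commit are hypothetical names for illustration only, and `java.util.concurrent.TimeoutException` stands in for Kafka's `org.apache.kafka.common.errors.TimeoutException`; this is not what the PR implements, just a sketch of the shape a bounded retry could take.

   ```java
   import java.util.concurrent.TimeoutException;

   public class RetryOnTimeout {

       // Hypothetical functional interface for an operation that may time out.
       interface TimeoutRetriable {
           void run() throws TimeoutException;
       }

       // Retry the operation up to maxAttempts times; rethrow the last
       // TimeoutException if every attempt fails.
       static void retryOnTimeout(final TimeoutRetriable op, final int maxAttempts)
               throws TimeoutException {
           TimeoutException last = null;
           for (int attempt = 1; attempt <= maxAttempts; attempt++) {
               try {
                   op.run();
                   return;
               } catch (final TimeoutException e) {
                   last = e; // safe only because the operation is idempotent
               }
           }
           throw last;
       }

       public static void main(final String[] args) throws Exception {
           final int[] calls = {0};
           // Simulated "commit" that times out twice, then succeeds.
           retryOnTimeout(() -> {
               calls[0]++;
               if (calls[0] < 3) {
                   throw new TimeoutException("simulated broker timeout");
               }
           }, 5);
           System.out.println("commit succeeded after " + calls[0] + " attempts");
       }
   }
   ```

   The key point is the boundary of the retry: wrapping only an idempotent call like `commit` is safe, whereas wrapping the whole record-processing path would re-execute side effects.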
   
   But it looks like this PR is not retrying just `commit`, but is retrying the 
entire processing of a record. Is that a wrong conclusion?
   
   Retrying "process a record" is definitely not "safe" or "harmless"; it will 
absolutely result in producing incorrect results, therefore we should not do 
it. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

