guozhangwang commented on a change in pull request #10894:
URL: https://github.com/apache/kafka/pull/10894#discussion_r654680551



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -262,34 +260,19 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
             stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
             long restoreCount = 0L;
 
-            while (offset < highWatermark) { // when we "fix" this loop 
(KAFKA-7380 / KAFKA-10317)
-                                             // we should update the `poll()` 
timeout below
-
-                // we ignore `poll.ms` config during bootstrapping phase and
-                // apply `request.timeout.ms` plus `task.timeout.ms` instead
-                //
-                // the reason is, that `poll.ms` might be too short to give a 
fetch request a fair chance
-                // to actually complete and we don't want to start 
`task.timeout.ms` too early
-                //
-                // we also pass `task.timeout.ms` into `poll()` directly right 
now as it simplifies our own code:
-                // if we don't pass it in, we would just track the timeout 
ourselves and call `poll()` again
-                // in our own retry loop; by passing the timeout we can reuse 
the consumer's internal retry loop instead
-                //
-                // note that using `request.timeout.ms` provides a 
conservative upper bound for the timeout;
-                // this implies that we might start `task.timeout.ms` 
"delayed" -- however, starting the timeout
-                // delayed is preferable (as it's more robust) than starting 
it too early
+            while (offset < highWatermark) {
+                // we add `request.timeout.ms` to `poll.ms` because `poll.ms` 
might be too short
+                // to give a fetch request a fair chance to actually complete 
and we don't want to
+                // start `task.timeout.ms` too early
                 //
-                // TODO https://issues.apache.org/jira/browse/KAFKA-10315
-                //   -> do a more precise timeout handling if `poll` would 
throw an exception if a fetch request fails
-                //      (instead of letting the consumer retry fetch requests 
silently)
-                //
-                // TODO https://issues.apache.org/jira/browse/KAFKA-10317 and
-                //      https://issues.apache.org/jira/browse/KAFKA-7380
-                //  -> don't pass in `task.timeout.ms` to stay responsive if 
`KafkaStreams#close` gets called
-                final ConsumerRecords<byte[], byte[]> records = 
globalConsumer.poll(requestTimeoutPlusTaskTimeout);
+                // TODO with https://issues.apache.org/jira/browse/KAFKA-10315 
we can just call
+                //      `poll(pollMS)` without adding the request timeout and 
do a more precise
+                //      timeout handling
+                final ConsumerRecords<byte[], byte[]> records = 
globalConsumer.poll(pollMsPlusRequestTimeout);

Review comment:
       That looks reasonable to me -- as a hindsight it is too aggressive to 
put the task timeout time into poll directly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to