mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460449901



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                               final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
             } else {
                 globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                final AtomicLong position = new AtomicLong();
+                retryUntilSuccessOrThrowOnTaskTimeout(
+                    () -> position.set(globalConsumer.position(topicPartition)),
+                    String.format(
+                        "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                        topicPartition
+                    )
+                );
+                offset = position.get();
             }
 
-            long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
             final RecordBatchingStateRestoreCallback stateRestoreAdapter =
                 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
 
+            long deadlineMs = NO_DEADLINE;
             while (offset < highWatermark) {
                 try {
                     final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+                    if (records.isEmpty()) {
+                        if (taskTimeoutMs == 0L) {
+                            deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
   I agree. The "issue" is really that `poll()` does _not_ throw a `TimeoutException`... Also, because we do manual assignment, `poll()` would never return "early", as it never needs to wait for joining a consumer group. -- However, compared to `max.task.idle.ms`, we are in a better situation here, because we `poll()` for only a single partition at a time.
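   
   To make the manual-assignment point concrete, here is a minimal sketch (plain consumer API, using the names from the diff above; not the PR code itself):
   
   ```java
   // With assign() there is no consumer group to join, so poll() blocks only
   // on fetching; it never returns "early" due to a join or rebalance.
   globalConsumer.assign(Collections.singletonList(topicPartition));
   globalConsumer.seek(topicPartition, offset);
   final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
   // An empty result here can only mean the single in-flight fetch produced
   // no data (or has not returned yet), never group-coordination overhead.
   ```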
   
   I also agree that applying `task.timeout.ms` should start _after_ we get a first timeout -- this is how the original code worked, which you criticized as:
   > Man, this is confusing.
   
   And I agree that the code was not straightforward to understand. But if we think it's the right thing to do, I am happy to add it back :)
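   
   For illustration, this is roughly how I picture "start the clock only after the first timeout" (a sketch only; `NO_DEADLINE`, `taskTimeoutMs`, and the method name come from the diff above, but this is not the actual implementation in the PR):
   
   ```java
   // Sketch: start the task.timeout.ms clock on the FIRST empty poll, then
   // enforce it on subsequent attempts. Throws the (unchecked)
   // org.apache.kafka.common.errors.TimeoutException; `time` is an assumed
   // Time field for reading wall-clock time.
   private long maybeUpdateDeadlineOrThrow(final long currentDeadlineMs) {
       final long nowMs = time.milliseconds();
       if (currentDeadlineMs == NO_DEADLINE) {
           // first timeout observed: only now start the task.timeout.ms clock
           return nowMs + taskTimeoutMs;
       } else if (nowMs >= currentDeadlineMs) {
           throw new TimeoutException("Global state restore exceeded task.timeout.ms");
       }
       return currentDeadlineMs; // clock already running; keep retrying
   }
   ```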
   
   I am also not an expert on all consumer internals, but from my understanding, fetch requests are sent asynchronously in general, and if a fetch request fails, the consumer does not retry it itself; instead, a retry is triggered by the next `poll()` call. If there is no data available (i.e., the fetch request has not returned yet) when `poll()` is called, the consumer blocks internally until the `poll(Duration)` timeout expires or a fetch request returns (whichever comes first).
   
   Furthermore, before `poll()` returns, it always checks whether a fetch request is in-flight, and sends one if not.
   
   Thus, on the very first call to `poll()` we know that no fetch request can be in-flight, and we also know that `poll()` will send one and block until it returns or the `poll(Duration)` timeout expires. If `poll()` does not block for at least `request.timeout.ms` and we get an empty result back, we don't know which case holds; however, if we use the request timeout, it seems that we _know_ whether the fetch was successful or timed out. We also know that a fetch request will be in-flight after `poll()` returns. Thus, for any subsequent `poll()`, applying the request timeout also ensures that we know whether the request was successful or not.
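   
   In code, the idea would look roughly like this (a sketch, assuming `requestTimeoutMs` is read from the consumer config; not the literal PR code):
   
   ```java
   // Sketch: block for at least request.timeout.ms so that an empty result
   // is unambiguous. `requestTimeoutMs` is an assumed local read from the
   // consumer's request.timeout.ms config, not a field in this PR.
   final ConsumerRecords<byte[], byte[]> records =
       globalConsumer.poll(Duration.ofMillis(requestTimeoutMs));
   if (records.isEmpty()) {
       // poll() blocked long enough for the in-flight fetch to either return
       // empty or time out; either way it is safe to count a failed attempt
       // and let the next poll() trigger the retry.
   }
   ```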
   
   I guess the only difference between what I just described and my original code is that I used `pollTime + requestTimeout`.
   
   Bottom line: I am not 100% sure what you are proposing. Should we go with the original design, or with the new design? -- In the end, I think we don't need a follow-up PR; we can just try to get it right in this PR. I don't see any benefit in splitting it up into two PRs (because, as mentioned above, we fetch from a single partition, and thus it's a different case compared to the `max.task.idle.ms` scenario).



