Technoboy- commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1109279281


##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -301,6 +304,34 @@ private <T> void phaseOneLoop(Reader<T> reader, CompletableFuture<PhaseOneResult
 
     }
 
+    private <T> void waitForReconnection(Reader<T> reader) {
+        long started = System.currentTimeMillis();
+
+        // initial sleep
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+        }
+        while (!reader.isConnected()) {
+            long now = System.currentTimeMillis();
+            if (now - started > MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS) {
+                String errorMsg = String.format(
+                        "Reader has not been reconnected for %d secs. Stopping the compaction.",
+                        MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS / 1000);
+                log.error(errorMsg);
+                throw new RuntimeException(errorMsg);
+            }
+            log.warn(
+                    "Reader has not been reconnected after the cursor reset. elapsed :{} ms. Retrying "
+                            + "soon.", now - started);
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                log.warn("The thread got interrupted while waiting. continuing", e);
+            }
+        }
+    }
+

Review Comment:
   Why do we add `waitForReconnection`?
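
   For reference while discussing this, the waiting pattern in the hunk can be sketched in isolation as follows. This is only a minimal sketch under stated assumptions: `BoundedWait` and `waitUntil` are hypothetical names that do not exist in Pulsar, and unlike the hunk it uses a monotonic clock, skips the initial sleep, and restores the interrupt flag instead of swallowing it.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper, not part of Pulsar: polls a condition until a deadline. */
final class BoundedWait {
    private BoundedWait() {
    }

    /**
     * Polls {@code condition} every {@code pollMillis} until it holds or
     * {@code timeoutMillis} has elapsed. Returns true only if the condition held.
     */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        // nanoTime is monotonic, so wall-clock adjustments cannot skew the deadline.
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out; the caller decides how to fail
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting instead of swallowing it.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

   Under these assumptions, the loop in the hunk would roughly correspond to `BoundedWait.waitUntil(reader::isConnected, MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS, 100)`, with the caller logging and throwing when the result is `false`.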


