heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1110428037
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -301,6 +304,34 @@ private <T> void phaseOneLoop(Reader<T> reader,
CompletableFuture<PhaseOneResult
}
+    private <T> void waitForReconnection(Reader<T> reader) {
+        long started = System.currentTimeMillis();
+
+        // initial sleep
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+        }
+        while (!reader.isConnected()) {
+            long now = System.currentTimeMillis();
+            if (now - started > MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS) {
+                String errorMsg = String.format(
+                        "Reader has not been reconnected for %d secs. Stopping the compaction.",
+                        MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS / 1000);
+                log.error(errorMsg);
+                throw new RuntimeException(errorMsg);
+            }
+            log.warn(
+                    "Reader has not been reconnected after the cursor reset. elapsed :{} ms. Retrying "
+                            + "soon.", now - started);
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                log.warn("The thread got interrupted while waiting. continuing", e);
+            }
+        }
+    }
+
Review Comment:
When `seek()` resets the cursor, this reader is temporarily disconnected.
If `acknowledgeCumulativeAsync()` is then called at the end of the
compaction (code below) before the reader has reconnected, the reader might
throw an exception because its state is still `Connecting`. This is most
likely to happen when there is only one message to compact.
```
.thenCompose(v -> {
    log.info("Acking ledger id {}", phaseOneResult.firstId);
    return ((CompactionReaderImpl<T>) reader)
            .acknowledgeCumulativeAsync(
                    phaseOneResult.lastId,
                    Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId()));
})
```
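
For illustration only, here is a minimal, self-contained sketch of the "wait until connected, then ack" ordering described above. It is not the code in this PR: `ReconnectWait`, `waitUntilConnected`, and the `BooleanSupplier` stand-in for `reader.isConnected()` are hypothetical names, and unlike `waitForReconnection` it skips the initial sleep and restores the interrupt flag rather than continuing to wait.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Pulsar: polls a condition until it holds
// or a deadline passes.
final class ReconnectWait {

    /**
     * Returns true once isConnected reports true, or false if the timeout
     * elapses or the waiting thread is interrupted first.
     */
    static boolean waitUntilConnected(BooleanSupplier isConnected,
                                      long timeoutMillis,
                                      long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!isConnected.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out; the caller should stop the compaction
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in for reader.isConnected(): reports "connected" after ~50 ms.
        BooleanSupplier connected = () -> System.currentTimeMillis() - start >= 50;
        if (waitUntilConnected(connected, 1_000, 10)) {
            System.out.println("connected, safe to ack");
        } else {
            System.out.println("not connected, stop the compaction");
        }
    }
}
```

Returning a boolean instead of throwing leaves it to the caller to decide whether a timeout should fail the compaction, which is a design choice of this sketch rather than something the PR does.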