xiangfu0 commented on code in PR #18532:
URL: https://github.com/apache/pinot/pull/18532#discussion_r3268521825
##########
pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/StreamOp.java:
##########
@@ -301,15 +322,114 @@ private long fetchExistingTotalDocs(String tableName)
return response.get(TOTAL_DOCS).asLong();
}
- private void waitForDocsLoaded(String tableName, long targetDocs, long
timeoutMs) {
+ private long waitForExistingTotalDocs(String tableName, long timeoutMs)
+ throws Exception {
+ AtomicLong loadedDocs = new AtomicLong(-1);
+ waitForDocs(tableName, timeoutMs, existingTotalDocs -> {
+ loadedDocs.set(existingTotalDocs);
+ return true;
+ }, () -> "Failed to fetch existing documents for table: " + tableName);
+ return loadedDocs.get();
+ }
+
+ private void waitForDocsLoaded(String tableName, long targetDocs, long
timeoutMs)
+ throws Exception {
LOGGER.info("Wait Doc to load ...");
AtomicLong loadedDocs = new AtomicLong(-1);
- TestUtils.waitForCondition(
- () -> {
- long existingTotalDocs = fetchExistingTotalDocs(tableName);
- loadedDocs.set(existingTotalDocs);
- return existingTotalDocs == targetDocs;
- }, 100L, timeoutMs, "Failed to load " + targetDocs + " documents.
Found " + loadedDocs.get() + " instead",
- Duration.ofSeconds(1));
+ waitForDocs(tableName, timeoutMs, existingTotalDocs -> {
+ loadedDocs.set(existingTotalDocs);
+ return existingTotalDocs == targetDocs;
+ }, () -> "Failed to load " + targetDocs + " documents. Found " +
loadedDocs.get() + " instead");
+ }
+
+ private void waitForDocs(String tableName, long timeoutMs, LongPredicate
condition, Supplier<String> errorMessage)
+ throws Exception {
+ waitForDocs(tableName, timeoutMs, COUNT_QUERY_RETRY_INTERVAL_MS,
condition, errorMessage,
+ () -> fetchExistingTotalDocs(tableName));
+ }
+
+ void waitForDocs(String tableName, long timeoutMs, long retryIntervalMs,
LongPredicate condition,
+ Supplier<String> errorMessage, TotalDocsSupplier totalDocsSupplier)
+ throws Exception {
+ long endTimeMs = System.currentTimeMillis() + timeoutMs;
+ long nextLogTimeMs = 0;
+ RetryableQueryException lastRetryableQueryException = null;
+ while (System.currentTimeMillis() < endTimeMs) {
+ try {
+ if (condition.test(totalDocsSupplier.getAsLong())) {
+ return;
+ }
+ } catch (RetryableQueryException e) {
+ lastRetryableQueryException = e;
+ long currentTimeMs = System.currentTimeMillis();
+ if (currentTimeMs >= nextLogTimeMs) {
+ LOGGER.warn("Unable to fetch total docs for table: {}. Trying
again", tableName, e);
+ nextLogTimeMs = currentTimeMs + Duration.ofSeconds(1).toMillis();
+ }
+ }
+ Thread.sleep(retryIntervalMs);
+ }
+ throw new RuntimeException(errorMessage.get(),
lastRetryableQueryException);
+ }
+
+ static boolean hasOnlyRetryableQueryExceptions(JsonNode exceptions) {
+ if (exceptions.isArray()) {
+ if (exceptions.isEmpty()) {
+ return false;
+ }
+ for (JsonNode exception : exceptions) {
+ if (!isRetryableQueryException(exception)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return isRetryableQueryException(exceptions);
+ }
+
+ private static boolean isRetryableQueryException(JsonNode exception) {
+ Integer errorCode = getErrorCode(exception);
+ return errorCode != null &&
RETRYABLE_QUERY_ERROR_CODES.contains(errorCode);
+ }
+
+ private static Integer getErrorCode(JsonNode exception) {
+ JsonNode errorCode = exception.get(ERROR_CODE);
+ if (errorCode != null) {
+ if (errorCode.canConvertToInt()) {
+ return errorCode.asInt();
+ }
+ if (errorCode.isTextual()) {
+ try {
+ return Integer.parseInt(errorCode.asText());
+ } catch (NumberFormatException e) {
+ return null;
+ }
+ }
+ }
+
+ JsonNode code = exception.get(CODE);
Review Comment:
Good catch. There is no new `code` contract here; broker query responses
should use `errorCode`. I removed the speculative `code` fallback and the
named-code test, so the verifier now classifies retryable query exceptions
from `errorCode` only.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]