This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5853-a5d8602b44f8297a15cf7800dde468d7d784b235
in repository https://gitbox.apache.org/repos/asf/texera.git
commit 18e4e67c70fcc4d4c739338d9c30ce552db3decc
Author: Matthew B. <[email protected]>
AuthorDate: Mon Jun 22 10:13:37 2026 -0700

fix(file-service): retry S3 bucket creation on slow startup (#5853)

### What changes were proposed in this PR?

- Add `awaitDependency` to `FileService`. It is an exponential-backoff retry (6 attempts, with delays doubling from 200ms, ~6s total) that takes an injectable sleep, mirroring `LakeFSStorageClient.retryWithBackoff`.
- Wrap the two `S3StorageClient.createBucketIfNotExist` calls in `FileService.run` with it, so a slow-to-start MinIO/S3 no longer aborts file-service startup.
- Handle `InterruptedException` consistently: an interrupt that arrives during the backoff `sleep` (not just during the bucket operation) now restores the thread's interrupt status and fails fast. Previously it escaped as a raw `InterruptedException` and the interrupt flag was lost.
- Leave `LakeFSStorageClient.healthCheck()` on its existing inner retry (unchanged).
- Add `FileServiceSpec` (8 tests) covering immediate success, default-argument success, retry-then-success, the full backoff progression to give-up, give-up preserving the cause, `maxAttempts == 1`, and interrupt-fails-fast for both interrupt points.

### Any related issues, documentation, discussions?

Closes: #5852

Note: `awaitDependency` is a near-duplicate of `LakeFSStorageClient.retryWithBackoff` in `common/workflow-core`. Extracting a single shared helper that both delegate to would be the cleaner end state. However, that would refactor a stable, separately tested class in another module, so it is deferred to a follow-up rather than widening the scope of this startup-race fix.

### How was this PR tested?
- Run `sbt "FileService/testOnly org.apache.texera.service.FileServiceSpec"` and expect 8 passing tests:
  - immediate success runs the operation once and never sleeps;
  - default-argument success returns on the first try without invoking the default `Thread.sleep` backoff;
  - retry-then-success records delays `List(200, 400)` before succeeding on the 3rd try;
  - exhausting all 6 attempts records the full progression `List(200, 400, 800, 1600, 3200)` before giving up;
  - giving up throws a `RuntimeException` after `maxAttempts` attempts, with the original exception as `getCause` and the dependency name in the message;
  - `maxAttempts == 1` gives up after a single attempt without sleeping;
  - an interrupt while running the operation restores the interrupt flag and fails fast;
  - an interrupt while sleeping between attempts likewise restores the interrupt flag and fails fast.
- This environment hits a pre-existing JaCoCo instrumentation error (`Unsupported class file major version 69`) because JaCoCo 0.8.11 cannot instrument JDK 25 class files. This is unrelated to the change. The spec was verified locally against a JDK 17 toolchain (`sbt -java-home <jdk17>`, 8/8 pass), and the standard instrumented run relies on CI's JDK/JaCoCo combination. `scalafmtCheck` is clean for both main and test sources.

### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF
---
 .../org/apache/texera/service/FileService.scala    |  56 +++++-
 .../apache/texera/service/FileServiceSpec.scala    | 189 +++++++++++++++++++++
 2 files changed, 243 insertions(+), 2 deletions(-)

diff --git a/file-service/src/main/scala/org/apache/texera/service/FileService.scala b/file-service/src/main/scala/org/apache/texera/service/FileService.scala
index 76d78dfef8..9a2688212d 100644
--- a/file-service/src/main/scala/org/apache/texera/service/FileService.scala
+++ b/file-service/src/main/scala/org/apache/texera/service/FileService.scala
@@ -76,9 +76,13 @@ class FileService extends Application[FileServiceConfiguration] with LazyLogging
     )
 
     // check if the texera dataset bucket exists, if not create it
-    S3StorageClient.createBucketIfNotExist(StorageConfig.lakefsBucketName)
+    awaitDependency("texera dataset bucket") {
+      S3StorageClient.createBucketIfNotExist(StorageConfig.lakefsBucketName)
+    }
     // ensure the large-binary S3 bucket exists before any workflow execution attempts to use it
-    S3StorageClient.createBucketIfNotExist(LargeBinaryManager.DEFAULT_BUCKET)
+    awaitDependency("large-binary bucket") {
+      S3StorageClient.createBucketIfNotExist(LargeBinaryManager.DEFAULT_BUCKET)
+    }
 
     // check if we can connect to the lakeFS service
     LakeFSStorageClient.healthCheck()
@@ -105,6 +109,54 @@ class FileService extends Application[FileServiceConfiguration] with LazyLogging
     // Route request logs through SLF4J, controlled by TEXERA_SERVICE_LOG_LEVEL
     RequestLoggingFilter.register(environment.getApplicationContext)
   }
+
+  /**
+   * Runs `operation`, retrying with exponential backoff until it succeeds or `maxAttempts` is
+   * reached, to tolerate a slow-to-start object store. The last failure is rethrown as the cause.
+   * `sleep` is injectable for tests. Defaults: 6 attempts from 200ms (200, 400, 800, 1600, 3200), ~6s.
+   */
+  private[service] def awaitDependency(
+      description: String,
+      maxAttempts: Int = 6,
+      initialDelayMillis: Long = 200L,
+      sleep: Long => Unit = Thread.sleep
+  )(operation: => Unit): Unit = {
+    // Restore the interrupt status and fail fast rather than retrying, whether the
+    // interrupt arrives while running `operation` or while sleeping between attempts.
+    def failInterrupted(ie: InterruptedException): Nothing = {
+      Thread.currentThread().interrupt()
+      throw new RuntimeException(s"Interrupted while waiting for $description", ie)
+    }
+
+    var attempt = 1
+    var delayMillis = initialDelayMillis
+    while (true) {
+      try {
+        operation
+        return
+      } catch {
+        case ie: InterruptedException => failInterrupted(ie)
+        case e: Exception =>
+          if (attempt >= maxAttempts) {
+            throw new RuntimeException(
+              s"$description not ready after $maxAttempts attempts: ${e.getMessage}",
+              e
+            )
+          }
+          logger.warn(
+            s"$description not ready (attempt $attempt/$maxAttempts): ${e.getMessage}. " +
+              s"Retrying in ${delayMillis}ms..."
+          )
+          try {
+            sleep(delayMillis)
+          } catch {
+            case ie: InterruptedException => failInterrupted(ie)
+          }
+          attempt += 1
+          delayMillis *= 2
+      }
+    }
+  }
 }
 
 object FileService {
diff --git a/file-service/src/test/scala/org/apache/texera/service/FileServiceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/FileServiceSpec.scala
new file mode 100644
index 0000000000..30ec2c23f7
--- /dev/null
+++ b/file-service/src/test/scala/org/apache/texera/service/FileServiceSpec.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service
+
+import org.scalatest.flatspec.AnyFlatSpec
+
+import scala.collection.mutable.ListBuffer
+
+class FileServiceSpec extends AnyFlatSpec {
+
+  private val service = new FileService()
+
+  "awaitDependency" should "run the operation once and not sleep when it succeeds immediately" in {
+    var attempts = 0
+    val delays = ListBuffer.empty[Long]
+    service.awaitDependency("dep", 6, 200L, delays += _) {
+      attempts += 1
+    }
+    assert(attempts == 1)
+    assert(delays.isEmpty)
+  }
+
+  it should "run the operation once with the default arguments when it succeeds immediately" in {
+    // Exercises the default maxAttempts/initialDelay/sleep parameters: a first-try success
+    // returns without ever invoking the (real Thread.sleep) default backoff.
+    var attempts = 0
+    service.awaitDependency("dep") {
+      attempts += 1
+    }
+    assert(attempts == 1)
+  }
+
+  it should "retry until success and double the delay after each failed attempt" in {
+    var attempts = 0
+    val delays = ListBuffer.empty[Long]
+    service.awaitDependency("dep", 6, 200L, delays += _) {
+      attempts += 1
+      if (attempts < 3) throw new RuntimeException("not reachable yet")
+    }
+    assert(attempts == 3)
+    assert(delays.toList == List(200L, 400L))
+  }
+
+  it should "double the delay after every failed attempt up to maxAttempts - 1 sleeps" in {
+    var attempts = 0
+    val delays = ListBuffer.empty[Long]
+    val ex = intercept[RuntimeException] {
+      service.awaitDependency("dep", 6, 200L, delays += _) {
+        attempts += 1
+        throw new RuntimeException("down")
+      }
+    }
+    // 6 attempts means 5 backoff waits following the geometric progression from 200ms.
+    assert(attempts == 6)
+    assert(delays.toList == List(200L, 400L, 800L, 1600L, 3200L))
+    assert(ex.getMessage.contains("after 6 attempts"))
+  }
+
+  it should "give up after maxAttempts and preserve the last failure as the cause" in {
+    var attempts = 0
+    val cause = new RuntimeException("still down")
+    val ex = intercept[RuntimeException] {
+      service.awaitDependency("dep", 3, 200L, _ => ()) {
+        attempts += 1
+        throw cause
+      }
+    }
+    assert(attempts == 3)
+    assert(ex.getMessage.contains("after 3 attempts"))
+    assert(ex.getMessage.contains("dep"))
+    assert(ex.getCause eq cause)
+  }
+
+  it should "give up immediately without sleeping when maxAttempts is 1" in {
+    var attempts = 0
+    val delays = ListBuffer.empty[Long]
+    val cause = new RuntimeException("still down")
+    val ex = intercept[RuntimeException] {
+      service.awaitDependency("dep", 1, 200L, delays += _) {
+        attempts += 1
+        throw cause
+      }
+    }
+    assert(attempts == 1)
+    assert(delays.isEmpty)
+    assert(ex.getMessage.contains("after 1 attempts"))
+    assert(ex.getCause eq cause)
+  }
+
+  it should "fail fast and restore the interrupt status when the operation is interrupted" in {
+    val ex = intercept[RuntimeException] {
+      service.awaitDependency("dep", 6, 200L, _ => ()) {
+        throw new InterruptedException("interrupted")
+      }
+    }
+    // Thread.interrupted() both reads and clears the flag, so the interrupt was restored.
+    assert(Thread.interrupted())
+    assert(ex.getMessage.contains("Interrupted while waiting for dep"))
+    assert(ex.getCause.isInstanceOf[InterruptedException])
+  }
+
+  it should "fail fast and restore the interrupt status when interrupted while sleeping between attempts" in {
+    var attempts = 0
+    val ex = intercept[RuntimeException] {
+      service.awaitDependency("dep", 6, 200L, _ => throw new InterruptedException("interrupted")) {
+        attempts += 1
+        throw new RuntimeException("not reachable yet")
+      }
+    }
+    // The operation failed once, then the interrupt arrived during the backoff sleep.
+    assert(attempts == 1)
+    // Thread.interrupted() both reads and clears the flag, so the interrupt was restored.
+    assert(Thread.interrupted())
+    assert(ex.getMessage.contains("Interrupted while waiting for dep"))
+    assert(ex.getCause.isInstanceOf[InterruptedException])
+  }
+
+  it should "succeed on the final allowed attempt without giving up one try too early" in {
+    // Boundary for `attempt >= maxAttempts`: the operation only succeeds on the very last
+    // attempt, so the loop must not give up prematurely. Expect maxAttempts - 1 backoff waits.
+    var attempts = 0
+    val delays = ListBuffer.empty[Long]
+    service.awaitDependency("dep", 3, 200L, delays += _) {
+      attempts += 1
+      if (attempts < 3) throw new RuntimeException("not reachable yet")
+    }
+    assert(attempts == 3)
+    assert(delays.toList == List(200L, 400L))
+  }
+
+  it should "honor a custom initial delay when computing the backoff progression" in {
+    // Guards against the initial delay being hardcoded: starting from 50ms the geometric
+    // progression must be 50, 100, 200 rather than the default 200-based sequence.
+    var attempts = 0
+    val delays = ListBuffer.empty[Long]
+    val ex = intercept[RuntimeException] {
+      service.awaitDependency("dep", 4, 50L, delays += _) {
+        attempts += 1
+        throw new RuntimeException("down")
+      }
+    }
+    assert(attempts == 4)
+    assert(delays.toList == List(50L, 100L, 200L))
+    assert(ex.getMessage.contains("after 4 attempts"))
+  }
+
+  it should "include the underlying failure message when giving up" in {
+    val ex = intercept[RuntimeException] {
+      service.awaitDependency("dataset bucket", 2, 200L, _ => ()) {
+        throw new RuntimeException("connection refused")
+      }
+    }
+    assert(ex.getMessage.contains("dataset bucket not ready after 2 attempts"))
+    assert(ex.getMessage.contains("connection refused"))
+  }
+
+  it should "propagate a non-Exception Throwable immediately without retrying or wrapping it" in {
+    // The catch clause only matches Exception, so an Error must escape on the first attempt:
+    // it is neither retried nor wrapped in the "not ready after N attempts" RuntimeException.
+    var attempts = 0
+    val delays = ListBuffer.empty[Long]
+    val err = intercept[StackOverflowError] {
+      service.awaitDependency("dep", 6, 200L, delays += _) {
+        attempts += 1
+        throw new StackOverflowError("boom")
+      }
+    }
+    assert(attempts == 1)
+    assert(delays.isEmpty)
+    assert(err.getMessage == "boom")
+  }
+}
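The PR note defers extracting one helper that both `awaitDependency` and `LakeFSStorageClient.retryWithBackoff` delegate to. The following is a rough sketch of what that follow-up might look like. It is not part of this commit. The `Backoff` object and its `schedule` and `retry` methods are hypothetical names. Returning the operation's result as `T` is also an assumption, because `retryWithBackoff`'s signature does not appear in this diff. Logging is left out so the sketch stays self-contained.

The sketch aims to keep the behavior that `FileServiceSpec` pins down:

- only `Exception`s are retried;
- the delay doubles from `initialDelayMillis`;
- the last failure becomes the cause of the thrown `RuntimeException`;
- an `InterruptedException` in either the operation or the sleep restores the interrupt flag and fails fast;
- an `Error` such as `StackOverflowError` escapes unwrapped.

```scala
import scala.annotation.tailrec

// Hypothetical shared retry helper (illustrative names, not an existing Texera API).
object Backoff {

  // Delays slept between attempts: initialDelayMillis * 2^k for k in 0 until maxAttempts - 1.
  // With 6 attempts from 200ms this is 200, 400, 800, 1600, 3200 (6200ms in total).
  def schedule(maxAttempts: Int, initialDelayMillis: Long): List[Long] =
    List.iterate(initialDelayMillis, math.max(maxAttempts - 1, 0))(_ * 2)

  // Generic variant of awaitDependency that returns the operation's result (assumption).
  def retry[T](
      description: String,
      maxAttempts: Int = 6,
      initialDelayMillis: Long = 200L,
      sleep: Long => Unit = Thread.sleep
  )(operation: => T): T = {
    // Same interrupt policy as awaitDependency: restore the flag and stop retrying.
    def failInterrupted(ie: InterruptedException): Nothing = {
      Thread.currentThread().interrupt()
      throw new RuntimeException(s"Interrupted while waiting for $description", ie)
    }

    @tailrec
    def loop(attempt: Int, delayMillis: Long): T = {
      // Only Exceptions are captured; Errors propagate unwrapped on the first attempt.
      val outcome: Either[Exception, T] =
        try Right(operation)
        catch {
          case ie: InterruptedException => failInterrupted(ie)
          case e: Exception             => Left(e)
        }
      outcome match {
        case Right(value) => value
        case Left(e) if attempt >= maxAttempts =>
          throw new RuntimeException(
            s"$description not ready after $maxAttempts attempts: ${e.getMessage}",
            e
          )
        case Left(_) =>
          try sleep(delayMillis)
          catch { case ie: InterruptedException => failInterrupted(ie) }
          // The recursive call sits outside any try block, so it stays tail-recursive.
          loop(attempt + 1, delayMillis * 2)
      }
    }

    loop(1, initialDelayMillis)
  }
}
```

With a helper like this, `awaitDependency` could reduce to `Backoff.retry(description, maxAttempts, initialDelayMillis, sleep)(operation)`. The delays the spec expects for the defaults would then equal `Backoff.schedule(6, 200L)`. The tail-recursive loop avoids the early `return` inside `while (true)`, which a generic, value-returning version would otherwise need.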
