This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5dcf72d2517 [FLINK-38507][tests] Fix the failed OpenTelemetryTraceReporterITCase (#27792)
5dcf72d2517 is described below

commit 5dcf72d251798ef09a157f118e38b12fc37a579e
Author: HundalTaran <[email protected]>
AuthorDate: Wed Apr 8 11:47:45 2026 +0530

    [FLINK-38507][tests] Fix the failed OpenTelemetryTraceReporterITCase (#27792)
---
 .../flink/metrics/otel/OpenTelemetryTestBase.java  |   7 +-
 .../testutils/RetryingTestContainerExtension.java  | 131 +++++++++++++++++++++
 2 files changed, 135 insertions(+), 3 deletions(-)

diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java
index accd21a8acd..7fba6b23e6a 100644
--- a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java
+++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java
@@ -20,7 +20,7 @@ package org.apache.flink.metrics.otel;
 
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.core.testutils.AllCallbackWrapper;
-import org.apache.flink.core.testutils.TestContainerExtension;
+import org.apache.flink.core.testutils.RetryingTestContainerExtension;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.function.ThrowingConsumer;
@@ -64,10 +64,11 @@ public class OpenTelemetryTestBase {
 
     @RegisterExtension
     @Order(1)
-    private static final 
AllCallbackWrapper<TestContainerExtension<OtelTestContainer>>
+    private static final 
AllCallbackWrapper<RetryingTestContainerExtension<OtelTestContainer>>
             OTEL_EXTENSION =
                     new AllCallbackWrapper<>(
-                            new TestContainerExtension<>(() -> new 
OtelTestContainer(outputDir)));
+                            new RetryingTestContainerExtension<>(
+                                    () -> new OtelTestContainer(outputDir)));
 
     @BeforeEach
     public void setup() {
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/RetryingTestContainerExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/RetryingTestContainerExtension.java
new file mode 100755
index 00000000000..0374aecb4bc
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/RetryingTestContainerExtension.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import com.github.dockerjava.api.command.PullImageResultCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.GenericContainer;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * A {@link CustomExtension} that manages a {@link GenericContainer} with 
retry logic, including
+ * re-pulling the Docker image on failure. This handles transient Docker image 
pull/build failures
+ * that can occur in CI environments.
+ *
+ * @param <T> The {@link GenericContainer} that shall be managed.
+ */
+public class RetryingTestContainerExtension<T extends GenericContainer<T>>
+        implements CustomExtension {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RetryingTestContainerExtension.class);
+    private static final int DEFAULT_MAX_RETRIES = 3;
+    private static final long DEFAULT_RETRY_DELAY_MS = 2000;
+    private static final long IMAGE_PULL_TIMEOUT_MINUTES = 2;
+
+    @Nullable private T testContainer;
+
+    private final Supplier<T> testContainerCreator;
+    private final int maxRetries;
+    private final long retryDelayMs;
+
+    public RetryingTestContainerExtension(Supplier<T> testContainerCreator) {
+        this(testContainerCreator, DEFAULT_MAX_RETRIES, 
DEFAULT_RETRY_DELAY_MS);
+    }
+
+    public RetryingTestContainerExtension(
+            Supplier<T> testContainerCreator, int maxRetries, long 
retryDelayMs) {
+        this.testContainerCreator = testContainerCreator;
+        this.maxRetries = maxRetries;
+        this.retryDelayMs = retryDelayMs;
+    }
+
+    public T getTestContainer() {
+        assert testContainer != null;
+        return testContainer;
+    }
+
+    private void terminateTestContainer() {
+        if (testContainer != null) {
+            testContainer.stop();
+            testContainer = null;
+        }
+    }
+
+    private void instantiateTestContainer() {
+        assert testContainer == null;
+        for (int attempt = 1; attempt <= maxRetries; attempt++) {
+            try {
+                testContainer = testContainerCreator.get();
+                testContainer.start();
+                return;
+            } catch (Exception e) {
+                LOG.warn(
+                        "Container start attempt {}/{} failed: {}",
+                        attempt,
+                        maxRetries,
+                        e.getMessage());
+                testContainer = null;
+                if (attempt == maxRetries) {
+                    throw e;
+                }
+                pullImage();
+                try {
+                    Thread.sleep(retryDelayMs);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException("Interrupted during container 
start retry", ie);
+                }
+            }
+        }
+    }
+
+    private void pullImage() {
+        try {
+            T tempContainer = testContainerCreator.get();
+            String imageName = tempContainer.getDockerImageName();
+            LOG.info("Re-pulling image {} before retry...", imageName);
+            DockerClientFactory.instance()
+                    .client()
+                    .pullImageCmd(imageName)
+                    .exec(new PullImageResultCallback())
+                    .awaitCompletion(IMAGE_PULL_TIMEOUT_MINUTES, 
TimeUnit.MINUTES);
+            LOG.info("Image {} pulled successfully", imageName);
+        } catch (Exception e) {
+            LOG.warn("Failed to pull image: {}", e.getMessage());
+        }
+    }
+
+    @Override
+    public void after(ExtensionContext context) throws Exception {
+        terminateTestContainer();
+    }
+
+    @Override
+    public void before(ExtensionContext context) throws Exception {
+        terminateTestContainer();
+        instantiateTestContainer();
+    }
+}

Reply via email to