This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a6af80198f2 [fix][test] Fix flaky 
PulsarFunctionTlsTest.testFunctionsCreation() test (#25889)
a6af80198f2 is described below

commit a6af80198f26d31e991e880db3f9b7d601a579c2
Author: Oneby Wang <[email protected]>
AuthorDate: Sat May 30 16:16:16 2026 +0800

    [fix][test] Fix flaky PulsarFunctionTlsTest.testFunctionsCreation() test 
(#25889)
---
 .../functions/worker/PulsarFunctionTlsTest.java    | 37 +++++++++++-----------
 1 file changed, 18 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 9727dfb8f41..3d2e5f29886 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -19,8 +19,6 @@
 package org.apache.pulsar.functions.worker;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Sets;
@@ -41,10 +39,10 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -259,23 +257,16 @@ public class PulsarFunctionTlsTest {
 
             log.info().attr("function", functionName).log("Start test 
function");
 
-            int finalI = i;
-            // Wait for a leader to be ready and create the function.
-            // The createFunctionWithUrl call is included in the retry loop 
because a leadership
-            // transition can happen between the leader check and the actual 
API call, causing
-            // a 503 "Leader not yet ready" error.
             final PulsarAdmin createAdmin = pulsarAdmins[i];
-            Awaitility.await().atMost(1, TimeUnit.MINUTES).pollInterval(1, 
TimeUnit.SECONDS).untilAsserted(() -> {
-                final PulsarWorkerService workerService = 
((PulsarWorkerService) fnWorkerServices[finalI]);
-                final LeaderService leaderService = 
workerService.getLeaderService();
-                assertNotNull(leaderService);
-                if (!leaderService.isLeader()) {
-                    final WorkerInfo workerInfo = 
workerService.getMembershipManager().getLeader();
-                    assertTrue(workerInfo != null
-                            && 
!workerInfo.getWorkerId().equals(workerService.getWorkerConfig().getWorkerId()));
-                }
-                createAdmin.functions().createFunctionWithUrl(functionConfig, 
jarFilePathUrl);
-            });
+            // During function-worker leadership election/switchover, the 
coordination topic can already point to
+            // the new leader while that worker is still finishing its leader 
initialization. In that short window
+            // the internal /functions/leader request returns a transient 503 
"Leader not yet ready", so retry only
+            // that condition and let all other failures surface immediately.
+            Awaitility.await().atMost(1, TimeUnit.MINUTES)
+                    .pollInterval(1, TimeUnit.SECONDS)
+                    
.ignoreExceptionsMatching(PulsarFunctionTlsTest::isLeaderNotReady)
+                    .untilAsserted(() -> createAdmin.functions()
+                            .createFunctionWithUrl(functionConfig, 
jarFilePathUrl));
 
             // Function creation is not strongly consistent, so this test can 
fail with a get that is too eager and
             // does not have retries.
@@ -291,6 +282,14 @@ public class PulsarFunctionTlsTest {
             pulsarAdmins[i].functions().deleteFunction(config.getTenant(), 
config.getNamespace(), config.getName());
         }
     }
+
+    private static boolean isLeaderNotReady(Throwable e) {
+        return e instanceof PulsarAdminException
+                && ((PulsarAdminException) e).getStatusCode() == 503
+                && String.valueOf(((PulsarAdminException) e).getHttpError())
+                        .contains("Leader not yet ready");
+    }
+
     @SuppressWarnings("deprecation")
 
     protected static FunctionConfig createFunctionConfig(

Reply via email to