This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a6af80198f2 [fix][test] Fix flaky
PulsarFunctionTlsTest.testFunctionsCreation() test (#25889)
a6af80198f2 is described below
commit a6af80198f26d31e991e880db3f9b7d601a579c2
Author: Oneby Wang <[email protected]>
AuthorDate: Sat May 30 16:16:16 2026 +0800
[fix][test] Fix flaky PulsarFunctionTlsTest.testFunctionsCreation() test
(#25889)
---
.../functions/worker/PulsarFunctionTlsTest.java | 37 +++++++++++-----------
1 file changed, 18 insertions(+), 19 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 9727dfb8f41..3d2e5f29886 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -19,8 +19,6 @@
package org.apache.pulsar.functions.worker;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
@@ -41,10 +39,10 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -259,23 +257,16 @@ public class PulsarFunctionTlsTest {
log.info().attr("function", functionName).log("Start test
function");
- int finalI = i;
- // Wait for a leader to be ready and create the function.
- // The createFunctionWithUrl call is included in the retry loop
because a leadership
- // transition can happen between the leader check and the actual
API call, causing
- // a 503 "Leader not yet ready" error.
final PulsarAdmin createAdmin = pulsarAdmins[i];
- Awaitility.await().atMost(1, TimeUnit.MINUTES).pollInterval(1,
TimeUnit.SECONDS).untilAsserted(() -> {
- final PulsarWorkerService workerService =
((PulsarWorkerService) fnWorkerServices[finalI]);
- final LeaderService leaderService =
workerService.getLeaderService();
- assertNotNull(leaderService);
- if (!leaderService.isLeader()) {
- final WorkerInfo workerInfo =
workerService.getMembershipManager().getLeader();
- assertTrue(workerInfo != null
- &&
!workerInfo.getWorkerId().equals(workerService.getWorkerConfig().getWorkerId()));
- }
- createAdmin.functions().createFunctionWithUrl(functionConfig,
jarFilePathUrl);
- });
+ // During function-worker leadership election/switchover, the
coordination topic can already point to
+ // the new leader while that worker is still finishing its leader
initialization. In that short window
+ // the internal /functions/leader request returns a transient 503
"Leader not yet ready", so retry only
+ // that condition and let all other failures surface immediately.
+ Awaitility.await().atMost(1, TimeUnit.MINUTES)
+ .pollInterval(1, TimeUnit.SECONDS)
+
.ignoreExceptionsMatching(PulsarFunctionTlsTest::isLeaderNotReady)
+ .untilAsserted(() -> createAdmin.functions()
+ .createFunctionWithUrl(functionConfig,
jarFilePathUrl));
// Function creation is not strongly consistent, so this test can
fail with a get that is too eager and
// does not have retries.
@@ -291,6 +282,14 @@ public class PulsarFunctionTlsTest {
pulsarAdmins[i].functions().deleteFunction(config.getTenant(),
config.getNamespace(), config.getName());
}
}
+
+ private static boolean isLeaderNotReady(Throwable e) {
+ return e instanceof PulsarAdminException
+ && ((PulsarAdminException) e).getStatusCode() == 503
+ && String.valueOf(((PulsarAdminException) e).getHttpError())
+ .contains("Leader not yet ready");
+ }
+
@SuppressWarnings("deprecation")
protected static FunctionConfig createFunctionConfig(