This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new eccc6b647e9 [fix][test] Fix more resource leaks in tests (#24314)
eccc6b647e9 is described below
commit eccc6b647e9cb07a18901471de1b2f8fafa88417
Author: Lari Hotari <[email protected]>
AuthorDate: Sat May 17 09:02:05 2025 +0300
[fix][test] Fix more resource leaks in tests (#24314)
---
.../pulsar/tests/ThreadLeakDetectorListener.java | 8 +
.../mledger/impl/ManagedLedgerFactoryImpl.java | 16 +-
.../bookkeeper/test/BookKeeperClusterTestCase.java | 9 +-
.../ProxySaslAuthenticationTest.java | 501 +++++++++++----------
.../BookieRackAffinityMappingTest.java | 1 +
.../apache/pulsar/common/util/RateLimiterTest.java | 4 +-
.../impl/FileStoreBackedReadHandleImpl.java | 2 +-
.../offload/filesystem/FileStoreTestBase.java | 6 +-
.../impl/FileSystemManagedLedgerOffloaderTest.java | 11 +-
.../impl/FileSystemOffloaderLocalFileTest.java | 33 +-
10 files changed, 326 insertions(+), 265 deletions(-)
diff --git
a/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java
b/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java
index 3708a867246..43f219ebc23 100644
---
a/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java
+++
b/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java
@@ -227,6 +227,10 @@ public class ThreadLeakDetectorListener extends
BetweenTestClassesListenerAdapte
if (threadName.equals("process reaper")) {
return true;
}
+ // skip thread created by sun.net.www.http.KeepAliveCache
+ if (threadName.equals("Keep-Alive-Timer")) {
+ return true;
+ }
// skip JVM internal thread related to agent attach
if (threadName.equals("Attach Listener")) {
return true;
@@ -255,6 +259,10 @@ public class ThreadLeakDetectorListener extends
BetweenTestClassesListenerAdapte
if (threadName.equals("Grizzly-HttpSession-Expirer")) {
return true;
}
+ // skip Hadoop LocalFileSystem stats thread
+ if (threadName.equals("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner")) {
+ return true;
+ }
// Testcontainers AbstractWaitStrategy.EXECUTOR
if (threadName.startsWith("testcontainers-wait-")) {
return true;
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 225f4dba493..627e3225519 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -250,7 +250,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
openTelemetryManagedCursorStats = new
OpenTelemetryManagedCursorStats(openTelemetry, this);
}
- static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy {
+ static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy, AutoCloseable {
private final BookKeeper bkClient;
@@ -263,6 +263,11 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
public CompletableFuture<BookKeeper> get(EnsemblePlacementPolicyConfig
policy) {
return CompletableFuture.completedFuture(bkClient);
}
+
+ @Override
+ public void close() throws Exception {
+ bkClient.close();
+ }
}
private synchronized void handleMetadataStoreNotification(SessionEvent e) {
@@ -646,13 +651,20 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
}
}
}));
- }).thenAcceptAsync(__ -> {
+ }).whenCompleteAsync((__, ___) -> {
//wait for tasks in scheduledExecutor executed.
openTelemetryManagedCursorStats.close();
openTelemetryManagedLedgerStats.close();
openTelemetryCacheStats.close();
scheduledExecutor.shutdownNow();
entryCacheManager.clear();
+ if (bookkeeperFactory instanceof DefaultBkFactory defaultBkFactory) {
+ try {
+ defaultBkFactory.close();
+ } catch (Exception e) {
+ log.warn("Failed to close bookkeeper client", e);
+ }
+ }
});
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index a323ecfeb8e..d3a2e83ba89 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -26,7 +26,6 @@ package org.apache.bookkeeper.test;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertFalse;
-
import com.google.common.base.Stopwatch;
import java.io.File;
import java.io.IOException;
@@ -73,9 +72,9 @@ import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterTest;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.BeforeTest;
/**
* A class runs several bookie servers for testing.
@@ -148,7 +147,7 @@ public abstract class BookKeeperClusterTestCase {
}
}
- @BeforeTest(alwaysRun = true)
+ @BeforeClass(alwaysRun = true)
public void setUp() throws Exception {
setUp(getLedgersRootPath());
}
@@ -187,7 +186,7 @@ public abstract class BookKeeperClusterTestCase {
return "";
}
- @AfterTest(alwaysRun = true)
+ @AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
boolean failed = false;
for (Throwable e : asyncExceptions) {
diff --git
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index ca28befabc1..7a50fa42089 100644
---
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -62,255 +62,256 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
- private static final Logger log =
LoggerFactory.getLogger(ProxySaslAuthenticationTest.class);
-
- public static File kdcDir;
- public static File kerberosWorkDir;
- public static File brokerSecretKeyFile;
- public static File proxySecretKeyFile;
-
- private static MiniKdc kdc;
- private static Properties properties;
-
- private static String localHostname = "localhost";
-
- @BeforeClass
- public static void startMiniKdc() throws Exception {
- kdcDir = Files.createTempDirectory("test-kdc-dir").toFile();
- kerberosWorkDir =
Files.createTempDirectory("test-kerberos-work-dir").toFile();
-
- properties = MiniKdc.createConf();
- kdc = new MiniKdc(properties, kdcDir);
- kdc.start();
-
- String principalBrokerNoRealm = "broker/" + localHostname;
- String principalBroker = "broker/" + localHostname + "@" +
kdc.getRealm();
- log.info("principalBroker: " + principalBroker);
-
- String principalClientNoRealm = "client/" + localHostname;
- String principalClient = principalClientNoRealm + "@" +
kdc.getRealm();
- log.info("principalClient: " + principalClient);
-
- String principalProxyNoRealm = "proxy/" + localHostname;
- String principalProxy = principalProxyNoRealm + "@" +
kdc.getRealm();
- log.info("principalProxy: " + principalProxy);
-
- File keytabClient = new File(kerberosWorkDir,
"pulsarclient.keytab");
- kdc.createPrincipal(keytabClient, principalClientNoRealm);
-
- File keytabBroker = new File(kerberosWorkDir,
"pulsarbroker.keytab");
- kdc.createPrincipal(keytabBroker, principalBrokerNoRealm);
-
- File keytabProxy = new File(kerberosWorkDir,
"pulsarproxy.keytab");
- kdc.createPrincipal(keytabProxy, principalProxyNoRealm);
-
- File jaasFile = new File(kerberosWorkDir, "jaas.conf");
- try (FileWriter writer = new FileWriter(jaasFile)) {
- writer.write("\n"
- + "PulsarBroker {\n"
- + "
com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
- + " useKeyTab=true\n"
- + " keyTab=\"" +
keytabBroker.getAbsolutePath() + "\n"
- + " storeKey=true\n"
- + " useTicketCache=false\n" // won't test
useTicketCache=true on JUnit tests
- + " principal=\"" + principalBroker + "\";\n"
- + "};\n"
- + "\n"
- + "\n"
- + "\n"
- + "PulsarProxy{\n"
- + "
com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
- + " useKeyTab=true\n"
- + " keyTab=\"" + keytabProxy.getAbsolutePath()
+ "\n"
- + " storeKey=true\n"
- + " useTicketCache=false\n" // won't test
useTicketCache=true on JUnit tests
- + " principal=\"" + principalProxy + "\";\n"
- + "};\n"
- + "\n"
- + "\n"
- + "\n"
- + "PulsarClient {\n"
- + "
com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
- + " useKeyTab=true\n"
- + " keyTab=\"" +
keytabClient.getAbsolutePath() + "\n"
- + " storeKey=true\n"
- + " useTicketCache=false\n"
- + " principal=\"" + principalClient + "\";\n"
- + "};\n"
- );
- }
-
- File krb5file = new File(kerberosWorkDir, "krb5.conf");
- try (FileWriter writer = new FileWriter(krb5file)) {
- String conf = "[libdefaults]\n"
- + " default_realm = " + kdc.getRealm() + "\n"
- + " udp_preference_limit = 1\n" // force use TCP
- + "\n"
- + "\n"
- + "[realms]\n"
- + " " + kdc.getRealm() + " = {\n"
- + " kdc = " + kdc.getHost() + ":" +
kdc.getPort() + "\n"
- + " }";
- writer.write(conf);
- log.info("krb5.conf:\n" + conf);
- }
-
- System.setProperty("java.security.auth.login.config",
jaasFile.getAbsolutePath());
- System.setProperty("java.security.krb5.conf",
krb5file.getAbsolutePath());
- Configuration.getConfiguration().refresh();
-
- // Client config
-
- log.info("created AuthenticationSasl");
- }
-
- @AfterClass(alwaysRun = true)
- public static void stopMiniKdc() {
- System.clearProperty("java.security.auth.login.config");
- System.clearProperty("java.security.krb5.conf");
- if (kdc != null) {
- kdc.stop();
- }
- FileUtils.deleteQuietly(kdcDir);
- FileUtils.deleteQuietly(kerberosWorkDir);
- Assert.assertFalse(kdcDir.exists());
- Assert.assertFalse(kerberosWorkDir.exists());
- }
-
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- log.info("-- {} --, start at host: {}", methodName,
localHostname);
- isTcpLookup = true;
- conf.setAdvertisedAddress(localHostname);
- conf.setAuthenticationEnabled(true);
- conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
- conf.setSaslJaasServerSectionName("PulsarBroker");
- brokerSecretKeyFile =
File.createTempFile("saslRoleTokenSignerSecret", ".key");
- Files.write(Paths.get(brokerSecretKeyFile.toString()),
"PulsarSecret".getBytes());
-
conf.setSaslJaasServerRoleTokenSignerSecretPath(brokerSecretKeyFile.toString());
- Set<String> providers = new HashSet<>();
- providers.add(AuthenticationProviderSasl.class.getName());
- conf.setAuthenticationProviders(providers);
- conf.setClusterName("test");
- conf.setSuperUserRoles(ImmutableSet.of("client/" +
localHostname + "@" + kdc.getRealm()));
- // set admin auth, to verify admin web resources
- Map<String, String> clientSaslConfig = new HashMap<>();
- clientSaslConfig.put("saslJaasClientSectionName",
"PulsarClient");
- clientSaslConfig.put("serverType", "broker");
-
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
- conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
-
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
-
- super.init();
-
- lookupUrl = new URI(pulsar.getBrokerServiceUrl());
- log.info("set client jaas section name: PulsarClient");
- closeAdmin();
- admin = PulsarAdmin.builder()
- .serviceHttpUrl(brokerUrl.toString())
-
.authentication(AuthenticationFactory.create(AuthenticationSasl.class.getName(),
clientSaslConfig))
- .build();
- super.producerBaseSetup();
- log.info("-- {} --, end.", methodName);
- }
-
- @Override
- @AfterMethod(alwaysRun = true)
- protected void cleanup() throws Exception {
- FileUtils.deleteQuietly(brokerSecretKeyFile);
- Assert.assertFalse(brokerSecretKeyFile.exists());
- FileUtils.deleteQuietly(proxySecretKeyFile);
- Assert.assertFalse(proxySecretKeyFile.exists());
- super.internalCleanup();
- }
-
- @Test
- void testAuthentication() throws Exception {
- log.info("-- Starting {} test --", methodName);
-
- // Step 1: Create Admin Client
-
- // create a client which connects to proxy and pass authData
- String topicName = "persistent://my-property/my-ns/my-topic1";
-
- ProxyConfiguration proxyConfig = new ProxyConfiguration();
- proxyConfig.setAuthenticationEnabled(true);
- proxyConfig.setServicePort(Optional.of(0));
- proxyConfig.setBrokerProxyAllowedTargetPorts("*");
- proxyConfig.setWebServicePort(Optional.of(0));
- proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
- proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname +
".*");
- proxyConfig.setSaslJaasServerSectionName("PulsarProxy");
- proxyConfig.setClusterName(configClusterName);
-
- // proxy connect to broker
-
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
- proxyConfig.setBrokerClientAuthenticationParameters(
- "{\"saslJaasClientSectionName\": " + "\"PulsarProxy\","
+
- "\"serverType\": " + "\"broker\"}");
- proxySecretKeyFile =
File.createTempFile("saslRoleTokenSignerSecret", ".key");
- Files.write(Paths.get(proxySecretKeyFile.toString()),
"PulsarSecret".getBytes());
-
proxyConfig.setSaslJaasServerRoleTokenSignerSecretPath(proxySecretKeyFile.toString());
- // proxy as a server, it will use sasl to authn
- Set<String> providers = new HashSet<>();
- providers.add(AuthenticationProviderSasl.class.getName());
- proxyConfig.setAuthenticationProviders(providers);
-
- proxyConfig.setForwardAuthorizationCredentials(true);
- AuthenticationService authenticationService = new
AuthenticationService(
+ private static final Logger log =
LoggerFactory.getLogger(ProxySaslAuthenticationTest.class);
+
+ public static File kdcDir;
+ public static File kerberosWorkDir;
+ public static File brokerSecretKeyFile;
+ public static File proxySecretKeyFile;
+
+ private static MiniKdc kdc;
+ private static Properties properties;
+
+ private static String localHostname = "localhost";
+
+ @BeforeClass
+ public static void startMiniKdc() throws Exception {
+ kdcDir = Files.createTempDirectory("test-kdc-dir").toFile();
+ kerberosWorkDir =
Files.createTempDirectory("test-kerberos-work-dir").toFile();
+
+ properties = MiniKdc.createConf();
+ kdc = new MiniKdc(properties, kdcDir);
+ kdc.start();
+
+ String principalBrokerNoRealm = "broker/" + localHostname;
+ String principalBroker = "broker/" + localHostname + "@" +
kdc.getRealm();
+ log.info("principalBroker: " + principalBroker);
+
+ String principalClientNoRealm = "client/" + localHostname;
+ String principalClient = principalClientNoRealm + "@" + kdc.getRealm();
+ log.info("principalClient: " + principalClient);
+
+ String principalProxyNoRealm = "proxy/" + localHostname;
+ String principalProxy = principalProxyNoRealm + "@" + kdc.getRealm();
+ log.info("principalProxy: " + principalProxy);
+
+ File keytabClient = new File(kerberosWorkDir, "pulsarclient.keytab");
+ kdc.createPrincipal(keytabClient, principalClientNoRealm);
+
+ File keytabBroker = new File(kerberosWorkDir, "pulsarbroker.keytab");
+ kdc.createPrincipal(keytabBroker, principalBrokerNoRealm);
+
+ File keytabProxy = new File(kerberosWorkDir, "pulsarproxy.keytab");
+ kdc.createPrincipal(keytabProxy, principalProxyNoRealm);
+
+ File jaasFile = new File(kerberosWorkDir, "jaas.conf");
+ try (FileWriter writer = new FileWriter(jaasFile)) {
+ writer.write("\n"
+ + "PulsarBroker {\n"
+ + " com.sun.security.auth.module.Krb5LoginModule required
debug=true\n"
+ + " useKeyTab=true\n"
+ + " keyTab=\"" + keytabBroker.getAbsolutePath() + "\n"
+ + " storeKey=true\n"
+ + " useTicketCache=false\n" // won't test useTicketCache=true
on JUnit tests
+ + " principal=\"" + principalBroker + "\";\n"
+ + "};\n"
+ + "\n"
+ + "\n"
+ + "\n"
+ + "PulsarProxy{\n"
+ + " com.sun.security.auth.module.Krb5LoginModule required
debug=true\n"
+ + " useKeyTab=true\n"
+ + " keyTab=\"" + keytabProxy.getAbsolutePath() + "\n"
+ + " storeKey=true\n"
+ + " useTicketCache=false\n" // won't test useTicketCache=true
on JUnit tests
+ + " principal=\"" + principalProxy + "\";\n"
+ + "};\n"
+ + "\n"
+ + "\n"
+ + "\n"
+ + "PulsarClient {\n"
+ + " com.sun.security.auth.module.Krb5LoginModule required
debug=true\n"
+ + " useKeyTab=true\n"
+ + " keyTab=\"" + keytabClient.getAbsolutePath() + "\n"
+ + " storeKey=true\n"
+ + " useTicketCache=false\n"
+ + " principal=\"" + principalClient + "\";\n"
+ + "};\n"
+ );
+ }
+
+ File krb5file = new File(kerberosWorkDir, "krb5.conf");
+ try (FileWriter writer = new FileWriter(krb5file)) {
+ String conf = "[libdefaults]\n"
+ + " default_realm = " + kdc.getRealm() + "\n"
+ + " udp_preference_limit = 1\n" // force use TCP
+ + "\n"
+ + "\n"
+ + "[realms]\n"
+ + " " + kdc.getRealm() + " = {\n"
+ + " kdc = " + kdc.getHost() + ":" + kdc.getPort() + "\n"
+ + " }";
+ writer.write(conf);
+ log.info("krb5.conf:\n" + conf);
+ }
+
+ System.setProperty("java.security.auth.login.config",
jaasFile.getAbsolutePath());
+ System.setProperty("java.security.krb5.conf",
krb5file.getAbsolutePath());
+ Configuration.getConfiguration().refresh();
+
+ // Client config
+
+ log.info("created AuthenticationSasl");
+ }
+
+ @AfterClass(alwaysRun = true)
+ public static void stopMiniKdc() {
+ System.clearProperty("java.security.auth.login.config");
+ System.clearProperty("java.security.krb5.conf");
+ if (kdc != null) {
+ kdc.stop();
+ }
+ FileUtils.deleteQuietly(kdcDir);
+ FileUtils.deleteQuietly(kerberosWorkDir);
+ Assert.assertFalse(kdcDir.exists());
+ Assert.assertFalse(kerberosWorkDir.exists());
+ }
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ log.info("-- {} --, start at host: {}", methodName, localHostname);
+ isTcpLookup = true;
+ conf.setAdvertisedAddress(localHostname);
+ conf.setAuthenticationEnabled(true);
+ conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
+ conf.setSaslJaasServerSectionName("PulsarBroker");
+ brokerSecretKeyFile = File.createTempFile("saslRoleTokenSignerSecret",
".key");
+ Files.write(Paths.get(brokerSecretKeyFile.toString()),
"PulsarSecret".getBytes());
+
conf.setSaslJaasServerRoleTokenSignerSecretPath(brokerSecretKeyFile.toString());
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderSasl.class.getName());
+ conf.setAuthenticationProviders(providers);
+ conf.setClusterName("test");
+ conf.setSuperUserRoles(ImmutableSet.of("client/" + localHostname + "@"
+ kdc.getRealm()));
+ // set admin auth, to verify admin web resources
+ Map<String, String> clientSaslConfig = new HashMap<>();
+ clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
+ clientSaslConfig.put("serverType", "broker");
+
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
+ conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
+
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
+
+ super.init();
+
+ lookupUrl = new URI(pulsar.getBrokerServiceUrl());
+ log.info("set client jaas section name: PulsarClient");
+ closeAdmin();
+ admin = PulsarAdmin.builder()
+ .serviceHttpUrl(brokerUrl.toString())
+
.authentication(AuthenticationFactory.create(AuthenticationSasl.class.getName(),
clientSaslConfig))
+ .build();
+ super.producerBaseSetup();
+ log.info("-- {} --, end.", methodName);
+ }
+
+ @Override
+ @AfterMethod(alwaysRun = true)
+ protected void cleanup() throws Exception {
+ FileUtils.deleteQuietly(brokerSecretKeyFile);
+ Assert.assertFalse(brokerSecretKeyFile.exists());
+ FileUtils.deleteQuietly(proxySecretKeyFile);
+ Assert.assertFalse(proxySecretKeyFile.exists());
+ super.internalCleanup();
+ }
+
+ @Test
+ void testAuthentication() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ // Step 1: Create Admin Client
+
+ // create a client which connects to proxy and pass authData
+ String topicName = "persistent://my-property/my-ns/my-topic1";
+
+ ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ proxyConfig.setAuthenticationEnabled(true);
+ proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+ proxyConfig.setWebServicePort(Optional.of(0));
+ proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+ proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
+ proxyConfig.setSaslJaasServerSectionName("PulsarProxy");
+ proxyConfig.setClusterName(configClusterName);
+
+ // proxy connect to broker
+
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
+ proxyConfig.setBrokerClientAuthenticationParameters(
+ "{\"saslJaasClientSectionName\": " + "\"PulsarProxy\"," +
+ "\"serverType\": " + "\"broker\"}");
+ proxySecretKeyFile = File.createTempFile("saslRoleTokenSignerSecret",
".key");
+ Files.write(Paths.get(proxySecretKeyFile.toString()),
"PulsarSecret".getBytes());
+
proxyConfig.setSaslJaasServerRoleTokenSignerSecretPath(proxySecretKeyFile.toString());
+ // proxy as a server, it will use sasl to authn
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderSasl.class.getName());
+ proxyConfig.setAuthenticationProviders(providers);
+
+ proxyConfig.setForwardAuthorizationCredentials(true);
+ @Cleanup
+ AuthenticationService authenticationService = new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
- @Cleanup
- final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
-
proxyConfig.getBrokerClientAuthenticationParameters());
- proxyClientAuthentication.start();
- ProxyService proxyService = new ProxyService(proxyConfig,
authenticationService, proxyClientAuthentication);
-
- proxyService.start();
- final String proxyServiceUrl = "pulsar://localhost:" +
proxyService.getListenPort().get();
- log.info("1 proxy service started {}", proxyService);
-
- // Step 3: Pass correct client params
- @Cleanup
- PulsarClient proxyClient = createProxyClient(proxyServiceUrl,
1);
- log.info("2 create proxy client {}, {}", proxyServiceUrl,
proxyClient);
-
- Producer<byte[]> producer =
proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
- log.info("3 created producer.");
-
- Consumer<byte[]> consumer =
proxyClient.newConsumer(Schema.BYTES).topic(topicName).subscriptionName("test-sub").subscribe();
- log.info("4 created consumer.");
-
- for (int i = 0; i < 10; i++) {
- String message = "my-message-" + i;
- producer.send(message.getBytes());
- log.info("Produced message: [{}]", message);
- }
-
- Message<byte[]> msg = null;
- Set<String> messageSet = new HashSet<>();
- for (int i = 0; i < 10; i++) {
- msg = consumer.receive(5, TimeUnit.SECONDS);
- String receivedMessage = new String(msg.getData());
- log.info("Received message: [{}]", receivedMessage);
- String expectedMessage = "my-message-" + i;
- testMessageOrderAndDuplicates(messageSet,
receivedMessage, expectedMessage);
- }
- // Acknowledge the consumption of all messages at once
- consumer.acknowledgeCumulative(msg);
- consumer.close();
-
- proxyService.close();
- }
-
- private PulsarClient createProxyClient(String proxyServiceUrl, int
numberOfConnections) throws PulsarClientException {
- Map<String, String> clientSaslConfig = new HashMap<>();
- clientSaslConfig.put("saslJaasClientSectionName",
"PulsarClient");
- clientSaslConfig.put("serverType", "proxy");
- log.info("set client jaas section name: PulsarClient,
serverType: proxy");
- Authentication authSasl =
AuthenticationFactory.create(AuthenticationSasl.class.getName(),
clientSaslConfig);
-
- return PulsarClient.builder().serviceUrl(proxyServiceUrl)
-
.authentication(authSasl).connectionsPerBroker(numberOfConnections).build();
- }
+ @Cleanup
+ final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+ ProxyService proxyService = new ProxyService(proxyConfig,
authenticationService, proxyClientAuthentication);
+
+ proxyService.start();
+ final String proxyServiceUrl = "pulsar://localhost:" +
proxyService.getListenPort().get();
+ log.info("1 proxy service started {}", proxyService);
+
+ // Step 3: Pass correct client params
+ @Cleanup
+ PulsarClient proxyClient = createProxyClient(proxyServiceUrl, 1);
+ log.info("2 create proxy client {}, {}", proxyServiceUrl, proxyClient);
+
+ Producer<byte[]> producer =
proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
+ log.info("3 created producer.");
+
+ Consumer<byte[]> consumer =
proxyClient.newConsumer(Schema.BYTES).topic(topicName).subscriptionName("test-sub").subscribe();
+ log.info("4 created consumer.");
+
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ log.info("Produced message: [{}]", message);
+ }
+
+ Message<byte[]> msg = null;
+ Set<String> messageSet = new HashSet<>();
+ for (int i = 0; i < 10; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ log.info("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ testMessageOrderAndDuplicates(messageSet, receivedMessage,
expectedMessage);
+ }
+ // Acknowledge the consumption of all messages at once
+ consumer.acknowledgeCumulative(msg);
+ consumer.close();
+
+ proxyService.close();
+ }
+
+ private PulsarClient createProxyClient(String proxyServiceUrl, int
numberOfConnections) throws PulsarClientException {
+ Map<String, String> clientSaslConfig = new HashMap<>();
+ clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
+ clientSaslConfig.put("serverType", "proxy");
+ log.info("set client jaas section name: PulsarClient, serverType:
proxy");
+ Authentication authSasl =
AuthenticationFactory.create(AuthenticationSasl.class.getName(),
clientSaslConfig);
+
+ return PulsarClient.builder().serviceUrl(proxyServiceUrl)
+
.authentication(authSasl).connectionsPerBroker(numberOfConnections).build();
+ }
}
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
index 96e4d0ed264..3ca24a680bb 100644
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
@@ -247,6 +247,7 @@ public class BookieRackAffinityMappingTest {
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE,
store);
+ @Cleanup
PulsarRegistrationClient pulsarRegistrationClient = new
PulsarRegistrationClient(store, "/ledgers");
DefaultBookieAddressResolver defaultBookieAddressResolver = new
DefaultBookieAddressResolver(pulsarRegistrationClient);
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
index 5cbd0245565..fc0c900562f 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
@@ -22,10 +22,10 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
+import lombok.Cleanup;
import org.testng.annotations.Test;
public class RateLimiterTest {
@@ -218,6 +218,7 @@ public class RateLimiterTest {
long rateTime = 1;
long newUpdatedRateLimit = 100L;
Supplier<Long> permitUpdater = () -> newUpdatedRateLimit;
+ @Cleanup
RateLimiter limiter =
RateLimiter.builder().permits(permits).rateTime(1).timeUnit(TimeUnit.SECONDS)
.permitUpdater(permitUpdater)
.build();
@@ -233,6 +234,7 @@ public class RateLimiterTest {
long rateTime = 1;
int reNewTime = 3;
RateLimitFunction rateLimitFunction = atomicInteger::incrementAndGet;
+ @Cleanup
RateLimiter rateLimiter =
RateLimiter.builder().permits(permits).rateTime(rateTime).timeUnit(TimeUnit.SECONDS)
.rateLimitFunction(rateLimitFunction)
.build();
diff --git
a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
index 91e7e902eab..abdd9d4b111 100644
---
a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
+++
b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
@@ -164,7 +164,7 @@ public class FileStoreBackedReadHandleImpl implements
ReadHandle {
nextExpectedId, entryId, lastEntry);
throw new BKException.BKUnexpectedConditionException();
}
- }
+ }
promise.complete(LedgerEntriesImpl.create(entries));
} catch (Throwable t) {
this.offloaderStats.recordReadOffloadError(topicName);
diff --git
a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
index 9609362e9b7..b70a4623d5d 100644
---
a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
+++
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
@@ -84,11 +84,15 @@ public abstract class FileStoreTestBase {
}
@AfterMethod(alwaysRun = true)
- public void tearDown() {
+ public void tearDown() throws Exception {
if (fileSystemManagedLedgerOffloader != null) {
fileSystemManagedLedgerOffloader.close();
fileSystemManagedLedgerOffloader = null;
}
+ if (offloaderStats != null) {
+ offloaderStats.close();
+ offloaderStats = null;
+ }
if (hdfsCluster != null) {
hdfsCluster.shutdown(true, true);
hdfsCluster = null;
diff --git
a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
index 71fe5ec7219..0a7d6ab1c65 100644
---
a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
+++
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
+import lombok.Cleanup;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
@@ -98,7 +99,7 @@ public class FileSystemManagedLedgerOffloaderTest extends
FileStoreTestBase {
@AfterMethod(alwaysRun = true)
@Override
- public void tearDown() {
+ public void tearDown() throws Exception {
super.tearDown();
}
@@ -122,6 +123,8 @@ public class FileSystemManagedLedgerOffloaderTest extends
FileStoreTestBase {
assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
assertEquals(toWriteEntry.getEntryBuffer(),
toTestEntry.getEntryBuffer());
}
+ toTestEntries.close();
+ toWriteEntries.close();
toTestEntries = toTest.read(1, numberOfEntries - 1);
toWriteEntries = toWrite.read(1,numberOfEntries - 1);
toTestIter = toTestEntries.iterator();
@@ -135,6 +138,8 @@ public class FileSystemManagedLedgerOffloaderTest extends
FileStoreTestBase {
assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
assertEquals(toWriteEntry.getEntryBuffer(),
toTestEntry.getEntryBuffer());
}
+ toTestEntries.close();
+ toWriteEntries.close();
}
@Test
@@ -155,6 +160,7 @@ public class FileSystemManagedLedgerOffloaderTest extends
FileStoreTestBase {
while (toTestIter.hasNext()) {
LedgerEntry toTestEntry = toTestIter.next();
}
+ toTestEntries.close();
assertTrue(offloaderStats.getReadOffloadError(topicName) == 0);
assertTrue(offloaderStats.getReadOffloadBytes(topicName) > 0);
@@ -168,10 +174,11 @@ public class FileSystemManagedLedgerOffloaderTest extends
FileStoreTestBase {
UUID uuid = UUID.randomUUID();
offloader.offload(toWrite, uuid, map).get();
Configuration configuration = new Configuration();
+ @Cleanup
FileSystem fileSystem = FileSystem.get(new URI(getURI()),
configuration);
assertTrue(fileSystem.exists(new Path(createDataFilePath(storagePath,
lh.getId(), uuid))));
assertTrue(fileSystem.exists(new Path(createIndexFilePath(storagePath,
lh.getId(), uuid))));
- offloader.deleteOffloaded(lh.getId(), uuid, map);
+ offloader.deleteOffloaded(lh.getId(), uuid, map).get();
assertFalse(fileSystem.exists(new Path(createDataFilePath(storagePath,
lh.getId(), uuid))));
assertFalse(fileSystem.exists(new
Path(createIndexFilePath(storagePath, lh.getId(), uuid))));
}
diff --git
a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java
index 14734b3faca..7653a3cf152 100644
---
a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java
+++
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java
@@ -19,7 +19,6 @@
package org.apache.bookkeeper.mledger.offload.filesystem.impl;
import static org.testng.Assert.assertEquals;
-
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -27,6 +26,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
+import lombok.Cleanup;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
@@ -38,12 +38,29 @@ import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class FileSystemOffloaderLocalFileTest {
- private OrderedScheduler scheduler =
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
- private LedgerOffloaderStats offloaderStats =
LedgerOffloaderStats.create(true, true, scheduler, 60);
+ private OrderedScheduler scheduler;
+ private LedgerOffloaderStats offloaderStats;
+
+ @BeforeClass
+ public void setup() throws Exception {
+ scheduler =
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
+ offloaderStats = LedgerOffloaderStats.create(true, true, scheduler,
60);
+ }
+ @AfterClass(alwaysRun = true)
+ public void cleanup() throws Exception {
+ if (scheduler != null) {
+ scheduler.shutdown();
+ }
+ if (offloaderStats != null) {
+ offloaderStats.close();
+ }
+ }
private String getResourceFilePath(String name) {
return getClass().getClassLoader().getResource(name).getPath();
@@ -59,11 +76,13 @@ public class FileSystemOffloaderLocalFileTest {
offloadPolicies.setFileSystemProfilePath(getResourceFilePath("filesystem_offload_core_site.xml"));
// initialize the offloader with the offload policies
+ @Cleanup
var offloader =
FileSystemManagedLedgerOffloader.create(offloadPolicies, scheduler,
offloaderStats);
int numberOfEntries = 100;
// prepare the data in bookkeeper
+ @Cleanup
BookKeeper bk = new PulsarMockBookKeeper(scheduler);
LedgerHandle lh = bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32,
"".getBytes());
for (int i = 0; i < numberOfEntries; i++) {
@@ -72,6 +91,7 @@ public class FileSystemOffloaderLocalFileTest {
}
lh.close();
+ @Cleanup
ReadHandle read = bk.newOpenLedgerOp()
.withLedgerId(lh.getId())
.withDigestType(DigestType.CRC32)
@@ -83,6 +103,7 @@ public class FileSystemOffloaderLocalFileTest {
UUID uuid = UUID.randomUUID();
offloader.offload(read, uuid, offloadDriverMetadata).get();
+ @Cleanup
ReadHandle toTest = offloader.readOffloaded(read.getId(), uuid,
offloadDriverMetadata).get();
assertEquals(toTest.getLastAddConfirmed(), read.getLastAddConfirmed());
LedgerEntries toTestEntries = toTest.read(0, numberOfEntries - 1);
@@ -98,6 +119,9 @@ public class FileSystemOffloaderLocalFileTest {
assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
assertEquals(toWriteEntry.getEntryBuffer(),
toTestEntry.getEntryBuffer());
}
+ toTestEntries.close();
+ toWriteEntries.close();
+
toTestEntries = toTest.read(1, numberOfEntries - 1);
toWriteEntries = read.read(1,numberOfEntries - 1);
toTestIter = toTestEntries.iterator();
@@ -112,6 +136,9 @@ public class FileSystemOffloaderLocalFileTest {
assertEquals(toWriteEntry.getEntryBuffer(),
toTestEntry.getEntryBuffer());
}
+ toTestEntries.close();
+ toWriteEntries.close();
+
// check the file located in the local file system
Path offloadedFilePath = Paths.get(basePath, mlName);
assertEquals(Files.exists(offloadedFilePath), true);