This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new a8f3c8117bd [branch-2.10][broker] Support zookeeper read-only config.
(#19156) (#19637)
a8f3c8117bd is described below
commit a8f3c8117bdfabbb39d91e39dbe4c055505a2e45
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Mar 1 19:12:49 2023 +0800
[branch-2.10][broker] Support zookeeper read-only config. (#19156) (#19637)
Co-authored-by: Yan Zhao <[email protected]>
---
.../java/org/apache/pulsar/broker/ServiceConfiguration.java | 6 ++++++
.../org/apache/pulsar/broker/resources/PulsarResources.java | 12 ++++++++++--
.../main/java/org/apache/pulsar/broker/PulsarService.java | 4 ++--
.../pulsar/websocket/proxy/ProxyAuthenticationTest.java | 4 +++-
.../pulsar/websocket/proxy/ProxyAuthorizationTest.java | 4 +++-
.../pulsar/websocket/proxy/ProxyConfigurationTest.java | 4 +++-
.../apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java | 4 +++-
.../org/apache/pulsar/websocket/proxy/ProxyPingTest.java | 4 +++-
.../pulsar/websocket/proxy/ProxyPublishConsumeTest.java | 4 +++-
.../pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java | 4 +++-
.../websocket/proxy/ProxyPublishConsumeWithoutZKTest.java | 4 +++-
.../websocket/proxy/v1/V1_ProxyAuthenticationTest.java | 4 +++-
.../org/apache/pulsar/functions/worker/WorkerConfig.java | 6 ++++++
.../main/java/org/apache/pulsar/functions/worker/Worker.java | 3 ++-
.../org/apache/pulsar/proxy/server/ProxyConfiguration.java | 6 ++++++
.../java/org/apache/pulsar/proxy/server/ProxyService.java | 8 +++++---
.../java/org/apache/pulsar/websocket/WebSocketService.java | 10 ++++++----
17 files changed, 70 insertions(+), 21 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index b0a8581aac9..6337d3cf7e4 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -417,6 +417,12 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private int zooKeeperCacheExpirySeconds = -1;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Is zookeeper allow read-only operations."
+ )
+ private boolean zooKeeperAllowReadOnlyOperations;
+
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index a087d8090d3..3c1a8fb41df 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -88,9 +88,17 @@ public class PulsarResources {
this.configurationMetadataStore =
Optional.ofNullable(configurationMetadataStore);
}
- public static MetadataStoreExtended createMetadataStore(String serverUrls,
int sessionTimeoutMs)
+ public static MetadataStoreExtended createMetadataStore(String serverUrls,
int sessionTimeoutMs,
+ boolean
allowReadOnlyOperations)
throws MetadataStoreException {
return MetadataStoreExtended.create(serverUrls,
MetadataStoreConfig.builder()
-
.sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false).build());
+
.sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(allowReadOnlyOperations).build());
+ }
+
+ public static MetadataStoreExtended createConfigMetadataStore(String
serverUrls, int sessionTimeoutMs,
+ boolean
allowReadOnlyOperations)
+ throws MetadataStoreException {
+ return MetadataStoreExtended.create(serverUrls,
MetadataStoreConfig.builder()
+
.sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(allowReadOnlyOperations).build());
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 2bda5c55398..e2034bdc60b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -347,7 +347,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
return
MetadataStoreFactory.create(config.getConfigurationMetadataStoreUrl(),
MetadataStoreConfig.builder()
.sessionTimeoutMillis((int)
config.getMetadataStoreSessionTimeoutMillis())
- .allowReadOnlyOperations(false)
+
.allowReadOnlyOperations(config.isZooKeeperAllowReadOnlyOperations())
.configFilePath(config.getMetadataStoreConfigPath())
.batchingEnabled(config.isMetadataStoreBatchingEnabled())
.batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
@@ -980,7 +980,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
return MetadataStoreExtended.create(config.getMetadataStoreUrl(),
MetadataStoreConfig.builder()
.sessionTimeoutMillis((int)
config.getMetadataStoreSessionTimeoutMillis())
- .allowReadOnlyOperations(false)
+
.allowReadOnlyOperations(config.isZooKeeperAllowReadOnlyOperations())
.configFilePath(config.getMetadataStoreConfigPath())
.batchingEnabled(config.isMetadataStoreBatchingEnabled())
.batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
index a34ec879ba6..49ba21ce388 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -83,7 +84,8 @@ public class ProxyAuthenticationTest extends
ProducerConsumerBase {
}
service = spyWithClassAndConstructorArgs(WebSocketService.class,
config);
- doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createConfigMetadataStore(anyString(), anyInt(),
anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index 7e0ee1bd466..a62f4751459 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.websocket.proxy;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -64,7 +65,8 @@ public class ProxyAuthorizationTest extends
MockedPulsarServiceBaseTest {
config.setWebServicePort(Optional.of(0));
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class,
config);
- doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createConfigMetadataStore(anyString(), anyInt(),
anyBoolean());
service.start();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
index 26cf8a0e154..8ff00cf0ce9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.websocket.proxy;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -67,7 +68,8 @@ public class ProxyConfigurationTest extends
ProducerConsumerBase {
config.setServiceUrl("http://localhost:8080");
config.getProperties().setProperty("brokerClient_lookupTimeoutMs",
"100");
WebSocketService service =
spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createConfigMetadataStore(anyString(), anyInt(),
anyBoolean());
service.start();
PulsarClientImpl client = (PulsarClientImpl) service.getPulsarClient();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
index 71f8c0b6d86..18451d9e751 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -64,7 +65,8 @@ public class ProxyIdleTimeoutTest extends
ProducerConsumerBase {
config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
service = spyWithClassAndConstructorArgs(WebSocketService.class,
config);
- doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal))
+ .when(service).createConfigMetadataStore(anyString(),
anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
index 6e0dfa46cfd..9ab2629fe01 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -66,7 +67,8 @@ public class ProxyPingTest extends ProducerConsumerBase {
config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
config.setWebSocketPingDurationSeconds(2);
service = spyWithClassAndConstructorArgs(WebSocketService.class,
config);
- doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createConfigMetadataStore(anyString(), anyInt(),
anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 918640642ec..02d9ceb1c1a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -101,7 +102,8 @@ public class ProxyPublishConsumeTest extends
ProducerConsumerBase {
config.setClusterName("test");
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class,
config);
- doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createConfigMetadataStore(anyString(), anyInt(),
anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
index a8b67416107..b98414a9270 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -74,7 +75,8 @@ public class ProxyPublishConsumeTlsTest extends
TlsProducerConsumerBase {
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class,
config);
- doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createConfigMetadataStore(anyString(), anyInt(),
anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
index 1fb12645e5e..dadc246c98e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -62,7 +63,8 @@ public class ProxyPublishConsumeWithoutZKTest extends
ProducerConsumerBase {
config.setServiceUrl(pulsar.getSafeWebServiceAddress());
config.setServiceUrlTls(pulsar.getWebServiceAddressTls());
service = spyWithClassAndConstructorArgs(WebSocketService.class,
config);
- doReturn(new
ZKMetadataStore(mockZooKeeper)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeper)).when(service)
+ .createConfigMetadataStore(anyString(), anyInt(),
anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
index b80c3fb07be..af9b00e5921 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy.v1;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -85,7 +86,8 @@ public class V1_ProxyAuthenticationTest extends
V1_ProducerConsumerBase {
}
service = spyWithClassAndConstructorArgs(WebSocketService.class,
config);
- doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createConfigMetadataStore(anyString(), anyInt(),
anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index a55f49bfc20..27a6e8eea51 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -205,6 +205,12 @@ public class WorkerConfig implements Serializable,
PulsarConfiguration {
)
private int zooKeeperCacheExpirySeconds = -1;
+ @FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "Is zooKeeper allow read-only operations."
+ )
+ private boolean zooKeeperAllowReadOnlyOperations;
+
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "The path to the location to locate builtin connectors"
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index f8871cb86e7..189daae348f 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -75,7 +75,8 @@ public class Worker {
try {
configMetadataStore = PulsarResources.createMetadataStore(
workerConfig.getConfigurationMetadataStoreUrl(),
- (int)
workerConfig.getMetadataStoreSessionTimeoutMillis());
+ (int)
workerConfig.getMetadataStoreSessionTimeoutMillis(),
+ workerConfig.isZooKeeperAllowReadOnlyOperations());
} catch (IOException e) {
throw new PulsarServerException(e);
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index d139bb83002..a3183eb749c 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -148,6 +148,12 @@ public class ProxyConfiguration implements
PulsarConfiguration {
)
private int zooKeeperCacheExpirySeconds = -1;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Is zooKeeper allow read-only operations."
+ )
+ private boolean zooKeeperAllowReadOnlyOperations;
+
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
doc = "The service url points to the broker cluster"
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 1960b5143a0..c29e2ba1696 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -427,12 +427,14 @@ public class ProxyService implements Closeable {
public MetadataStoreExtended createLocalMetadataStore() throws
MetadataStoreException {
return
PulsarResources.createMetadataStore(proxyConfig.getMetadataStoreUrl(),
- proxyConfig.getMetadataStoreSessionTimeoutMillis());
+ proxyConfig.getMetadataStoreSessionTimeoutMillis(),
+ proxyConfig.isZooKeeperAllowReadOnlyOperations());
}
public MetadataStoreExtended createConfigurationMetadataStore() throws
MetadataStoreException {
- return
PulsarResources.createMetadataStore(proxyConfig.getConfigurationMetadataStoreUrl(),
- proxyConfig.getMetadataStoreSessionTimeoutMillis());
+ return
PulsarResources.createConfigMetadataStore(proxyConfig.getConfigurationMetadataStoreUrl(),
+ proxyConfig.getMetadataStoreSessionTimeoutMillis(),
+ proxyConfig.isZooKeeperAllowReadOnlyOperations());
}
public Authentication getProxyClientAuthenticationPlugin() {
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index a57c6c491e7..09be1b70264 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -102,8 +102,9 @@ public class WebSocketService implements Closeable {
if (isNotBlank(config.getConfigurationMetadataStoreUrl())) {
try {
- configMetadataStore =
createMetadataStore(config.getConfigurationMetadataStoreUrl(),
- (int) config.getMetadataStoreSessionTimeoutMillis());
+ configMetadataStore =
createConfigMetadataStore(config.getConfigurationMetadataStoreUrl(),
+ (int) config.getMetadataStoreSessionTimeoutMillis(),
+ config.isZooKeeperAllowReadOnlyOperations());
} catch (MetadataStoreException e) {
throw new PulsarServerException(e);
}
@@ -123,9 +124,10 @@ public class WebSocketService implements Closeable {
log.info("Pulsar WebSocket Service started");
}
- public MetadataStoreExtended createMetadataStore(String serverUrls, int
sessionTimeoutMs)
+ public MetadataStoreExtended createConfigMetadataStore(String serverUrls,
int sessionTimeoutMs, boolean
+ isAllowReadOnlyOperations)
throws MetadataStoreException {
- return PulsarResources.createMetadataStore(serverUrls,
sessionTimeoutMs);
+ return PulsarResources.createConfigMetadataStore(serverUrls,
sessionTimeoutMs, isAllowReadOnlyOperations);
}
@Override