This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2de90b24d52 [feat][sql] Support metadata store url (#18210)
2de90b24d52 is described below
commit 2de90b24d52c099d5608f9923e351980f19684cb
Author: tison <[email protected]>
AuthorDate: Wed Nov 2 10:23:36 2022 +0800
[feat][sql] Support metadata store url (#18210)
---
.../pulsar/broker/BookKeeperClientFactoryImpl.java | 3 +--
.../bookkeeper/PulsarMetadataClientDriver.java | 4 +++
.../main/resources/conf/catalog/pulsar.properties | 4 +--
.../pulsar/sql/presto/PulsarConnectorCache.java | 9 ++++---
.../pulsar/sql/presto/PulsarConnectorConfig.java | 31 +++++++++++++++++++---
.../integration/topologies/PulsarCluster.java | 2 +-
6 files changed, 41 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index c619df2b3d4..20f109d0377 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -37,7 +37,6 @@ import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping;
@@ -68,7 +67,7 @@ public class BookKeeperClientFactoryImpl implements
BookKeeperClientFactory {
EventLoopGroup eventLoopGroup,
Optional<Class<? extends
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> properties, StatsLogger
statsLogger) throws IOException {
- MetadataDrivers.registerClientDriver("metadata-store",
PulsarMetadataClientDriver.class);
+ PulsarMetadataClientDriver.init();
ClientConfiguration bkConf = createBkClientConfiguration(store, conf);
if (properties != null) {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarMetadataClientDriver.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarMetadataClientDriver.java
index 6c1c53e344b..f2776062818 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarMetadataClientDriver.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarMetadataClientDriver.java
@@ -37,6 +37,10 @@ public class PulsarMetadataClientDriver extends
AbstractMetadataDriver implement
MetadataDrivers.registerClientDriver(METADATA_STORE_SCHEME,
PulsarMetadataClientDriver.class);
}
+ public static void init() {
+ // cause <cinit> to be invoked
+ }
+
@Override
public MetadataClientDriver initialize(ClientConfiguration
clientConfiguration,
ScheduledExecutorService
scheduledExecutorService,
diff --git
a/pulsar-sql/presto-distribution/src/main/resources/conf/catalog/pulsar.properties
b/pulsar-sql/presto-distribution/src/main/resources/conf/catalog/pulsar.properties
index 8cbea0e1364..f15cd2657e0 100644
---
a/pulsar-sql/presto-distribution/src/main/resources/conf/catalog/pulsar.properties
+++
b/pulsar-sql/presto-distribution/src/main/resources/conf/catalog/pulsar.properties
@@ -26,8 +26,8 @@ pulsar.broker-service-url=http://localhost:8080
pulsar.web-service-url=http://localhost:8080
# the url of Pulsar broker binary service
pulsar.broker-binary-service-url=pulsar://localhost:6650
-# URI of Zookeeper cluster
-pulsar.zookeeper-uri=127.0.0.1:2181
+# the url of metadata store
+pulsar.metadata-url=zk:127.0.0.1:2181
# minimum number of entries to read at a single time
pulsar.max-entry-read-batch-size=100
# default number of splits to use per query
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index 29331d88dd1..20b00b59e5a 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -46,6 +46,7 @@ import
org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver;
/**
* Implementation of a cache for the Pulsar connector.
@@ -73,7 +74,7 @@ public class PulsarConnectorCache {
private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig)
throws Exception {
- this.metadataStore =
MetadataStoreExtended.create(pulsarConnectorConfig.getZookeeperUri(),
+ this.metadataStore =
MetadataStoreExtended.create(pulsarConnectorConfig.getMetadataUrl(),
MetadataStoreConfig.builder().metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
this.managedLedgerFactory =
initManagedLedgerFactory(pulsarConnectorConfig);
this.statsProvider =
PulsarConnectorUtils.createInstance(pulsarConnectorConfig.getStatsProvider(),
@@ -109,10 +110,10 @@ public class PulsarConnectorCache {
private ManagedLedgerFactory
initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig)
throws Exception {
+ PulsarMetadataClientDriver.init();
+
ClientConfiguration bkClientConfiguration = new ClientConfiguration()
- .setZkServers(pulsarConnectorConfig.getZookeeperUri())
- .setMetadataServiceUri("zk://" +
pulsarConnectorConfig.getZookeeperUri()
- .replace(",", ";") + "/ledgers")
+ .setMetadataServiceUri("metadata-store:" +
pulsarConnectorConfig.getMetadataUrl())
.setClientTcpNoDelay(false)
.setUseV2WireProtocol(pulsarConnectorConfig.getBookkeeperUseV2Protocol())
.setExplictLacInterval(pulsarConnectorConfig.getBookkeeperExplicitInterval())
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index 5660fcd35e4..f6907e3cba3 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -42,10 +42,12 @@ import org.apache.pulsar.common.protocol.Commands;
*/
public class PulsarConnectorConfig implements AutoCloseable {
+ private boolean hasMetadataUrl = false;
+
private String brokerServiceUrl = "http://localhost:8080";
private String brokerBinaryServiceUrl = "pulsar://localhost:6650/";
private String webServiceUrl = ""; //leave empty
- private String zookeeperUri = "localhost:2181";
+ private String metadataUrl = "zk:localhost:2181";
private int entryReadBatchSize = 100;
private int targetNumSplits = 2;
private int maxSplitMessageQueueSize = 10000;
@@ -134,14 +136,37 @@ public class PulsarConnectorConfig implements
AutoCloseable {
return this.maxMessageSize;
}
+ /**
+ * @deprecated use {@link #getMetadataUrl()}
+ */
+ @Deprecated
@NotNull
public String getZookeeperUri() {
- return this.zookeeperUri;
+ return getMetadataUrl();
}
+ /**
+ * @deprecated use {@link #setMetadataUrl(String)}
+ */
+ @Deprecated
@Config("pulsar.zookeeper-uri")
public PulsarConnectorConfig setZookeeperUri(String zookeeperUri) {
- this.zookeeperUri = zookeeperUri;
+ if (hasMetadataUrl) {
+ return this;
+ }
+ this.metadataUrl = zookeeperUri;
+ return this;
+ }
+
+ @NotNull
+ public String getMetadataUrl() {
+ return this.metadataUrl;
+ }
+
+ @Config("pulsar.metadata-url")
+ public PulsarConnectorConfig setMetadataUrl(String metadataUrl) {
+ this.hasMetadataUrl = true;
+ this.metadataUrl = metadataUrl;
return this;
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 6bb715056b0..a15a0e56f4a 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -467,7 +467,7 @@ public class PulsarCluster {
.withEnv("clusterName", clusterName)
.withEnv("zkServers", ZKContainer.NAME)
.withEnv("zookeeperServers", ZKContainer.NAME + ":" +
ZKContainer.ZK_PORT)
- .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" +
ZKContainer.ZK_PORT)
+ .withEnv("pulsar.metadata-url", "zk:" + ZKContainer.NAME + ":"
+ ZKContainer.ZK_PORT)
.withEnv("pulsar.web-service-url",
"http://pulsar-broker-0:8080")
.withEnv("SQL_PREFIX_pulsar.max-message-size", "" +
spec.maxMessageSize)
.withClasspathResourceMapping(