This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2de90b24d52 [feat][sql] Support metadata store url (#18210)
2de90b24d52 is described below

commit 2de90b24d52c099d5608f9923e351980f19684cb
Author: tison <[email protected]>
AuthorDate: Wed Nov 2 10:23:36 2022 +0800

    [feat][sql] Support metadata store url (#18210)
---
 .../pulsar/broker/BookKeeperClientFactoryImpl.java |  3 +--
 .../bookkeeper/PulsarMetadataClientDriver.java     |  4 +++
 .../main/resources/conf/catalog/pulsar.properties  |  4 +--
 .../pulsar/sql/presto/PulsarConnectorCache.java    |  9 ++++---
 .../pulsar/sql/presto/PulsarConnectorConfig.java   | 31 +++++++++++++++++++---
 .../integration/topologies/PulsarCluster.java      |  2 +-
 6 files changed, 41 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index c619df2b3d4..20f109d0377 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -37,7 +37,6 @@ import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping;
@@ -68,7 +67,7 @@ public class BookKeeperClientFactoryImpl implements 
BookKeeperClientFactory {
                              EventLoopGroup eventLoopGroup,
                              Optional<Class<? extends 
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
                              Map<String, Object> properties, StatsLogger 
statsLogger) throws IOException {
-        MetadataDrivers.registerClientDriver("metadata-store", 
PulsarMetadataClientDriver.class);
+        PulsarMetadataClientDriver.init();
 
         ClientConfiguration bkConf = createBkClientConfiguration(store, conf);
         if (properties != null) {
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarMetadataClientDriver.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarMetadataClientDriver.java
index 6c1c53e344b..f2776062818 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarMetadataClientDriver.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarMetadataClientDriver.java
@@ -37,6 +37,10 @@ public class PulsarMetadataClientDriver extends 
AbstractMetadataDriver implement
         MetadataDrivers.registerClientDriver(METADATA_STORE_SCHEME, 
PulsarMetadataClientDriver.class);
     }
 
+    public static void init() {
+        // cause <clinit> to be invoked
+    }
+
     @Override
     public MetadataClientDriver initialize(ClientConfiguration 
clientConfiguration,
                                            ScheduledExecutorService 
scheduledExecutorService,
diff --git 
a/pulsar-sql/presto-distribution/src/main/resources/conf/catalog/pulsar.properties
 
b/pulsar-sql/presto-distribution/src/main/resources/conf/catalog/pulsar.properties
index 8cbea0e1364..f15cd2657e0 100644
--- 
a/pulsar-sql/presto-distribution/src/main/resources/conf/catalog/pulsar.properties
+++ 
b/pulsar-sql/presto-distribution/src/main/resources/conf/catalog/pulsar.properties
@@ -26,8 +26,8 @@ pulsar.broker-service-url=http://localhost:8080
 pulsar.web-service-url=http://localhost:8080
 # the url of Pulsar broker binary service
 pulsar.broker-binary-service-url=pulsar://localhost:6650
-# URI of Zookeeper cluster
-pulsar.zookeeper-uri=127.0.0.1:2181
+# the url of metadata store
+pulsar.metadata-url=zk:127.0.0.1:2181
 # minimum number of entries to read at a single time
 pulsar.max-entry-read-batch-size=100
 # default number of splits to use per query
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index 29331d88dd1..20b00b59e5a 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -46,6 +46,7 @@ import 
org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver;
 
 /**
  * Implementation of a cache for the Pulsar connector.
@@ -73,7 +74,7 @@ public class PulsarConnectorCache {
 
 
     private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) 
throws Exception {
-        this.metadataStore = 
MetadataStoreExtended.create(pulsarConnectorConfig.getZookeeperUri(),
+        this.metadataStore = 
MetadataStoreExtended.create(pulsarConnectorConfig.getMetadataUrl(),
                 
MetadataStoreConfig.builder().metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
         this.managedLedgerFactory = 
initManagedLedgerFactory(pulsarConnectorConfig);
         this.statsProvider = 
PulsarConnectorUtils.createInstance(pulsarConnectorConfig.getStatsProvider(),
@@ -109,10 +110,10 @@ public class PulsarConnectorCache {
 
     private ManagedLedgerFactory 
initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig)
         throws Exception {
+        PulsarMetadataClientDriver.init();
+
         ClientConfiguration bkClientConfiguration = new ClientConfiguration()
-            .setZkServers(pulsarConnectorConfig.getZookeeperUri())
-            .setMetadataServiceUri("zk://" + 
pulsarConnectorConfig.getZookeeperUri()
-                .replace(",", ";") + "/ledgers")
+            .setMetadataServiceUri("metadata-store:" + 
pulsarConnectorConfig.getMetadataUrl())
             .setClientTcpNoDelay(false)
             
.setUseV2WireProtocol(pulsarConnectorConfig.getBookkeeperUseV2Protocol())
             
.setExplictLacInterval(pulsarConnectorConfig.getBookkeeperExplicitInterval())
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index 5660fcd35e4..f6907e3cba3 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -42,10 +42,12 @@ import org.apache.pulsar.common.protocol.Commands;
  */
 public class PulsarConnectorConfig implements AutoCloseable {
 
+    private boolean hasMetadataUrl = false;
+
     private String brokerServiceUrl = "http://localhost:8080";
     private String brokerBinaryServiceUrl = "pulsar://localhost:6650/";
     private String webServiceUrl = ""; //leave empty
-    private String zookeeperUri = "localhost:2181";
+    private String metadataUrl = "zk:localhost:2181";
     private int entryReadBatchSize = 100;
     private int targetNumSplits = 2;
     private int maxSplitMessageQueueSize = 10000;
@@ -134,14 +136,37 @@ public class PulsarConnectorConfig implements 
AutoCloseable {
         return this.maxMessageSize;
     }
 
+    /**
+     * @deprecated use {@link #getMetadataUrl()}
+     */
+    @Deprecated
     @NotNull
     public String getZookeeperUri() {
-        return this.zookeeperUri;
+        return getMetadataUrl();
     }
 
+    /**
+     * @deprecated use {@link #setMetadataUrl(String)}
+     */
+    @Deprecated
     @Config("pulsar.zookeeper-uri")
     public PulsarConnectorConfig setZookeeperUri(String zookeeperUri) {
-        this.zookeeperUri = zookeeperUri;
+        if (hasMetadataUrl) {
+            return this;
+        }
+        this.metadataUrl = zookeeperUri;
+        return this;
+    }
+
+    @NotNull
+    public String getMetadataUrl() {
+        return this.metadataUrl;
+    }
+
+    @Config("pulsar.metadata-url")
+    public PulsarConnectorConfig setMetadataUrl(String metadataUrl) {
+        this.hasMetadataUrl = true;
+        this.metadataUrl = metadataUrl;
         return this;
     }
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 6bb715056b0..a15a0e56f4a 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -467,7 +467,7 @@ public class PulsarCluster {
                 .withEnv("clusterName", clusterName)
                 .withEnv("zkServers", ZKContainer.NAME)
                 .withEnv("zookeeperServers", ZKContainer.NAME + ":" + 
ZKContainer.ZK_PORT)
-                .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + 
ZKContainer.ZK_PORT)
+                .withEnv("pulsar.metadata-url", "zk:" + ZKContainer.NAME + ":" 
+ ZKContainer.ZK_PORT)
                 .withEnv("pulsar.web-service-url", 
"http://pulsar-broker-0:8080")
                 .withEnv("SQL_PREFIX_pulsar.max-message-size", "" + 
spec.maxMessageSize)
                 .withClasspathResourceMapping(

Reply via email to