This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e6781d8 PIP-45: Only run ZKSessionWatcher when MetadataStoreExtended is required (#12150) e6781d8 is described below commit e6781d877e19d4be4fe836410903722b5cfe161d Author: Matteo Merli <mme...@apache.org> AuthorDate: Fri Sep 24 20:37:11 2021 -0700 PIP-45: Only run ZKSessionWatcher when MetadataStoreExtended is required (#12150) * PIP-45: Only run ZKSessionWatcher when MetadataStoreExtended is required * Remove inadvertently added spaces * Check for sessionWatcher null at the beginning * Fixed typo --- .../org/apache/pulsar/broker/PulsarService.java | 8 ++-- .../pulsar/metadata/api/MetadataStoreFactory.java | 10 ++-- .../api/extended/MetadataStoreExtended.java | 11 ++--- .../metadata/impl/MetadataStoreFactoryImpl.java | 55 ++++++++++++++++++++++ .../pulsar/metadata/impl/ZKMetadataStore.java | 21 ++++++--- 5 files changed, 81 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index b6c85cb..32c0e0c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -144,8 +144,10 @@ import org.apache.pulsar.compaction.TwoPhaseCompactor; import org.apache.pulsar.functions.worker.ErrorNotifier; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; +import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.MetadataStoreFactory; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.coordination.CoordinationService; @@ -248,7 +250,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private CoordinationService coordinationService; private TransactionBufferSnapshotService transactionBufferSnapshotService; - private MetadataStoreExtended configurationMetadataStore; + private MetadataStore configurationMetadataStore; private PulsarResources pulsarResources; private TransactionPendingAckStoreProvider transactionPendingAckStoreProvider; @@ -314,8 +316,8 @@ public class PulsarService implements AutoCloseable, ShutdownService { new DefaultThreadFactory("pulsar-io")); } - public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException { - return MetadataStoreExtended.create(config.getConfigurationStoreServers(), + public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException { + return MetadataStoreFactory.create(config.getConfigurationStoreServers(), MetadataStoreConfig.builder() .sessionTimeoutMillis((int) config.getZooKeeperSessionTimeoutMillis()) .allowReadOnlyOperations(false) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java index 781a63b..828ea8a 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java @@ -23,6 +23,7 @@ import java.io.IOException; import lombok.experimental.UtilityClass; import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore; +import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl; import org.apache.pulsar.metadata.impl.ZKMetadataStore; /** @@ -41,11 +42,8 @@ public class MetadataStoreFactory { * @throws IOException * if the metadata store initialization fails */ - public static MetadataStore create(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException { - if (metadataURL.startsWith("memory://")) { - return new LocalMemoryMetadataStore(metadataURL, metadataStoreConfig); - } else { - return new ZKMetadataStore(metadataURL, metadataStoreConfig); - } + public static MetadataStore create(String metadataURL, MetadataStoreConfig metadataStoreConfig) + throws MetadataStoreException { + return MetadataStoreFactoryImpl.create(metadataURL, metadataStoreConfig); } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java index f4c82a6..fb0078d 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java @@ -30,6 +30,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException import org.apache.pulsar.metadata.api.MetadataStoreException.InvalidImplementationException; import org.apache.pulsar.metadata.api.MetadataStoreFactory; import org.apache.pulsar.metadata.api.Stat; +import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl; /** * Extension of the {@link MetadataStore} interface that includes more methods which might not be supported by all @@ -37,15 +38,9 @@ import org.apache.pulsar.metadata.api.Stat; */ public interface MetadataStoreExtended extends MetadataStore { - public static MetadataStoreExtended create(String metadataURL, MetadataStoreConfig metadataStoreConfig) + static MetadataStoreExtended create(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException { - MetadataStore store = MetadataStoreFactory.create(metadataURL, metadataStoreConfig); - if (!(store instanceof MetadataStoreExtended)) { - throw new InvalidImplementationException( - "Implemetation does not comply with " + MetadataStoreExtended.class.getName()); - } - - return (MetadataStoreExtended) store; + return MetadataStoreFactoryImpl.createExtended(metadataURL, metadataStoreConfig); } /** diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java new file mode 100644 index 0000000..d7aaf02 --- /dev/null +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata.impl; + +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; + +public class MetadataStoreFactoryImpl { + + public static MetadataStore create(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws + MetadataStoreException { + return newInstance(metadataURL, metadataStoreConfig, false); + } + + public static MetadataStoreExtended createExtended(String metadataURL, MetadataStoreConfig metadataStoreConfig) + throws + MetadataStoreException { + MetadataStore store = MetadataStoreFactoryImpl.newInstance(metadataURL, metadataStoreConfig, true); + if (!(store instanceof MetadataStoreExtended)) { + throw new MetadataStoreException.InvalidImplementationException( + "Implementation does not comply with " + MetadataStoreExtended.class.getName()); + } + + return (MetadataStoreExtended) store; + } + + private static MetadataStore newInstance(String metadataURL, MetadataStoreConfig metadataStoreConfig, + boolean enableSessionWatcher) + throws MetadataStoreException { + + if (metadataURL.startsWith("memory://")) { + return new LocalMemoryMetadataStore(metadataURL, metadataStoreConfig); + } else { + return new ZKMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher); + } + } +} diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 4ac9bb1..2cee1c7 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -61,9 +61,10 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt private final MetadataStoreConfig metadataStoreConfig; private final boolean isZkManaged; private final ZooKeeper zkc; - private ZKSessionWatcher sessionWatcher; + private Optional<ZKSessionWatcher> sessionWatcher; - public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException { + public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig, boolean enableSessionWatcher) + throws MetadataStoreException { try { this.metadataURL = metadataURL; this.metadataStoreConfig = metadataStoreConfig; @@ -74,12 +75,16 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt .sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis()) .watchers(Collections.singleton(event -> { if (sessionWatcher != null) { - sessionWatcher.process(event); + sessionWatcher.ifPresent(sw -> sw.process(event)); } })) .build(); zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE); - sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent); + if (enableSessionWatcher) { + sessionWatcher = Optional.of(new ZKSessionWatcher(zkc, this::receivedSessionEvent)); + } else { + sessionWatcher = Optional.empty(); + } } catch (Throwable t) { throw new MetadataStoreException(t); } @@ -92,7 +97,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt this.metadataStoreConfig = null; this.isZkManaged = false; this.zkc = zkc; - this.sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent); + this.sessionWatcher = Optional.of(new ZKSessionWatcher(zkc, this::receivedSessionEvent)); zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE); } @@ -106,7 +111,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt super.receivedSessionEvent(event); } else { log.error("Failed to recreate persistent watch on ZooKeeper: {}", Code.get(rc)); - sessionWatcher.setSessionInvalid(); + sessionWatcher.ifPresent(ZKSessionWatcher::setSessionInvalid); // On the reconnectable client, mark the session as expired to trigger a new reconnect and // we will have the chance to set the watch again. if (zkc instanceof PulsarZooKeeperClient) { @@ -320,7 +325,9 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt if (isZkManaged) { zkc.close(); } - sessionWatcher.close(); + if (sessionWatcher.isPresent()) { + sessionWatcher.get().close(); + } super.close(); }