This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 71d6e6b7c2d [refactor][broker]refactor reflection method in storage and transaction module (#17346)
71d6e6b7c2d is described below
commit 71d6e6b7c2d7babe3cfb9c87d9f6d1cb59b83624
Author: Qiang Huang <[email protected]>
AuthorDate: Thu Sep 1 10:37:32 2022 +0800
[refactor][broker]refactor reflection method in storage and transaction module (#17346)
---
.../main/java/org/apache/pulsar/broker/PulsarService.java | 10 +++++-----
.../apache/pulsar/broker/storage/ManagedLedgerStorage.java | 6 ++++--
.../transaction/buffer/TransactionBufferProvider.java | 13 ++++---------
.../pendingack/TransactionPendingAckStoreProvider.java | 14 +++++---------
4 files changed, 18 insertions(+), 25 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 1c3b6b8aea1..00f7f9148d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -35,7 +35,6 @@ import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.time.Duration;
@@ -108,6 +107,7 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
+import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
@@ -144,6 +144,7 @@ import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.common.util.ThreadDumpUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.Compactor;
@@ -1369,10 +1370,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {
}
private SchemaStorage createAndStartSchemaStorage() throws Exception {
- final Class<?> storageClass =
Class.forName(config.getSchemaRegistryStorageClassName());
- Object factoryInstance =
storageClass.getDeclaredConstructor().newInstance();
- Method createMethod = storageClass.getMethod("create",
PulsarService.class);
- SchemaStorage schemaStorage = (SchemaStorage)
createMethod.invoke(factoryInstance, this);
+ SchemaStorageFactory factoryInstance =
Reflections.createInstance(config.getSchemaRegistryStorageClassName(),
+ SchemaStorageFactory.class,
Thread.currentThread().getContextClassLoader());
+ SchemaStorage schemaStorage = factoryInstance.create(this);
schemaStorage.start();
return schemaStorage;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
index ac268f80531..efda2261f39 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.classification.InterfaceAudience.Private;
import org.apache.pulsar.common.classification.InterfaceStability.Unstable;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
/**
@@ -87,8 +88,9 @@ public interface ManagedLedgerStorage extends AutoCloseable {
MetadataStoreExtended metadataStore,
BookKeeperClientFactory bkProvider,
EventLoopGroup eventLoopGroup) throws
Exception {
- final Class<?> storageClass =
Class.forName(conf.getManagedLedgerStorageClassName());
- final ManagedLedgerStorage storage = (ManagedLedgerStorage)
storageClass.getDeclaredConstructor().newInstance();
+ ManagedLedgerStorage storage =
+
Reflections.createInstance(conf.getManagedLedgerStorageClassName(),
ManagedLedgerStorage.class,
+ Thread.currentThread().getContextClassLoader());
storage.initialize(conf, metadataStore, bkProvider, eventLoopGroup);
return storage;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferProvider.java
index f167d51ef54..0141ae45b33 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferProvider.java
@@ -18,10 +18,10 @@
*/
package org.apache.pulsar.broker.transaction.buffer;
-import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.Beta;
import java.io.IOException;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.util.Reflections;
/**
* A provider that provides {@link TransactionBuffer}.
@@ -36,15 +36,10 @@ public interface TransactionBufferProvider {
* @return an instance of transaction buffer provider.
*/
static TransactionBufferProvider newProvider(String providerClassName)
throws IOException {
- Class<?> providerClass;
try {
- providerClass = Class.forName(providerClassName);
- Object obj = providerClass.getDeclaredConstructor().newInstance();
- checkArgument(obj instanceof TransactionBufferProvider,
- "The factory has to be an instance of "
- + TransactionBufferProvider.class.getName());
-
- return (TransactionBufferProvider) obj;
+ TransactionBufferProvider transactionBufferProvider =
Reflections.createInstance(providerClassName,
+ TransactionBufferProvider.class,
Thread.currentThread().getContextClassLoader());
+ return transactionBufferProvider;
} catch (Exception e) {
throw new IOException(e);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/TransactionPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/TransactionPendingAckStoreProvider.java
index 27c8d20e0b3..fae38ee6a8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/TransactionPendingAckStoreProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/TransactionPendingAckStoreProvider.java
@@ -18,10 +18,10 @@
*/
package org.apache.pulsar.broker.transaction.pendingack;
-import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.common.util.Reflections;
/**
* Provider of transaction pending ack store.
@@ -35,15 +35,11 @@ public interface TransactionPendingAckStoreProvider {
* @return an instance of transaction buffer provider.
*/
static TransactionPendingAckStoreProvider newProvider(String
providerClassName) throws IOException {
- Class<?> providerClass;
try {
- providerClass = Class.forName(providerClassName);
- Object obj = providerClass.getDeclaredConstructor().newInstance();
- checkArgument(obj instanceof TransactionPendingAckStoreProvider,
- "The factory has to be an instance of "
- +
TransactionPendingAckStoreProvider.class.getName());
-
- return (TransactionPendingAckStoreProvider) obj;
+ TransactionPendingAckStoreProvider ackStoreProvider =
Reflections.createInstance(providerClassName,
+ TransactionPendingAckStoreProvider.class,
+ Thread.currentThread().getContextClassLoader());
+ return ackStoreProvider;
} catch (Exception e) {
throw new IOException(e);
}