315157973 commented on a change in pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037#discussion_r708930965



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -133,48 +131,53 @@ public SchemaInfoProvider load(String topicName) {
     private TransactionCoordinatorClientImpl tcClient;
 
     public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
-        this(conf, getEventLoopGroup(conf), true);
+        this(conf, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, false, true);
+        this(conf, eventLoopGroup, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, null, false, false);
+        this(conf, eventLoopGroup, cnxPool, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, timer, false, false);
+        this(conf, eventLoopGroup, cnxPool, timer, null, null);
     }
 
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, boolean createdEventLoopGroup)
-            throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), 
null, createdEventLoopGroup, true);
-    }
-
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool cnxPool, Timer timer,
-                             boolean createdEventLoopGroup, boolean 
createdCnxPool) throws PulsarClientException {
+    @Builder(builderClassName = "PulsarClientImplBuilder")
+    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool connectionPool,
+                             Timer timer, ExecutorProvider 
externalExecutorProvider,
+                             ExecutorProvider internalExecutorProvider) throws 
PulsarClientException {
+        EventLoopGroup eventLoopGroupReference = null;
+        ConnectionPool connectionPoolReference = null;
         try {
-            this.createdEventLoopGroup = createdEventLoopGroup;
-            this.createdCnxPool = createdCnxPool;
-            if (conf == null || isBlank(conf.getServiceUrl()) || 
eventLoopGroup == null) {
+            this.createdEventLoopGroup = eventLoopGroup == null;
+            this.createdCnxPool = connectionPool == null;
+            if ((externalExecutorProvider == null) != (internalExecutorProvider == null)) {
+                throw new IllegalStateException("Both externalExecutorProvider and internalExecutorProvider must be specified or unspecified.");
+            }
+            this.createdExecutorProviders = externalExecutorProvider == null;
+            eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup 
: getEventLoopGroup(conf);
+            this.eventLoopGroup = eventLoopGroupReference;
+            if (conf == null || isBlank(conf.getServiceUrl()) || 
this.eventLoopGroup == null) {
                 throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
             }
-            this.eventLoopGroup = eventLoopGroup;
             setAuth(conf);
             this.conf = conf;
             clientClock = conf.getClock();
             conf.getAuthentication().start();
-            this.cnxPool = cnxPool;
-            externalExecutorProvider = new 
ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
-            internalExecutorService = new 
ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+            connectionPoolReference = connectionPool != null ? connectionPool 
: new ConnectionPool(conf, this.eventLoopGroup);
+            this.cnxPool = connectionPoolReference;
+            this.externalExecutorProvider = externalExecutorProvider != null ? 
externalExecutorProvider : new ExecutorProvider(conf.getNumListenerThreads(), 
"pulsar-external-listener");
+            this.internalExecutorProvider = internalExecutorProvider != null ? 
internalExecutorProvider : new ExecutorProvider(conf.getNumIoThreads(), 
"pulsar-client-internal");

Review comment:
       OK, I get your point




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to