Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174874129 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + + public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor + .Builder().name("PULSAR_SERVICE_URL") + .displayName("Pulsar Service URL") + .description("URL for the Pulsar cluster, e.g localhost:6650") + .required(true) + .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .build(); + + public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() + .name("Maximum concurrent lookup-requests") + .description("Number of concurrent lookup-requests allowed on each broker-connection to prevent " + + "overload on broker. (default: 5000) It should be configured with higher value only in case " + + "of it requires to produce/subscribe on thousands of topics") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("5000") + .build(); + + public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() + .name("Maximum connects per Pulsar broker") + .description("Sets the max number of connection that the client library will open to a single broker.\n" + + "By default, the connection pool will use a single connection for all the producers and consumers. " + + "Increasing this parameter may improve throughput when using many producers over a high latency connection") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("1") + .build(); + + public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder() + .name("I/O Threads") + .description("The number of threads to be used for handling connections to brokers (default: 1 thread)") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("1") + .build(); + + public static final PropertyDescriptor LISTENER_THREADS = new PropertyDescriptor.Builder() + .name("Listener Threads") + .description("The number of threads to be used for message listeners (default: 1 thread)") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("1") + .build(); + + public static final PropertyDescriptor MAXIMUM_REJECTED_REQUESTS = new PropertyDescriptor.Builder() + .name("Maximum rejected requests per connection") + .description("Max number of broker-rejected requests in a certain time-frame (30 seconds) after " + + "which current connection will be closed and client creates a new connection that give " + + "chance to connect a different broker (default: 50)") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("50") + .build(); + + public static final PropertyDescriptor OPERATION_TIMEOUT = new PropertyDescriptor.Builder() + .name("Operation Timeout") + .description("Producer-create, subscribe and unsubscribe operations will be retried until this " + + "interval, after which the operation will be maked as failed (default: 30 seconds)") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("30") + .build(); + + public static final PropertyDescriptor STATS_INTERVAL = new PropertyDescriptor.Builder() + .name("Stats interval") + .description("The interval between each stat info (default: 60 seconds) Stats will be activated " + + "with positive statsIntervalSeconds It should be set to at least 1 second") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("60") + .build(); + + public static final PropertyDescriptor USE_TCP_NO_DELAY = new PropertyDescriptor.Builder() + .name("Use TCP nodelay flag") + .description("Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm.\n" + + "No-delay features make sure packets are sent out on the network as soon as possible, and it's critical " + + "to achieve low latency publishes. On the other hand, sending out a huge number of small packets might " + + "limit the overall throughput, so if latency is not a concern, it's advisable to set the useTcpNoDelay " + + "flag to false.") + .required(false) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("false") + .build(); + + public static final PropertyDescriptor MAX_PRODUCERS = new PropertyDescriptor + .Builder().name("MAX_PRODUCERS") + .displayName("Producer Pool Size") + .description("The Maximum Number of Pulsar Producers created by this Pulsar Client Pool") + .required(true) + .defaultValue("10") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_CONSUMERS = new PropertyDescriptor + .Builder().name("MAX_CONSUMERS") + .displayName("Consumer Pool Size") + .description("The Maximum Number of Pulsar consumers created by this Pulsar Client Pool") + .required(true) + .defaultValue("10") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("ssl.context.service") + .displayName("SSL Context Service") + .description("Specifies the SSL Context Service to use for communicating with Pulsar.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + private static final List<PropertyDescriptor> properties; + private volatile PulsarClient client; + + private volatile ResourcePoolImpl<PulsarProducer> producers; + private volatile ResourcePoolImpl<PulsarConsumer> consumers; + private ClientConfiguration clientConfig; + + static { + final List<PropertyDescriptor> props = new ArrayList<>(); + props.add(PULSAR_SERVICE_URL); + props.add(MAX_CONSUMERS); + props.add(MAX_PRODUCERS); + props.add(CONCURRENT_LOOKUP_REQUESTS); + props.add(CONNECTIONS_PER_BROKER); + props.add(IO_THREADS); + props.add(LISTENER_THREADS); + props.add(MAXIMUM_REJECTED_REQUESTS); + props.add(OPERATION_TIMEOUT); + props.add(STATS_INTERVAL); + props.add(USE_TCP_NO_DELAY); + properties = Collections.unmodifiableList(props); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + /** + * @param context + * the configuration context + * @throws InitializationException + * if unable to create a database connection + */ + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + + createClient(context); + + if (this.client == null) + throw new InitializationException("Unable to create Pulsar Client"); + + producers = new ResourcePoolImpl<PulsarProducer>(new PulsarProducerFactory(client), context.getProperty(MAX_PRODUCERS).asInteger()); + consumers = new ResourcePoolImpl<PulsarConsumer>(new PulsarConsumerFactory(client), context.getProperty(MAX_CONSUMERS).asInteger()); + + } + + private void createClient(final ConfigurationContext context) throws InitializationException { + + // We can't create a client without a service URL. + if (!context.getProperty(PULSAR_SERVICE_URL).isSet()) { + return; + } + + try { + this.client = PulsarClient.create(buildPulsarBrokerRootUrl(context.getProperty(PULSAR_SERVICE_URL).getValue(), + getClientConfig(context).isUseTls()), getClientConfig(context)); + + } catch (Exception e) { + throw new InitializationException("Unable to create Pulsar Client", e); + } + + } + + private static String buildPulsarBrokerRootUrl(String uri, boolean tlsEnabled) { + StringBuilder builder = new StringBuilder(); + builder.append("pulsar"); + + if (tlsEnabled) + builder.append("+ssl"); + + builder.append("://"); + builder.append(uri); + return builder.toString(); + } + + private ClientConfiguration getClientConfig(ConfigurationContext context) throws UnsupportedAuthenticationException { + + if (clientConfig == null) { + clientConfig = new ClientConfiguration(); + + if (context.getProperty(CONCURRENT_LOOKUP_REQUESTS).isSet()) { --- End diff -- I noticed above that most of these were set to be optional. Are you trying to let the API impose its own defaults if the user removes the default value you specified in each of these optionals? If that's not your intent, I would make them all required since you already provide a default value for each of them. Then you can get rid of all of these if statements.
---