This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 9c3f2a7 [conf] Add annotations for documenting broker configuration settings (#3113) 9c3f2a7 is described below commit 9c3f2a77bb352c073d2dc36094b42dc3c9918c3b Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Thu Dec 20 18:22:29 2018 +0800 [conf] Add annotations for documenting broker configuration settings (#3113) *Motivation* This change adds annotations to the broker configuration settings so that the broker configuration file can be generated from them. --- .../apache/pulsar/broker/ServiceConfiguration.java | 1036 +++++++++++++++----- .../configuration/PulsarConfigurationLoader.java | 7 +- 2 files changed, 784 insertions(+), 259 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 3939ceb..75ee5d2 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -30,6 +30,7 @@ import lombok.Getter; import lombok.Setter; import org.apache.bookkeeper.client.api.DigestType; import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; +import org.apache.pulsar.common.configuration.Category; import org.apache.pulsar.common.configuration.FieldContext; import org.apache.pulsar.common.configuration.PulsarConfiguration; import org.apache.pulsar.common.policies.data.BacklogQuota; @@ -41,484 +42,996 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; @Setter public class ServiceConfiguration implements PulsarConfiguration { + @Category + private static final String CATEGORY_SERVER = "Server"; + @Category + private static final String CATEGORY_STORAGE_BK = "Storage (BookKeeper)"; + @Category + private static final String CATEGORY_STORAGE_ML = "Storage (Managed Ledger)"; + @Category + private static final String 
CATEGORY_STORAGE_OFFLOADING = "Storage (Ledger Offloading)"; + @Category + private static final String CATEGORY_POLICIES = "Policies"; + @Category + private static final String CATEGORY_WEBSOCKET = "WebSocket"; + @Category + private static final String CATEGORY_SCHEMA = "Schema"; + @Category + private static final String CATEGORY_METRICS = "Metrics"; + @Category + private static final String CATEGORY_REPLICATION = "Replication"; + @Category + private static final String CATEGORY_LOAD_BALANCER = "Load Balancer"; + @Category + private static final String CATEGORY_FUNCTIONS = "Functions"; + @Category + private static final String CATEGORY_TLS = "TLS"; + @Category + private static final String CATEGORY_AUTHENTICATION = "Authentication"; + @Category + private static final String CATEGORY_AUTHORIZATION = "Authorization"; + @Category + private static final String CATEGORY_TOKEN_AUTH = "Token Authentication Provider"; + @Category + private static final String CATEGORY_HTTP = "HTTP"; + /***** --- pulsar configuration --- ****/ - // Zookeeper quorum connection string - @FieldContext(required = true) + @FieldContext( + category = CATEGORY_SERVER, + required = true, + doc = "The Zookeeper quorum connection string (as a comma-separated list)" + ) private String zookeeperServers; - // Global Zookeeper quorum connection string @Deprecated - @FieldContext(required = false) + @FieldContext( + category = CATEGORY_SERVER, + required = false, + deprecated = true, + doc = "Global Zookeeper quorum connection string (as a comma-separated list)." 
+ + " Deprecated in favor of using `configurationStoreServers`" + ) private String globalZookeeperServers; - // Configuration Store connection string - @FieldContext(required = false) + @FieldContext( + category = CATEGORY_SERVER, + required = false, + doc = "Configuration store connection string (as a comma-separated list)" + ) private String configurationStoreServers; + @FieldContext( + category = CATEGORY_SERVER, + doc = "The port for serving binary protobuf requests" + ) private Integer brokerServicePort = 6650; + @FieldContext( + category = CATEGORY_SERVER, + doc = "The port for serving tls secured binary protobuf requests" + ) private Integer brokerServicePortTls = null; - // Port to use to server HTTP request + @FieldContext( + category = CATEGORY_SERVER, + doc = "The port for serving http requests" + ) private Integer webServicePort = 8080; - // Port to use to server HTTPS request + @FieldContext( + category = CATEGORY_SERVER, + doc = "The port for serving https requests" + ) private Integer webServicePortTls = null; - // Hostname or IP address the service binds on. + @FieldContext( + category = CATEGORY_SERVER, + doc = "Hostname or IP address the service binds on" + ) private String bindAddress = "0.0.0.0"; - // Controls which hostname is advertised to the discovery service via ZooKeeper. + @FieldContext( + category = CATEGORY_SERVER, + doc = "Hostname or IP address the service advertises to the outside world." + + " If not set, the value of `InetAddress.getLocalHost().getHostname()` is used." + ) private String advertisedAddress; - // Number of threads to use for Netty IO + @FieldContext( + category = CATEGORY_SERVER, + doc = "Number of threads to use for Netty IO." 
+ + " Default is set to `2 * Runtime.getRuntime().availableProcessors()`" + ) private int numIOThreads = 2 * Runtime.getRuntime().availableProcessors(); - // Enable the WebSocket API service + @FieldContext( + category = CATEGORY_WEBSOCKET, + doc = "Enable the WebSocket API service in broker" + ) private boolean webSocketServiceEnabled = false; - // Flag to control features that are meant to be used when running in standalone mode + @FieldContext( + category = CATEGORY_WEBSOCKET, + doc = "Flag indicates whether to run broker in standalone mode" + ) private boolean isRunningStandalone = false; - // Name of the cluster to which this broker belongs to - @FieldContext(required = true) + @FieldContext( + category = CATEGORY_SERVER, + required = true, + doc = "Name of the cluster to which this broker belongs to" + ) private String clusterName; - // Enable cluster's failure-domain which can distribute brokers into logical region - @FieldContext(dynamic = true) + @FieldContext( + category = CATEGORY_SERVER, + dynamic = true, + doc = "Enable cluster's failure-domain which can distribute brokers into logical region" + ) private boolean failureDomainsEnabled = false; - // Zookeeper session timeout in milliseconds + @FieldContext( + category = CATEGORY_SERVER, + doc = "ZooKeeper session timeout in milliseconds" + ) private long zooKeeperSessionTimeoutMillis = 30000; - // Time to wait for broker graceful shutdown. After this time elapses, the - // process will be killed - @FieldContext(dynamic = true) + @FieldContext( + category = CATEGORY_SERVER, + dynamic = true, + doc = "Time to wait for broker graceful shutdown. After this time elapses, the process will be killed" + ) private long brokerShutdownTimeoutMs = 60000; - // Enable backlog quota check. Enforces action on topic when the quota is - // reached + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Enable backlog quota check. 
Enforces actions on topic when the quota is reached" + ) private boolean backlogQuotaCheckEnabled = true; - // How often to check for topics that have reached the quota + @FieldContext( + category = CATEGORY_POLICIES, + doc = "How often to check for topics that have reached the quota." + + " It only takes effects when `backlogQuotaCheckEnabled` is true" + ) private int backlogQuotaCheckIntervalInSeconds = 60; - // Default per-topic backlog quota limit + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Default per-topic backlog quota limit. Increase it if you want to allow larger msg backlog" + ) private long backlogQuotaDefaultLimitGB = 50; - //Default backlog quota retention policy. Default is producer_request_hold - //'producer_request_hold' Policy which holds producer's send request until the resource becomes available (or holding times out) - //'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer - //'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Default backlog quota retention policy. 
Default is producer_request_hold\n\n" + + "'producer_request_hold' Policy which holds producer's send request until the" + + "resource becomes available (or holding times out)\n" + + "'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer\n" + + "'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog" + ) private BacklogQuota.RetentionPolicy backlogQuotaDefaultRetentionPolicy = BacklogQuota.RetentionPolicy.producer_request_hold; - // Enable the deletion of inactive topics + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Enable the deletion of inactive topics" + ) private boolean brokerDeleteInactiveTopicsEnabled = true; - // How often to check for inactive topics + @FieldContext( + category = CATEGORY_POLICIES, + doc = "How often to check for inactive topics" + ) private long brokerDeleteInactiveTopicsFrequencySeconds = 60; - // How frequently to proactively check and purge expired messages + @FieldContext( + category = CATEGORY_POLICIES, + doc = "How frequently to proactively check and purge expired messages" + ) private int messageExpiryCheckIntervalInMinutes = 5; - // How long to delay rewinding cursor and dispatching messages when active consumer is changed + @FieldContext( + category = CATEGORY_POLICIES, + doc = "How long to delay rewinding cursor and dispatching messages when active consumer is changed" + ) private int activeConsumerFailoverDelayTimeMillis = 1000; - // How long to delete inactive subscriptions from last consuming - // When it is 0, inactive subscriptions are not deleted automatically + @FieldContext( + category = CATEGORY_POLICIES, + doc = "How long to delete inactive subscriptions from last consuming." 
+ + " When it is 0, inactive subscriptions are not deleted automatically" + ) private long subscriptionExpirationTimeMinutes = 0; - // How frequently to proactively check and purge expired subscription + @FieldContext( + category = CATEGORY_POLICIES, + doc = "How frequently to proactively check and purge expired subscription" + ) private long subscriptionExpiryCheckIntervalInMinutes = 5; - // Set the default behavior for message deduplication in the broker - // This can be overridden per-namespace. If enabled, broker will reject - // messages that were already stored in the topic + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Set the default behavior for message deduplication in the broker.\n\n" + + "This can be overridden per-namespace. If enabled, broker will reject" + + " messages that were already stored in the topic" + ) private boolean brokerDeduplicationEnabled = false; - // Maximum number of producer information that it's going to be - // persisted for deduplication purposes + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Maximum number of producer information that it's going to be persisted for deduplication purposes" + ) private int brokerDeduplicationMaxNumberOfProducers = 10000; - // Number of entries after which a dedup info snapshot is taken. - // A bigger interval will lead to less snapshots being taken though it would - // increase the topic recovery time, when the entries published after the - // snapshot need to be replayed + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Number of entries after which a dedup info snapshot is taken.\n\n" + + "A bigger interval will lead to less snapshots being taken though it would" + + " increase the topic recovery time, when the entries published after the" + + " snapshot need to be replayed" + ) private int brokerDeduplicationEntriesInterval = 1000; - // Time of inactivity after which the broker will discard the deduplication information - // relative to a disconnected producer. 
Default is 6 hours. + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Time of inactivity after which the broker will discard the deduplication information" + + " relative to a disconnected producer. Default is 6 hours.") private int brokerDeduplicationProducerInactivityTimeoutMinutes = 360; - // When a namespace is created without specifying the number of bundle, this - // value will be used as the default + @FieldContext( + category = CATEGORY_POLICIES, + doc = "When a namespace is created without specifying the number of bundle, this" + + " value will be used as the default") private int defaultNumberOfNamespaceBundles = 4; - // Enable check for minimum allowed client library version - @FieldContext(dynamic = true) + @FieldContext( + category = CATEGORY_SERVER, + dynamic = true, + doc = "Enable check for minimum allowed client library version" + ) private boolean clientLibraryVersionCheckEnabled = false; - // Path for the file used to determine the rotation status for the broker - // when responding to service discovery health checks + @FieldContext( + category = CATEGORY_SERVER, + doc = "Path for the file used to determine the rotation status for the broker" + + " when responding to service discovery health checks") private String statusFilePath; - // Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. 
Broker - // will stop sending messages to consumer once, this limit reaches until consumer starts acknowledging messages back - // and unack count reaches to maxUnackedMessagesPerConsumer/2 Using a value of 0, is disabling unackedMessage-limit - // check and consumer can receive messages without any restriction + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Max number of unacknowledged messages allowed to receive messages by a consumer on" + + " a shared subscription.\n\n Broker will stop sending messages to consumer once," + + " this limit reaches until consumer starts acknowledging messages back and unack count" + + " reaches to `maxUnackedMessagesPerConsumer/2`. Using a value of 0, it is disabling " + + " unackedMessage-limit check and consumer can receive messages without any restriction") private int maxUnackedMessagesPerConsumer = 50000; - // Max number of unacknowledged messages allowed per shared subscription. Broker will stop dispatching messages to - // all consumers of the subscription once this limit reaches until consumer starts acknowledging messages back and - // unack count reaches to limit/2. Using a value of 0, is disabling unackedMessage-limit - // check and dispatcher can dispatch messages without any restriction + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Max number of unacknowledged messages allowed per shared subscription. \n\n" + + " Broker will stop dispatching messages to all consumers of the subscription once this " + + " limit reaches until consumer starts acknowledging messages back and unack count reaches" + + " to `limit/2`. Using a value of 0, is disabling unackedMessage-limit check and dispatcher" + + " can dispatch messages without any restriction") private int maxUnackedMessagesPerSubscription = 4 * 50000; - // Max number of unacknowledged messages allowed per broker. 
Once this limit reaches, broker will stop dispatching - // messages to all shared subscription which has higher number of unack messages until subscriptions start - // acknowledging messages back and unack count reaches to limit/2. Using a value of 0, is disabling - // unackedMessage-limit check and broker doesn't block dispatchers + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Max number of unacknowledged messages allowed per broker. \n\n" + + " Once this limit reaches, broker will stop dispatching messages to all shared subscription " + + " which has higher number of unack messages until subscriptions start acknowledging messages " + + " back and unack count reaches to `limit/2`. Using a value of 0, is disabling unackedMessage-limit" + + " check and broker doesn't block dispatchers") private int maxUnackedMessagesPerBroker = 0; - // Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages - // than this percentage limit and subscription will not receive any new messages until that subscription acks back - // limit/2 messages + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher " + + " unacked messages than this percentage limit and subscription will not receive any new messages " + + " until that subscription acks back `limit/2` messages") private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16; - // Too many subscribe requests from a consumer can cause broker rewinding consumer cursors and loading data from bookies, - // hence causing high network bandwidth usage - // When the positive value is set, broker will throttle the subscribe requests for one consumer. - // Otherwise, the throttling will be disabled. The default value of this setting is 0 - throttling is disabled. 
- @FieldContext(dynamic = true) + @FieldContext( + category = CATEGORY_POLICIES, + dynamic = true, + doc = "Too many subscribe requests from a consumer can cause broker rewinding consumer cursors " + + " and loading data from bookies, hence causing high network bandwidth usage When the positive" + + " value is set, broker will throttle the subscribe requests for one consumer. Otherwise, the" + + " throttling will be disabled. The default value of this setting is 0 - throttling is disabled.") private int subscribeThrottlingRatePerConsumer = 0; - // Rate period for {subscribeThrottlingRatePerConsumer}. Default is 30s. - @FieldContext(minValue = 1, dynamic = true) + @FieldContext( + minValue = 1, + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Rate period for {subscribeThrottlingRatePerConsumer}. Default is 30s." + ) private int subscribeRatePeriodPerConsumerInSecond = 30; - // Default number of message dispatching throttling-limit for every topic. Using a value of 0, is disabling default - // message dispatch-throttling - @FieldContext(dynamic = true) + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Default number of message dispatching throttling-limit for every topic. \n\n" + + "Using a value of 0, is disabling default message dispatch-throttling") private int dispatchThrottlingRatePerTopicInMsg = 0; - // Default number of message-bytes dispatching throttling-limit for every topic. Using a value of 0, is disabling - // default message-byte dispatch-throttling - @FieldContext(dynamic = true) + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Default number of message-bytes dispatching throttling-limit for every topic. \n\n" + + "Using a value of 0, is disabling default message-byte dispatch-throttling") private long dispatchThrottlingRatePerTopicInByte = 0; - // Default number of message dispatching throttling-limit for a subscription. 
- // Using a value of 0, is disabling default message dispatch-throttling. - @FieldContext(dynamic = true) + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Default number of message dispatching throttling-limit for a subscription. \n\n" + + "Using a value of 0, is disabling default message dispatch-throttling.") private int dispatchThrottlingRatePerSubscriptionInMsg = 0; - // Default number of message-bytes dispatching throttling-limit for a subscription. - // Using a value of 0, is disabling default message-byte dispatch-throttling. - @FieldContext(dynamic = true) + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Default number of message-bytes dispatching throttling-limit for a subscription. \n\n" + + "Using a value of 0, is disabling default message-byte dispatch-throttling.") private long dispatchThrottlingRatePerSubscribeInByte = 0; - // Default dispatch-throttling is disabled for consumers which already caught-up with published messages and - // don't have backlog. This enables dispatch-throttling for non-backlog consumers as well. - @FieldContext(dynamic = true) + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Default dispatch-throttling is disabled for consumers which already caught-up with" + + " published messages and don't have backlog. 
This enables dispatch-throttling for " + + " non-backlog consumers as well.") private boolean dispatchThrottlingOnNonBacklogConsumerEnabled = false; - // Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic - @FieldContext(dynamic = true) + @FieldContext( + dynamic = true, + category = CATEGORY_SERVER, + doc = "Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic") private int maxConcurrentLookupRequest = 50000; - // Max number of concurrent topic loading request broker allows to control number of zk-operations - @FieldContext(dynamic = true) + + @FieldContext( + dynamic = true, + category = CATEGORY_SERVER, + doc = "Max number of concurrent topic loading request broker allows to control number of zk-operations" + ) private int maxConcurrentTopicLoadRequest = 5000; - // Max concurrent non-persistent message can be processed per connection + @FieldContext( + category = CATEGORY_SERVER, + doc = "Max concurrent non-persistent message can be processed per connection") private int maxConcurrentNonPersistentMessagePerConnection = 1000; - // Number of worker threads to serve non-persistent topic + @FieldContext( + category = CATEGORY_SERVER, + doc = "Number of worker threads to serve non-persistent topic") private int numWorkerThreadsForNonPersistentTopic = Runtime.getRuntime().availableProcessors();; - // Enable broker to load persistent topics + @FieldContext( + category = CATEGORY_SERVER, + doc = "Enable broker to load persistent topics" + ) private boolean enablePersistentTopics = true; - // Enable broker to load non-persistent topics + @FieldContext( + category = CATEGORY_SERVER, + doc = "Enable broker to load non-persistent topics" + ) private boolean enableNonPersistentTopics = true; - // Enable to run bookie along with broker + @FieldContext( + category = CATEGORY_SERVER, + doc = "Enable to run bookie along with broker" + ) private boolean enableRunBookieTogether = false; - // 
Enable to run bookie autorecovery along with broker + @FieldContext( + category = CATEGORY_SERVER, + doc = "Enable to run bookie autorecovery along with broker" + ) private boolean enableRunBookieAutoRecoveryTogether = false; - // Max number of producers allowed to connect to topic. Once this limit reaches, Broker will reject new producers - // until the number of connected producers decrease. - // Using a value of 0, is disabling maxProducersPerTopic-limit check. + @FieldContext( + category = CATEGORY_SERVER, + doc = "Max number of producers allowed to connect to topic. \n\nOnce this limit reaches," + + " Broker will reject new producers until the number of connected producers decrease." + + " Using a value of 0, is disabling maxProducersPerTopic-limit check.") private int maxProducersPerTopic = 0; - // Max number of consumers allowed to connect to topic. Once this limit reaches, Broker will reject new consumers - // until the number of connected consumers decrease. - // Using a value of 0, is disabling maxConsumersPerTopic-limit check. + @FieldContext( + category = CATEGORY_SERVER, + doc = "Max number of consumers allowed to connect to topic. \n\nOnce this limit reaches," + + " Broker will reject new consumers until the number of connected consumers decrease." + + " Using a value of 0, is disabling maxConsumersPerTopic-limit check.") private int maxConsumersPerTopic = 0; - // Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers - // until the number of connected consumers decrease. - // Using a value of 0, is disabling maxConsumersPerSubscription-limit check. + @FieldContext( + category = CATEGORY_SERVER, + doc = "Max number of consumers allowed to connect to subscription. \n\nOnce this limit reaches," + + " Broker will reject new consumers until the number of connected consumers decrease." 
+ + " Using a value of 0, is disabling maxConsumersPerSubscription-limit check.") private int maxConsumersPerSubscription = 0; /***** --- TLS --- ****/ + @FieldContext( + category = CATEGORY_TLS, + doc = "Enable TLS" + ) @Deprecated private boolean tlsEnabled = false; - // Path for the TLS certificate file + @FieldContext( + category = CATEGORY_TLS, + doc = "Path for the TLS certificate file" + ) private String tlsCertificateFilePath; - // Path for the TLS private key file + @FieldContext( + category = CATEGORY_TLS, + doc = "Path for the TLS private key file" + ) private String tlsKeyFilePath; - // Path for the trusted TLS certificate file + @FieldContext( + category = CATEGORY_TLS, + doc = "Path for the trusted TLS certificate file" + ) private String tlsTrustCertsFilePath = ""; - // Accept untrusted TLS certificate from client + @FieldContext( + category = CATEGORY_TLS, + doc = "Accept untrusted TLS certificate from client" + ) private boolean tlsAllowInsecureConnection = false; - // Specify the tls protocols the broker will use to negotiate during TLS Handshake. - // Example:- [TLSv1.2, TLSv1.1, TLSv1] + @FieldContext( + category = CATEGORY_TLS, + doc = "Specify the tls protocols the broker will use to negotiate during TLS Handshake.\n\n" + + "Example:- [TLSv1.2, TLSv1.1, TLSv1]" + ) private Set<String> tlsProtocols = Sets.newTreeSet(); - // Specify the tls cipher the broker will use to negotiate during TLS Handshake. - // Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256] + @FieldContext( + category = CATEGORY_TLS, + doc = "Specify the tls cipher the broker will use to negotiate during TLS Handshake.\n\n" + + "Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]" + ) private Set<String> tlsCiphers = Sets.newTreeSet(); - // Specify whether Client certificates are required for TLS - // Reject the Connection if the Client Certificate is not trusted. 
+ @FieldContext( + category = CATEGORY_TLS, + doc = "Specify whether Client certificates are required for TLS Reject.\n" + + "the Connection if the Client Certificate is not trusted") private boolean tlsRequireTrustedClientCertOnConnect = false; /***** --- Authentication --- ****/ - // Enable authentication + @FieldContext( + category = CATEGORY_AUTHENTICATION, + doc = "Enable authentication" + ) private boolean authenticationEnabled = false; - // Autentication provider name list, which is a list of class names + @FieldContext( + category = CATEGORY_AUTHENTICATION, + doc = "Autentication provider name list, which is a list of class names" + ) private Set<String> authenticationProviders = Sets.newTreeSet(); - // Enforce authorization + @FieldContext( + category = CATEGORY_AUTHORIZATION, + doc = "Enforce authorization" + ) private boolean authorizationEnabled = false; - // Authorization provider fully qualified class-name + @FieldContext( + category = CATEGORY_AUTHORIZATION, + doc = "Authorization provider fully qualified class-name" + ) private String authorizationProvider = PulsarAuthorizationProvider.class.getName(); - // Role names that are treated as "super-user", meaning they will be able to - // do all admin operations and publish/consume from all topics + @FieldContext( + category = CATEGORY_AUTHORIZATION, + doc = "Role names that are treated as `super-user`, meaning they will be able to" + + " do all admin operations and publish/consume from all topics" + ) private Set<String> superUserRoles = Sets.newTreeSet(); - // Role names that are treated as "proxy roles". If the broker sees a request with - // role as proxyRoles - it will demand to see the original client role or certificate. + @FieldContext( + category = CATEGORY_AUTHORIZATION, + doc = "Role names that are treated as `proxy roles`. 
\n\nIf the broker sees" + + " a request with role as proxyRoles - it will demand to see the original" + + " client role or certificate.") private Set<String> proxyRoles = Sets.newTreeSet(); - // If this flag is set then the broker authenticates the original Auth data - // else it just accepts the originalPrincipal and authorizes it (if required). + @FieldContext( + category = CATEGORY_AUTHORIZATION, + doc = "If this flag is set then the broker authenticates the original Auth data" + + " else it just accepts the originalPrincipal and authorizes it (if required)") private boolean authenticateOriginalAuthData = false; - // Allow wildcard matching in authorization - // (wildcard matching only applicable if wildcard-char: - // * presents at first or last position eg: *.pulsar.service, pulsar.service.*) + @FieldContext( + category = CATEGORY_AUTHORIZATION, + doc = "Allow wildcard matching in authorization\n\n" + + "(wildcard matching only applicable if wildcard-char: * presents at first" + + " or last position eg: *.pulsar.service, pulsar.service.*)") private boolean authorizationAllowWildcardsMatching = false; - // Authentication settings of the broker itself. Used when the broker connects - // to other brokers, either in same or other clusters. Default uses plugin which disables authentication + @FieldContext( + category = CATEGORY_AUTHENTICATION, + doc = "Authentication settings of the broker itself. \n\nUsed when the broker connects" + + " to other brokers, either in same or other clusters. 
Default uses plugin which disables authentication" + ) private String brokerClientAuthenticationPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationDisabled"; + @FieldContext( + category = CATEGORY_AUTHENTICATION, + doc = "Authentication parameters of the authentication plugin the broker is using to connect to other brokers" + ) private String brokerClientAuthenticationParameters = ""; - // Path for the trusted TLS certificate file for outgoing connection to a server (broker) + @FieldContext( + category = CATEGORY_AUTHENTICATION, + doc = "Path for the trusted TLS certificate file for outgoing connection to a server (broker)") private String brokerClientTrustCertsFilePath = ""; - // When this parameter is not empty, unauthenticated users perform as anonymousUserRole + @FieldContext( + category = CATEGORY_AUTHORIZATION, + doc = "When this parameter is not empty, unauthenticated users perform as anonymousUserRole" + ) private String anonymousUserRole = null; /**** --- BookKeeper Client --- ****/ - // Authentication plugin to use when connecting to bookies + @FieldContext( + category = CATEGORY_STORAGE_BK, + doc = "Authentication plugin to use when connecting to bookies" + ) private String bookkeeperClientAuthenticationPlugin; - // BookKeeper auth plugin implementatation specifics parameters name and values + @FieldContext( + category = CATEGORY_STORAGE_BK, + doc = "BookKeeper auth plugin implementatation specifics parameters name and values" + ) private String bookkeeperClientAuthenticationParametersName; + @FieldContext( + category = CATEGORY_STORAGE_BK, + doc = "Parameters for bookkeeper auth plugin" + ) private String bookkeeperClientAuthenticationParameters; - // Timeout for BK add / read operations + @FieldContext( + category = CATEGORY_STORAGE_BK, + doc = "Timeout for BK add / read operations" + ) private long bookkeeperClientTimeoutInSeconds = 30; - // Speculative reads are initiated if a read request doesn't complete within - // a certain time Using a 
value of 0, is disabling the speculative reads + @FieldContext( + category = CATEGORY_STORAGE_BK, + doc = "Speculative reads are initiated if a read request doesn't complete within" + + " a certain time Using a value of 0, is disabling the speculative reads") private int bookkeeperClientSpeculativeReadTimeoutInMillis = 0; - // Use older Bookkeeper wire protocol with bookie + @FieldContext( + category = CATEGORY_STORAGE_BK, + doc = "Use older Bookkeeper wire protocol with bookie" + ) private boolean bookkeeperUseV2WireProtocol = true; - // Enable bookies health check. Bookies that have more than the configured - // number of failure within the interval will be quarantined for some time. - // During this period, new ledgers won't be created on these bookies + @FieldContext( + category = CATEGORY_STORAGE_BK, + doc = "Enable bookies health check. \n\n Bookies that have more than the configured" + + " number of failure within the interval will be quarantined for some time." + + " During this period, new ledgers won't be created on these bookies") private boolean bookkeeperClientHealthCheckEnabled = true; + @FieldContext( + category = CATEGORY_STORAGE_BK, + doc = "Bookies health check interval in seconds" + ) private long bookkeeperClientHealthCheckIntervalSeconds = 60; + @FieldContext( + category = CATEGORY_STORAGE_BK, + doc = "Bookies health check error threshold per check interval" + ) private long bookkeeperClientHealthCheckErrorThresholdPerInterval = 5; + @FieldContext( + category = CATEGORY_STORAGE_BK, + doc = "Bookie health check quarantined time in seconds" + ) private long bookkeeperClientHealthCheckQuarantineTimeInSeconds = 1800; - // Enable rack-aware bookie selection policy. BK will chose bookies from - // different racks when forming a new bookie ensemble + @FieldContext( + category = CATEGORY_STORAGE_BK, + doc = "Enable rack-aware bookie selection policy. 
\n\nBK will chose bookies from" + + " different racks when forming a new bookie ensemble") private boolean bookkeeperClientRackawarePolicyEnabled = true; - // Enable region-aware bookie selection policy. BK will chose bookies from - // different regions and racks when forming a new bookie ensemble + @FieldContext( + category = CATEGORY_STORAGE_BK, + doc = "Enable region-aware bookie selection policy. \n\nBK will chose bookies from" + + " different regions and racks when forming a new bookie ensemble") private boolean bookkeeperClientRegionawarePolicyEnabled = false; - // Enable/disable reordering read sequence on reading entries. + @FieldContext( + category = CATEGORY_STORAGE_BK, + doc = "Enable/disable reordering read sequence on reading entries") private boolean bookkeeperClientReorderReadSequenceEnabled = false; - // Enable bookie isolation by specifying a list of bookie groups to choose - // from. Any bookie outside the specified groups will not be used by the - // broker - @FieldContext(required = false) + @FieldContext( + category = CATEGORY_STORAGE_BK, + required = false, + doc = "Enable bookie isolation by specifying a list of bookie groups to choose from. 
\n\n" + + "Any bookie outside the specified groups will not be used by the broker") private String bookkeeperClientIsolationGroups; /**** --- Managed Ledger --- ****/ - // Number of bookies to use when creating a ledger - @FieldContext(minValue = 1) + @FieldContext( + minValue = 1, + category = CATEGORY_STORAGE_ML, + doc = "Number of bookies to use when creating a ledger" + ) private int managedLedgerDefaultEnsembleSize = 1; - // Number of copies to store for each message - @FieldContext(minValue = 1) + @FieldContext( + minValue = 1, + category = CATEGORY_STORAGE_ML, + doc = "Number of copies to store for each message" + ) private int managedLedgerDefaultWriteQuorum = 1; - // Number of guaranteed copies (acks to wait before write is complete) - @FieldContext(minValue = 1) + @FieldContext( + minValue = 1, + category = CATEGORY_STORAGE_ML, + doc = "Number of guaranteed copies (acks to wait before write is complete)" + ) private int managedLedgerDefaultAckQuorum = 1; - // Default type of checksum to use when writing to BookKeeper. Default is "CRC32C" - // Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum). + // + // + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Default type of checksum to use when writing to BookKeeper. \n\nDefault is `CRC32C`." + + " Other possible options are `CRC32`, `MAC` or `DUMMY` (no checksum)." 
+ ) private DigestType managedLedgerDigestType = DigestType.CRC32C; - // Max number of bookies to use when creating a ledger - @FieldContext(minValue = 1) + @FieldContext( + minValue = 1, + category = CATEGORY_STORAGE_ML, + doc = "Max number of bookies to use when creating a ledger" + ) private int managedLedgerMaxEnsembleSize = 5; - // Max number of copies to store for each message - @FieldContext(minValue = 1) + @FieldContext( + minValue = 1, + category = CATEGORY_STORAGE_ML, + doc = "Max number of copies to store for each message" + ) private int managedLedgerMaxWriteQuorum = 5; - // Max number of guaranteed copies (acks to wait before write is complete) - @FieldContext(minValue = 1) + @FieldContext( + minValue = 1, + category = CATEGORY_STORAGE_ML, + doc = "Max number of guaranteed copies (acks to wait before write is complete)" + ) private int managedLedgerMaxAckQuorum = 5; - // Amount of memory to use for caching data payload in managed ledger. This - // memory - // is allocated from JVM direct memory and it's shared across all the topics - // running in the same broker + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Amount of memory to use for caching data payload in managed ledger. 
\n\nThis" + + " memory is allocated from JVM direct memory and it's shared across all the topics" + + " running in the same broker") private int managedLedgerCacheSizeMB = 1024; - // Threshold to which bring down the cache level when eviction is triggered + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Threshold to which bring down the cache level when eviction is triggered" + ) private double managedLedgerCacheEvictionWatermark = 0.9f; - // Rate limit the amount of writes per second generated by consumer acking the messages + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Rate limit the amount of writes per second generated by consumer acking the messages" + ) private double managedLedgerDefaultMarkDeleteRateLimit = 1.0; - // Number of threads to be used for managed ledger tasks dispatching + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Number of threads to be used for managed ledger tasks dispatching" + ) private int managedLedgerNumWorkerThreads = Runtime.getRuntime().availableProcessors(); - // Number of threads to be used for managed ledger scheduled tasks + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Number of threads to be used for managed ledger scheduled tasks" + ) private int managedLedgerNumSchedulerThreads = Runtime.getRuntime().availableProcessors(); - // Max number of entries to append to a ledger before triggering a rollover - // A ledger rollover is triggered on these conditions Either the max - // rollover time has been reached or max entries have been written to the - // ledged and at least min-time has passed + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Max number of entries to append to a ledger before triggering a rollover.\n\n" + + "A ledger rollover is triggered on these conditions Either the max" + + " rollover time has been reached or max entries have been written to the" + + " ledged and at least min-time has passed") private int managedLedgerMaxEntriesPerLedger = 50000; - // 
Minimum time between ledger rollover for a topic + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Minimum time between ledger rollover for a topic" + ) private int managedLedgerMinLedgerRolloverTimeMinutes = 10; - // Maximum time before forcing a ledger rollover for a topic + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Maximum time before forcing a ledger rollover for a topic" + ) private int managedLedgerMaxLedgerRolloverTimeMinutes = 240; - // Delay between a ledger being successfully offloaded to long term storage - // and the ledger being deleted from bookkeeper + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "Delay between a ledger being successfully offloaded to long term storage," + + " and the ledger being deleted from bookkeeper" + ) private long managedLedgerOffloadDeletionLagMs = TimeUnit.HOURS.toMillis(4); - // Max number of entries to append to a cursor ledger + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Max number of entries to append to a cursor ledger" + ) private int managedLedgerCursorMaxEntriesPerLedger = 50000; - // Max time before triggering a rollover on a cursor ledger + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Max time before triggering a rollover on a cursor ledger" + ) private int managedLedgerCursorRolloverTimeInSeconds = 14400; - // Max number of "acknowledgment holes" that are going to be persistently stored. - // When acknowledging out of order, a consumer will leave holes that are supposed - // to be quickly filled by acking all the messages. The information of which - // messages are acknowledged is persisted by compressing in "ranges" of messages - // that were acknowledged. After the max number of ranges is reached, the information - // will only be tracked in memory and messages will be redelivered in case of - // crashes. 
+ @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Max number of `acknowledgment holes` that are going to be persistently stored.\n\n" + + "When acknowledging out of order, a consumer will leave holes that are supposed" + + " to be quickly filled by acking all the messages. The information of which" + + " messages are acknowledged is persisted by compressing in `ranges` of messages" + + " that were acknowledged. After the max number of ranges is reached, the information" + + " will only be tracked in memory and messages will be redelivered in case of" + + " crashes.") private int managedLedgerMaxUnackedRangesToPersist = 10000; - // Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher - // than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into - // zookeeper. + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Max number of `acknowledgment holes` that can be stored in Zookeeper.\n\n" + + "If number of unack message range is higher than this limit then broker will persist" + + " unacked ranges into bookkeeper to avoid additional data overhead into zookeeper.") private int managedLedgerMaxUnackedRangesToPersistInZooKeeper = 1000; - // Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets - // corrupted at bookkeeper and managed-cursor is stuck at that ledger. - @FieldContext(dynamic = true) + @FieldContext( + dynamic = true, + category = CATEGORY_STORAGE_ML, + doc = "Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list.\n\n" + + " It helps when data-ledgers gets corrupted at bookkeeper and managed-cursor is stuck at that ledger." + ) private boolean autoSkipNonRecoverableData = false; - // operation timeout while updating managed-ledger metadata. 
+ @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "operation timeout while updating managed-ledger metadata." + ) private long managedLedgerMetadataOperationsTimeoutSeconds = 60; /*** --- Load balancer --- ****/ - // Enable load balancer + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + doc = "Enable load balancer" + ) private boolean loadBalancerEnabled = true; - // load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl) @Deprecated + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + deprecated = true, + doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)" + ) private String loadBalancerPlacementStrategy = "leastLoadedServer"; // weighted random selection - // Percentage of change to trigger load report update - @FieldContext(dynamic = true) + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "Percentage of change to trigger load report update" + ) private int loadBalancerReportUpdateThresholdPercentage = 10; - // maximum interval to update load report - @FieldContext(dynamic = true) + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "maximum interval to update load report" + ) private int loadBalancerReportUpdateMaxIntervalMinutes = 15; - // Frequency of report to collect + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + doc = "Frequency of report to collect, in minutes" + ) private int loadBalancerHostUsageCheckIntervalMinutes = 1; - // Enable/disable automatic bundle unloading for load-shedding - @FieldContext(dynamic = true) + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "Enable/disable automatic bundle unloading for load-shedding" + ) private boolean loadBalancerSheddingEnabled = true; - // Load shedding interval. 
Broker periodically checks whether some traffic should be offload from some over-loaded - // broker to other under-loaded brokers + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + doc = "Load shedding interval. \n\nBroker periodically checks whether some traffic" + + " should be offload from some over-loaded broker to other under-loaded brokers" + ) private int loadBalancerSheddingIntervalMinutes = 1; - // Prevent the same topics to be shed and moved to other broker more that - // once within this timeframe + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + doc = "Prevent the same topics to be shed and moved to other broker more that" + + " once within this timeframe" + ) private long loadBalancerSheddingGracePeriodMinutes = 30; - // Usage threshold to determine a broker as under-loaded (only used by SimpleLoadManagerImpl) + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + deprecated = true, + doc = "Usage threshold to determine a broker as under-loaded (only used by SimpleLoadManagerImpl)" + ) @Deprecated private int loadBalancerBrokerUnderloadedThresholdPercentage = 50; - // Usage threshold to allocate max number of topics to broker - @FieldContext(dynamic = true) + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "Usage threshold to allocate max number of topics to broker" + ) private int loadBalancerBrokerMaxTopics = 50000; - // Usage threshold to determine a broker as over-loaded - @FieldContext(dynamic = true) + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "Usage threshold to determine a broker as over-loaded" + ) private int loadBalancerBrokerOverloadedThresholdPercentage = 85; - // Interval to flush dynamic resource quota to ZooKeeper + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + doc = "Interval to flush dynamic resource quota to ZooKeeper" + ) private int loadBalancerResourceQuotaUpdateIntervalMinutes = 15; - // Usage threshold to determine a broker is having just right 
level of load (only used by SimpleLoadManagerImpl) @Deprecated + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + deprecated = true, + doc = "Usage threshold to determine a broker is having just right level of load" + + " (only used by SimpleLoadManagerImpl)" + ) private int loadBalancerBrokerComfortLoadLevelPercentage = 65; - // enable/disable automatic namespace bundle split - @FieldContext(dynamic = true) + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "enable/disable automatic namespace bundle split" + ) private boolean loadBalancerAutoBundleSplitEnabled = true; - // enable/disable automatic unloading of split bundles - @FieldContext(dynamic = true) + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "enable/disable automatic unloading of split bundles" + ) private boolean loadBalancerAutoUnloadSplitBundlesEnabled = true; - // maximum topics in a bundle, otherwise bundle split will be triggered + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + doc = "maximum topics in a bundle, otherwise bundle split will be triggered" + ) private int loadBalancerNamespaceBundleMaxTopics = 1000; - // maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + doc = "maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered" + ) private int loadBalancerNamespaceBundleMaxSessions = 1000; - // maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + doc = "maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered" + ) private int loadBalancerNamespaceBundleMaxMsgRate = 30000; - // maximum bandwidth (in + out) in a bundle, otherwise bundle split will be triggered + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + doc = "maximum bandwidth (in + out) in a bundle, otherwise bundle 
split will be triggered" + ) private int loadBalancerNamespaceBundleMaxBandwidthMbytes = 100; - // maximum number of bundles in a namespace + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + doc = "maximum number of bundles in a namespace" + ) private int loadBalancerNamespaceMaximumBundles = 128; - // Name of load manager to use - @FieldContext(dynamic = true) + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "Name of load manager to use" + ) private String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl"; - // Option to override the auto-detected network interfaces max speed + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + doc = "Option to override the auto-detected network interfaces max speed" + ) private Double loadBalancerOverrideBrokerNicSpeedGbps; /**** --- Replication --- ****/ - // Enable replication metrics + @FieldContext( + category = CATEGORY_REPLICATION, + doc = "Enable replication metrics" + ) private boolean replicationMetricsEnabled = false; - // Max number of connections to open for each broker in a remote cluster - // More connections host-to-host lead to better throughput over high-latency - // links. 
+ @FieldContext( + category = CATEGORY_REPLICATION, + doc = "Max number of connections to open for each broker in a remote cluster.\n\n" + + "More connections host-to-host lead to better throughput over high-latency links" + ) private int replicationConnectionsPerBroker = 16; - @FieldContext(required = false) - // replicator prefix used for replicator producer name and cursor name + @FieldContext( + required = false, + category = CATEGORY_REPLICATION, + doc = "replicator prefix used for replicator producer name and cursor name" + ) private String replicatorPrefix = "pulsar.repl"; - // Replicator producer queue size; + @FieldContext( + category = CATEGORY_REPLICATION, + doc = "Replicator producer queue size" + ) private int replicationProducerQueueSize = 1000; - // @deprecated - Use brokerClientTlsEnabled instead. @Deprecated + @FieldContext( + category = CATEGORY_REPLICATION, + deprecated = true, + doc = "@deprecated - Use brokerClientTlsEnabled instead." + ) private boolean replicationTlsEnabled = false; - // Enable TLS when talking with other brokers in the same cluster (admin operation) or different clusters (replication) + @FieldContext( + category = CATEGORY_REPLICATION, + doc = "Enable TLS when talking with other brokers in the same cluster (admin operation)" + + " or different clusters (replication)" + ) private boolean brokerClientTlsEnabled = false; - // Default message retention time + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Default message retention time" + ) private int defaultRetentionTimeInMinutes = 0; - // Default retention size + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Default retention size" + ) private int defaultRetentionSizeInMB = 0; - // How often to check pulsar connection is still alive + @FieldContext( + category = CATEGORY_SERVER, + doc = "How often to check pulsar connection is still alive" + ) private int keepAliveIntervalSeconds = 30; - // How often broker checks for inactive topics to be deleted (topics 
with no subscriptions and no one connected) + @FieldContext( + category = CATEGORY_POLICIES, + doc = "How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)" + ) private int brokerServicePurgeInactiveFrequencyInSeconds = 60; + @FieldContext( + category = CATEGORY_SERVER, + doc = "A comma-separated list of namespaces to bootstrap" + ) private List<String> bootstrapNamespaces = new ArrayList<String>(); private Properties properties = new Properties(); - // If true, (and ModularLoadManagerImpl is being used), the load manager will attempt to - // use only brokers running the latest software version (to minimize impact to bundles) - @FieldContext(dynamic = true) + @FieldContext( + dynamic = true, + category = CATEGORY_SERVER, + doc = "If true, (and ModularLoadManagerImpl is being used), the load manager will attempt to " + + "use only brokers running the latest software version (to minimize impact to bundles)" + ) private boolean preferLaterVersions = false; - // Interval between checks to see if topics with compaction policies need to be compacted + @FieldContext( + category = CATEGORY_SERVER, + doc = "Interval between checks to see if topics with compaction policies need to be compacted" + ) private int brokerServiceCompactionMonitorIntervalInSeconds = 60; + @FieldContext( + category = CATEGORY_SCHEMA, + doc = "Enforce schema validation on following cases:\n\n" + + " - if a producer without a schema attempts to produce to a topic with schema, the producer will be\n" + + " failed to connect. PLEASE be carefully on using this, since non-java clients don't support schema.\n" + + " if you enable this setting, it will cause non-java clients failed to produce." 
+ ) private boolean isSchemaValidationEnforced = false; + @FieldContext( + category = CATEGORY_SCHEMA, + doc = "The schema storage implementation used by this broker" + ) private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory"; + @FieldContext( + category = CATEGORY_SCHEMA, + doc = "The list compatibility checkers to be used in schema registry" + ) private Set<String> schemaRegistryCompatibilityCheckers = Sets.newHashSet( "org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck", "org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck" ); /**** --- WebSocket --- ****/ - // Number of IO threads in Pulsar Client used in WebSocket proxy + @FieldContext( + category = CATEGORY_WEBSOCKET, + doc = "Number of IO threads in Pulsar Client used in WebSocket proxy" + ) private int webSocketNumIoThreads = Runtime.getRuntime().availableProcessors(); - // Number of connections per Broker in Pulsar Client used in WebSocket proxy + @FieldContext( + category = CATEGORY_WEBSOCKET, + doc = "Number of connections per Broker in Pulsar Client used in WebSocket proxy" + ) private int webSocketConnectionsPerBroker = Runtime.getRuntime().availableProcessors(); - // Time in milliseconds that idle WebSocket session times out + @FieldContext( + category = CATEGORY_WEBSOCKET, + doc = "Time in milliseconds that idle WebSocket session times out" + ) private int webSocketSessionIdleTimeoutMillis = 300000; /**** --- Metrics --- ****/ - // If true, export topic level metrics otherwise namespace level + @FieldContext( + category = CATEGORY_METRICS, + doc = "If true, export topic level metrics otherwise namespace level" + ) private boolean exposeTopicLevelMetricsInPrometheus = true; + @FieldContext( + category = CATEGORY_METRICS, + doc = "If true, export consumer level metrics otherwise namespace level" + ) private boolean exposeConsumerLevelMetricsInPrometheus = false; /**** --- Functions --- ****/ + 
@FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Flag indicates enabling or disabling function worker on brokers" + ) private boolean functionsWorkerEnabled = false; /**** --- Broker Web Stats --- ****/ - // If true, export publisher stats when returning topics stats from the admin rest api + @FieldContext( + category = CATEGORY_METRICS, + doc = "If true, export publisher stats when returning topics stats from the admin rest api" + ) private boolean exposePublisherStats = true; + @FieldContext( + category = CATEGORY_METRICS, + doc = "Stats update frequency in seconds" + ) private int statsUpdateFrequencyInSecs = 60; + @FieldContext( + category = CATEGORY_METRICS, + doc = "Stats update initial delay in seconds" + ) private int statsUpdateInitialDelayInSecs = 60; /**** --- Ledger Offloading --- ****/ @@ -526,13 +1039,22 @@ public class ServiceConfiguration implements PulsarConfiguration { * NOTES: all implementation related settings should be put in implementation package. * only common settings like driver name, io threads can be added here. 
****/ - // The directory to locate offloaders + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "The directory to locate offloaders" + ) private String offloadersDirectory = "./offloaders"; - // Driver to use to offload old data to long term storage + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "Driver to use to offload old data to long term storage" + ) private String managedLedgerOffloadDriver = null; - // Maximum number of thread pool threads for ledger offloading + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "Maximum number of thread pool threads for ledger offloading" + ) private int managedLedgerOffloadMaxThreads = 2; /** diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java index df0a1ec..aad9a2f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java @@ -25,6 +25,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.util.Arrays; import java.util.Map; import java.util.Properties; @@ -180,8 +181,10 @@ public class PulsarConfigurationLoader { try { Field convertedConfField = ServiceConfiguration.class.getDeclaredField(confField.getName()); confField.setAccessible(true); - convertedConfField.setAccessible(true); - convertedConfField.set(convertedConf, confField.get(conf)); + if (!Modifier.isStatic(convertedConfField.getModifiers())) { + convertedConfField.setAccessible(true); + convertedConfField.set(convertedConf, confField.get(conf)); + } } catch (NoSuchFieldException e) { if (!ignoreNonExistMember) { throw new IllegalArgumentException("Exception 
caused while converting configuration: " + e.getMessage());