http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java new file mode 100644 index 0000000..0cb608f --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java @@ -0,0 +1,3528 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import org.apache.distributedlog.bk.QuorumConfig; +import org.apache.distributedlog.feature.DefaultFeatureProvider; +import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.net.DNSResolverForRacks; +import org.apache.distributedlog.net.DNSResolverForRows; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.net.DNSToSwitchMapping; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.util.ReflectionUtils; +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.configuration.SystemConfiguration; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; + +/** + * DistributedLog Configuration. + * <p> + * DistributedLog configuration is basically a properties based configuration, which extends from + * Apache commons {@link CompositeConfiguration}. All the DL settings are in camel case and prefixed + * with a meaningful component name. for example, `zkSessionTimeoutSeconds` means <i>SessionTimeoutSeconds</i> + * for component `zk`. + * + * <h3>BookKeeper Configuration</h3> + * + * BookKeeper client configuration settings could be loaded via DistributedLog configuration. All those + * settings are prefixed with <i>`bkc.`</i>. For example, <i>bkc.zkTimeout</i> in distributedlog configuration + * will be applied as <i>`zkTimeout`</i> in bookkeeper client configuration. + * + * <h3>How to load configuration</h3> + * + * The default distributedlog configuration is constructed by instantiated a new instance. This + * distributedlog configuration will automatically load the settings that specified via + * {@link SystemConfiguration}. + * + * <pre> + * DistributedLogConfiguration conf = new DistributedLogConfiguration(); + * </pre> + * + * The recommended way is to load configuration from URL that points to a configuration file + * ({@link #loadConf(URL)}). + * + * <pre> + * String configFile = "/path/to/distributedlog/conf/file"; + * DistributedLogConfiguration conf = new DistributedLogConfiguration(); + * conf.loadConf(new File(configFile).toURI().toURL()); + * </pre> + * + * @see org.apache.bookkeeper.conf.ClientConfiguration + */ +public class DistributedLogConfiguration extends CompositeConfiguration { + static final Logger LOG = LoggerFactory.getLogger(DistributedLogConfiguration.class); + + private static ClassLoader defaultLoader; + + static { + defaultLoader = Thread.currentThread().getContextClassLoader(); + if (null == defaultLoader) { + defaultLoader = DistributedLogConfiguration.class.getClassLoader(); + } + } + + // + // ZooKeeper Related Settings + // + + public static final String BKDL_ZK_ACL_ID = "zkAclId"; + public static final String BKDL_ZK_ACL_ID_DEFAULT = null; + public static final String BKDL_ZK_SESSION_TIMEOUT_SECONDS = "zkSessionTimeoutSeconds"; + public static final int BKDL_ZK_SESSION_TIMEOUT_SECONDS_DEFAULT = 30; + public static final String BKDL_ZK_REQUEST_RATE_LIMIT = "zkRequestRateLimit"; + public static final double BKDL_ZK_REQUEST_RATE_LIMIT_DEFAULT = 0; + public static final String BKDL_ZK_NUM_RETRIES = "zkNumRetries"; + public static final int BKDL_ZK_NUM_RETRIES_DEFAULT = 3; + public static final String BKDL_ZK_RETRY_BACKOFF_START_MILLIS = "zkRetryStartBackoffMillis"; + public static final int BKDL_ZK_RETRY_BACKOFF_START_MILLIS_DEFAULT = 5000; + public static final String BKDL_ZK_RETRY_BACKOFF_MAX_MILLIS = "zkRetryMaxBackoffMillis"; + public static final int BKDL_ZK_RETRY_BACKOFF_MAX_MILLIS_DEFAULT = 30000; + public static final String BKDL_ZKCLIENT_NUM_RETRY_THREADS = "zkcNumRetryThreads"; + public static final int BKDL_ZKCLIENT_NUM_RETRY_THREADS_DEFAULT = 1; + + // + // BookKeeper Related Settings + // + + // BookKeeper zookeeper settings + public static final String BKDL_BKCLIENT_ZK_SESSION_TIMEOUT = "bkcZKSessionTimeoutSeconds"; + public static final int BKDL_BKCLIENT_ZK_SESSION_TIMEOUT_DEFAULT = 30; + public static final String BKDL_BKCLIENT_ZK_REQUEST_RATE_LIMIT = "bkcZKRequestRateLimit"; + public static final double BKDL_BKCLIENT_ZK_REQUEST_RATE_LIMIT_DEFAULT = 0; + public static final String BKDL_BKCLIENT_ZK_NUM_RETRIES = "bkcZKNumRetries"; + public static final int BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT = 3; + public static final String BKDL_BKCLIENT_ZK_RETRY_BACKOFF_START_MILLIS = "bkcZKRetryStartBackoffMillis"; + public static final int BKDL_BKCLIENT_ZK_RETRY_BACKOFF_START_MILLIS_DEFAULT = 5000; + public static final String BKDL_BKCLIENT_ZK_RETRY_BACKOFF_MAX_MILLIS = "bkcZKRetryMaxBackoffMillis"; + public static final int BKDL_BKCLIENT_ZK_RETRY_BACKOFF_MAX_MILLIS_DEFAULT = 30000; + + // Bookkeeper ensemble placement settings + // Bookkeeper ensemble size + public static final String BKDL_BOOKKEEPER_ENSEMBLE_SIZE = "bkcEnsembleSize"; + // @Deprecated + public static final String BKDL_BOOKKEEPER_ENSEMBLE_SIZE_OLD = "ensemble-size"; + public static final int BKDL_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT = 3; + // Bookkeeper write quorum size + public static final String BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE = "bkcWriteQuorumSize"; + // @Deprecated + public static final String BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE_OLD = "write-quorum-size"; + public static final int BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE_DEFAULT = 3; + // Bookkeeper ack quorum size + public static final String BKDL_BOOKKEEPER_ACK_QUORUM_SIZE = "bkcAckQuorumSize"; + // @Deprecated + public static final String BKDL_BOOKKEEPER_ACK_QUORUM_SIZE_OLD = "ack-quorum-size"; + public static final int BKDL_BOOKKEEPER_ACK_QUORUM_SIZE_DEFAULT = 2; + public static final String BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT = "bkRowAwareEnsemblePlacement"; + public static final String BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT_OLD = "row-aware-ensemble-placement"; + public static final boolean BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT_DEFAULT = false; + public static final String BKDL_ENSEMBLE_PLACEMENT_DNS_RESOLVER_CLASS = "bkEnsemblePlacementDnsResolverClass"; + public static final String BKDL_ENSEMBLE_PLACEMENT_DNS_RESOLVER_CLASS_DEFAULT = + DNSResolverForRacks.class.getName(); + public static final String BKDL_BK_DNS_RESOLVER_OVERRIDES = "dnsResolverOverrides"; + public static final String BKDL_BK_DNS_RESOLVER_OVERRIDES_DEFAULT = ""; + + // General Settings + // @Deprecated + public static final String BKDL_BOOKKEEPER_DIGEST_PW = "digestPw"; + public static final String BKDL_BOOKKEEPER_DIGEST_PW_DEFAULT = ""; + public static final String BKDL_BKCLIENT_NUM_IO_THREADS = "bkcNumIOThreads"; + public static final String BKDL_TIMEOUT_TIMER_TICK_DURATION_MS = "timerTickDuration"; + public static final long BKDL_TIMEOUT_TIMER_TICK_DURATION_MS_DEFAULT = 100; + public static final String BKDL_TIMEOUT_TIMER_NUM_TICKS = "timerNumTicks"; + public static final int BKDL_TIMEOUT_TIMER_NUM_TICKS_DEFAULT = 1024; + + // + // Deprecated BookKeeper Settings (in favor of "bkc." style bookkeeper settings) + // + + public static final String BKDL_BKCLIENT_READ_TIMEOUT = "bkcReadTimeoutSeconds"; + public static final int BKDL_BKCLIENT_READ_TIMEOUT_DEFAULT = 10; + public static final String BKDL_BKCLIENT_WRITE_TIMEOUT = "bkcWriteTimeoutSeconds"; + public static final int BKDL_BKCLIENT_WRITE_TIMEOUT_DEFAULT = 10; + public static final String BKDL_BKCLIENT_NUM_WORKER_THREADS = "bkcNumWorkerThreads"; + public static final int BKDL_BKCLEINT_NUM_WORKER_THREADS_DEFAULT = 1; + + // + // DL General Settings + // + + // Executor Parameters + public static final String BKDL_NUM_WORKER_THREADS = "numWorkerThreads"; + public static final String BKDL_NUM_READAHEAD_WORKER_THREADS = "numReadAheadWorkerThreads"; + public static final String BKDL_NUM_LOCKSTATE_THREADS = "numLockStateThreads"; + public static final String BKDL_NUM_RESOURCE_RELEASE_THREADS = "numResourceReleaseThreads"; + public static final String BKDL_SCHEDULER_SHUTDOWN_TIMEOUT_MS = "schedulerShutdownTimeoutMs"; + public static final int BKDL_SCHEDULER_SHUTDOWN_TIMEOUT_MS_DEFAULT = 5000; + public static final String BKDL_USE_DAEMON_THREAD = "useDaemonThread"; + public static final boolean BKDL_USE_DAEMON_THREAD_DEFAULT = false; + + // Metadata Parameters + public static final String BKDL_LEDGER_METADATA_LAYOUT_VERSION = "ledgerMetadataLayoutVersion"; + public static final String BKDL_LEDGER_METADATA_LAYOUT_VERSION_OLD = "ledger-metadata-layout"; + public static final int BKDL_LEDGER_METADATA_LAYOUT_VERSION_DEFAULT = + LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value; + public static final String BKDL_LEDGER_METADATA_SKIP_MIN_VERSION_CHECK = "ledgerMetadataSkipMinVersionCheck"; + public static final boolean BKDL_LEDGER_METADATA_SKIP_MIN_VERSION_CHECK_DEFAULT = false; + public static final String BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER = "firstLogsegmentSequenceNumber"; + public static final String BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER_OLD = "first-logsegment-sequence-number"; + public static final long BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER_DEFAULT = + DistributedLogConstants.FIRST_LOGSEGMENT_SEQNO; + public static final String BKDL_LOGSEGMENT_SEQUENCE_NUMBER_VALIDATION_ENABLED = "logSegmentSequenceNumberValidationEnabled"; + public static final boolean BKDL_LOGSEGMENT_SEQUENCE_NUMBER_VALIDATION_ENABLED_DEFAULT = true; + public static final String BKDL_ENABLE_RECORD_COUNTS = "enableRecordCounts"; + public static final boolean BKDL_ENABLE_RECORD_COUNTS_DEFAULT = true; + public static final String BKDL_MAXID_SANITYCHECK = "maxIdSanityCheck"; + public static final boolean BKDL_MAXID_SANITYCHECK_DEFAULT = true; + public static final String BKDL_ENCODE_REGION_ID_IN_VERSION = "encodeRegionIDInVersion"; + public static final boolean BKDL_ENCODE_REGION_ID_IN_VERSION_DEFAULT = false; + // (@Deprecated) + public static final String BKDL_LOGSEGMENT_NAME_VERSION = "logSegmentNameVersion"; + public static final int BKDL_LOGSEGMENT_NAME_VERSION_DEFAULT = DistributedLogConstants.LOGSEGMENT_NAME_VERSION; + // (@Derepcated) Name for the default (non-partitioned) stream + public static final String BKDL_UNPARTITIONED_STREAM_NAME = "unpartitionedStreamName"; + public static final String BKDL_UNPARTITIONED_STREAM_NAME_DEFAULT = "<default>"; + + // Log Segment Cache Parameters + public static final String BKDL_LOGSEGMENT_CACHE_TTL_MS = "logSegmentCacheTTLMs"; + public static final long BKDL_LOGSEGMENT_CACHE_TTL_MS_DEFAULT = 600000; // 10 mins + public static final String BKDL_LOGSEGMENT_CACHE_MAX_SIZE = "logSegmentCacheMaxSize"; + public static final long BKDL_LOGSEGMENT_CACHE_MAX_SIZE_DEFAULT = 10000; + public static final String BKDL_LOGSEGMENT_CACHE_ENABLED = "logSegmentCacheEnabled"; + public static final boolean BKDL_LOGSEGMENT_CACHE_ENABLED_DEFAULT = true; + + // + // DL Writer Settings + // + + // General Settings + public static final String BKDL_CREATE_STREAM_IF_NOT_EXISTS = "createStreamIfNotExists"; + public static final boolean BKDL_CREATE_STREAM_IF_NOT_EXISTS_DEFAULT = true; + public static final String BKDL_LOG_FLUSH_TIMEOUT = "logFlushTimeoutSeconds"; + public static final int BKDL_LOG_FLUSH_TIMEOUT_DEFAULT = 30; + /** + * CompressionCodec.Type String to use (See CompressionUtils) + * --------------------- ------------------------------------ + * NONE none + * LZ4 lz4 + * UNKNOWN any other instance of String.class + */ + public static final String BKDL_COMPRESSION_TYPE = "compressionType"; + public static final String BKDL_COMPRESSION_TYPE_DEFAULT = "none"; + public static final String BKDL_FAILFAST_ON_STREAM_NOT_READY = "failFastOnStreamNotReady"; + public static final boolean BKDL_FAILFAST_ON_STREAM_NOT_READY_DEFAULT = false; + public static final String BKDL_DISABLE_ROLLING_ON_LOG_SEGMENT_ERROR = "disableRollingOnLogSegmentError"; + public static final boolean BKDL_DISABLE_ROLLING_ON_LOG_SEGMENT_ERROR_DEFAULT = false; + + // Durability Settings + public static final String BKDL_IS_DURABLE_WRITE_ENABLED = "isDurableWriteEnabled"; + public static final boolean BKDL_IS_DURABLE_WRITE_ENABLED_DEFAULT = true; + + // Transmit Settings + public static final String BKDL_OUTPUT_BUFFER_SIZE = "writerOutputBufferSize"; + public static final String BKDL_OUTPUT_BUFFER_SIZE_OLD = "output-buffer-size"; + public static final int BKDL_OUTPUT_BUFFER_SIZE_DEFAULT = 1024; + public static final String BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS = "periodicFlushFrequencyMilliSeconds"; + public static final int BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS_DEFAULT = 0; + public static final String BKDL_ENABLE_IMMEDIATE_FLUSH = "enableImmediateFlush"; + public static final boolean BKDL_ENABLE_IMMEDIATE_FLUSH_DEFAULT = false; + public static final String BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS = "minimumDelayBetweenImmediateFlushMilliSeconds"; + public static final int BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS_DEFAULT = 0; + public static final String BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS = "periodicKeepAliveMilliSeconds"; + public static final int BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT = 0; + + // Retention/Truncation Settings + public static final String BKDL_RETENTION_PERIOD_IN_HOURS = "logSegmentRetentionHours"; + public static final String BKDL_RETENTION_PERIOD_IN_HOURS_OLD = "retention-size"; + public static final int BKDL_RETENTION_PERIOD_IN_HOURS_DEFAULT = 72; + public static final String BKDL_EXPLICIT_TRUNCATION_BY_APPLICATION = "explicitTruncationByApp"; + public static final boolean BKDL_EXPLICIT_TRUNCATION_BY_APPLICATION_DEFAULT = false; + + // Log Segment Rolling Settings + public static final String BKDL_ROLLING_INTERVAL_IN_MINUTES = "logSegmentRollingMinutes"; + public static final String BKDL_ROLLING_INTERVAL_IN_MINUTES_OLD = "rolling-interval"; + public static final int BKDL_ROLLING_INTERVAL_IN_MINUTES_DEFAULT = 120; + public static final String BKDL_MAX_LOGSEGMENT_BYTES = "maxLogSegmentBytes"; + public static final int BKDL_MAX_LOGSEGMENT_BYTES_DEFAULT = 256 * 1024 * 1024; // default 256MB + public static final String BKDL_LOGSEGMENT_ROLLING_CONCURRENCY = "logSegmentRollingConcurrency"; + public static final int BKDL_LOGSEGMENT_ROLLING_CONCURRENCY_DEFAULT = 1; + + // Lock Settings + public static final String BKDL_WRITE_LOCK_ENABLED = "writeLockEnabled"; + public static final boolean BKDL_WRITE_LOCK_ENABLED_DEFAULT = true; + public static final String BKDL_LOCK_TIMEOUT = "lockTimeoutSeconds"; + public static final long BKDL_LOCK_TIMEOUT_DEFAULT = 30; + public static final String BKDL_LOCK_REACQUIRE_TIMEOUT = "lockReacquireTimeoutSeconds"; + public static final long BKDL_LOCK_REACQUIRE_TIMEOUT_DEFAULT = DistributedLogConstants.LOCK_REACQUIRE_TIMEOUT_DEFAULT; + public static final String BKDL_LOCK_OP_TIMEOUT = "lockOpTimeoutSeconds"; + public static final long BKDL_LOCK_OP_TIMEOUT_DEFAULT = DistributedLogConstants.LOCK_OP_TIMEOUT_DEFAULT; + + // Ledger Allocator Settings + public static final String BKDL_ENABLE_LEDGER_ALLOCATOR_POOL = "enableLedgerAllocatorPool"; + public static final boolean BKDL_ENABLE_LEDGER_ALLOCATOR_POOL_DEFAULT = false; + public static final String BKDL_LEDGER_ALLOCATOR_POOL_PATH = "ledgerAllocatorPoolPath"; + public static final String BKDL_LEDGER_ALLOCATOR_POOL_PATH_DEFAULT = DistributedLogConstants.ALLOCATION_POOL_NODE; + public static final String BKDL_LEDGER_ALLOCATOR_POOL_NAME = "ledgerAllocatorPoolName"; + public static final String BKDL_LEDGER_ALLOCATOR_POOL_NAME_DEFAULT = null; + public static final String BKDL_LEDGER_ALLOCATOR_POOL_CORE_SIZE = "ledgerAllocatorPoolCoreSize"; + public static final int BKDL_LEDGER_ALLOCATOR_POOL_CORE_SIZE_DEFAULT = 20; + + // Write Limit Settings + public static final String BKDL_PER_WRITER_OUTSTANDING_WRITE_LIMIT = "perWriterOutstandingWriteLimit"; + public static final int BKDL_PER_WRITER_OUTSTANDING_WRITE_LIMIT_DEFAULT = -1; + public static final String BKDL_GLOBAL_OUTSTANDING_WRITE_LIMIT = "globalOutstandingWriteLimit"; + public static final int BKDL_GLOBAL_OUTSTANDING_WRITE_LIMIT_DEFAULT = -1; + public static final String BKDL_OUTSTANDING_WRITE_LIMIT_DARKMODE = "outstandingWriteLimitDarkmode"; + public static final boolean BKDL_OUTSTANDING_WRITE_LIMIT_DARKMODE_DEFAULT = true; + + // + // DL Reader Settings + // + + // General Settings + public static final String BKDL_READLAC_OPTION = "readLACLongPoll"; + public static final int BKDL_READLAC_OPTION_DEFAULT = 3; //BKLogPartitionReadHandler.ReadLACOption.READENTRYPIGGYBACK_SEQUENTIAL.value + public static final String BKDL_READLACLONGPOLL_TIMEOUT = "readLACLongPollTimeout"; + public static final int BKDL_READLACLONGPOLL_TIMEOUT_DEFAULT = 1000; + public static final String BKDL_DESERIALIZE_RECORDSET_ON_READS = "deserializeRecordSetOnReads"; + public static final boolean BKDL_DESERIALIZE_RECORDSET_ON_READS_DEFAULT = true; + + // Idle reader settings + public static final String BKDL_READER_IDLE_WARN_THRESHOLD_MILLIS = "readerIdleWarnThresholdMillis"; + public static final int BKDL_READER_IDLE_WARN_THRESHOLD_MILLIS_DEFAULT = 120000; + public static final String BKDL_READER_IDLE_ERROR_THRESHOLD_MILLIS = "readerIdleErrorThresholdMillis"; + public static final int BKDL_READER_IDLE_ERROR_THRESHOLD_MILLIS_DEFAULT = Integer.MAX_VALUE; + + // Reader constraint settings + public static final String BKDL_READER_IGNORE_TRUNCATION_STATUS = "ignoreTruncationStatus"; + public static final boolean BKDL_READER_IGNORE_TRUNCATION_STATUS_DEFAULT = false; + public static final String BKDL_READER_ALERT_POSITION_ON_TRUNCATED = "alertPositionOnTruncated"; + public static final boolean BKDL_READER_ALERT_POSITION_ON_TRUNCATED_DEFAULT = true; + public static final String BKDL_READER_POSITION_GAP_DETECTION_ENABLED = "positionGapDetectionEnabled"; + public static final boolean BKDL_READER_POSITION_GAP_DETECTION_ENABLED_DEFAULT = false; + + // Read ahead related parameters + public static final String BKDL_ENABLE_READAHEAD = "enableReadAhead"; + public static final boolean BKDL_ENABLE_READAHEAD_DEFAULT = true; + public static final String BKDL_ENABLE_FORCEREAD = "enableForceRead"; + public static final boolean BKDL_ENABLE_FORCEREAD_DEFAULT = true; + public static final String BKDL_READAHEAD_MAX_RECORDS = "readAheadMaxRecords"; + public static final String BKDL_READAHEAD_MAX_RECORDS_OLD = "ReadAheadMaxEntries"; + public static final int BKDL_READAHEAD_MAX_RECORDS_DEFAULT = 10; + public static final String BKDL_READAHEAD_BATCHSIZE = "readAheadBatchSize"; + public static final String BKDL_READAHEAD_BATCHSIZE_OLD = "ReadAheadBatchSize"; + public static final int BKDL_READAHEAD_BATCHSIZE_DEFAULT = 2; + public static final String BKDL_READAHEAD_WAITTIME = "readAheadWaitTime"; + public static final String BKDL_READAHEAD_WAITTIME_OLD = "ReadAheadWaitTime"; + public static final int BKDL_READAHEAD_WAITTIME_DEFAULT = 200; + public static final String BKDL_READAHEAD_WAITTIME_ON_ENDOFSTREAM = "readAheadWaitTimeOnEndOfStream"; + public static final String BKDL_READAHEAD_WAITTIME_ON_ENDOFSTREAM_OLD = "ReadAheadWaitTimeOnEndOfStream"; + public static final int BKDL_READAHEAD_WAITTIME_ON_ENDOFSTREAM_DEFAULT = 10000; + public static final String BKDL_READAHEAD_NOSUCHLEDGER_EXCEPTION_ON_READLAC_ERROR_THRESHOLD_MILLIS = + "readAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis"; + public static final int BKDL_READAHEAD_NOSUCHLEDGER_EXCEPTION_ON_READLAC_ERROR_THRESHOLD_MILLIS_DEFAULT = 10000; + public static final String BKDL_READAHEAD_SKIP_BROKEN_ENTRIES = "readAheadSkipBrokenEntries"; + public static final boolean BKDL_READAHEAD_SKIP_BROKEN_ENTRIES_DEFAULT = false; + public static final String BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT = "numPrefetchEntriesPerLogSegment"; + public static final int BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT_DEFAULT = 4; + public static final String BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT = "maxPrefetchEntriesPerLogSegment"; + public static final int BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT_DEFAULT = 32; + + // Scan Settings + public static final String BKDL_FIRST_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN = "firstNumEntriesEachPerLastRecordScan"; + public static final int BKDL_FIRST_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN_DEFAULT = 2; + public static final String BKDL_MAX_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN = "maxNumEntriesPerReadLastRecordScan"; + public static final int BKDL_MAX_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN_DEFAULT = 16; + + // Log Existence Settings + public static final String BKDL_CHECK_LOG_EXISTENCE_BACKOFF_START_MS = "checkLogExistenceBackoffStartMillis"; + public static final int BKDL_CHECK_LOG_EXISTENCE_BACKOFF_START_MS_DEFAULT = 200; + public static final String BKDL_CHECK_LOG_EXISTENCE_BACKOFF_MAX_MS = "checkLogExistenceBackoffMaxMillis"; + public static final int BKDL_CHECK_LOG_EXISTENCE_BACKOFF_MAX_MS_DEFAULT = 1000; + + // + // Tracing/Stats Settings + // + + public static final String BKDL_TRACE_READAHEAD_DELIVERY_LATENCY = "traceReadAheadDeliveryLatency"; + public static final boolean BKDL_TRACE_READAHEAD_DELIVERY_LATENCY_DEFAULT = false; + public static final String BKDL_METADATA_LATENCY_WARN_THRESHOLD_MS = "metadataLatencyWarnThresholdMs"; + public static final long BKDL_METADATA_LATENCY_WARN_THRESHOLD_MS_DEFAULT = DistributedLogConstants.LATENCY_WARN_THRESHOLD_IN_MILLIS; + public static final String BKDL_DATA_LATENCY_WARN_THRESHOLD_MS = "dataLatencyWarnThresholdMs"; + public static final long BKDL_DATA_LATENCY_WARN_THRESHOLD_MS_DEFAULT = 2 * DistributedLogConstants.LATENCY_WARN_THRESHOLD_IN_MILLIS; + public static final String BKDL_TRACE_READAHEAD_METADATA_CHANGES = "traceReadAheadMetadataChanges"; + public static final boolean BKDL_TRACE_READAHEAD_MEATDATA_CHANGES_DEFAULT = false; + public final static String BKDL_ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats"; + public final static boolean BKDL_ENABLE_TASK_EXECUTION_STATS_DEFAULT = false; + public final static String BKDL_TASK_EXECUTION_WARN_TIME_MICROS = "taskExecutionWarnTimeMicros"; + public final static long BKDL_TASK_EXECUTION_WARN_TIME_MICROS_DEFAULT = 100000; + public static final String BKDL_ENABLE_PERSTREAM_STAT = "enablePerStreamStat"; + public static final boolean BKDL_ENABLE_PERSTREAM_STAT_DEFAULT = false; + + // + // Settings for Feature Providers + // + + public static final String BKDL_FEATURE_PROVIDER_CLASS = "featureProviderClass"; + + // + // Settings for Configuration Based Feature Provider + // + + public static final String BKDL_FILE_FEATURE_PROVIDER_BASE_CONFIG_PATH = "fileFeatureProviderBaseConfigPath"; + public static final String BKDL_FILE_FEATURE_PROVIDER_BASE_CONFIG_PATH_DEFAULT = "decider.conf"; + public static final String BKDL_FILE_FEATURE_PROVIDER_OVERLAY_CONFIG_PATH = "fileFeatureProviderOverlayConfigPath"; + public static final String BKDL_FILE_FEATURE_PROVIDER_OVERLAY_CONFIG_PATH_DEFAULT = null; + + // + // Settings for Namespaces + // + + public static final String BKDL_FEDERATED_NAMESPACE_ENABLED = "federatedNamespaceEnabled"; + public static final boolean BKDL_FEDERATED_NAMESPACE_ENABLED_DEFAULT = false; + public static final String BKDL_FEDERATED_MAX_LOGS_PER_SUBNAMESPACE = "federatedMaxLogsPerSubnamespace"; + public static final int BKDL_FEDERATED_MAX_LOGS_PER_SUBNAMESPACE_DEFAULT = 15000; + public static final String BKDL_FEDERATED_CHECK_EXISTENCE_WHEN_CACHE_MISS = "federatedCheckExistenceWhenCacheMiss"; + public static final boolean BKDL_FEDERATED_CHECK_EXISTENCE_WHEN_CACHE_MISS_DEFAULT = true; + + // Settings for Configurations + + public static final String BKDL_DYNAMIC_CONFIG_RELOAD_INTERVAL_SEC = "dynamicConfigReloadIntervalSec"; + public static final int BKDL_DYNAMIC_CONFIG_RELOAD_INTERVAL_SEC_DEFAULT = 60; + public static final String BKDL_STREAM_CONFIG_ROUTER_CLASS = "streamConfigRouterClass"; + public static final String BKDL_STREAM_CONFIG_ROUTER_CLASS_DEFAULT = "org.apache.distributedlog.service.config.IdentityConfigRouter"; + + // Settings for RateLimit (used by distributedlog-service) + + public static final String BKDL_BPS_SOFT_WRITE_LIMIT = "bpsSoftWriteLimit"; + public static final int BKDL_BPS_SOFT_WRITE_LIMIT_DEFAULT = -1; + public static final String BKDL_BPS_HARD_WRITE_LIMIT = "bpsHardWriteLimit"; + public static final int BKDL_BPS_HARD_WRITE_LIMIT_DEFAULT = -1; + public static final String BKDL_RPS_SOFT_WRITE_LIMIT = "rpsSoftWriteLimit"; + public static final int BKDL_RPS_SOFT_WRITE_LIMIT_DEFAULT = -1; + public static final String BKDL_RPS_HARD_WRITE_LIMIT = "rpsHardWriteLimit"; + public static final int BKDL_RPS_HARD_WRITE_LIMIT_DEFAULT = -1; + + // Rate and resource limits: per shard + + public static final String BKDL_RPS_SOFT_SERVICE_LIMIT = "rpsSoftServiceLimit"; + public static final int BKDL_RPS_SOFT_SERVICE_LIMIT_DEFAULT = -1; + public static final String BKDL_RPS_HARD_SERVICE_LIMIT = "rpsHardServiceLimit"; + public static final int BKDL_RPS_HARD_SERVICE_LIMIT_DEFAULT = -1; + public static final String BKDL_RPS_STREAM_ACQUIRE_SERVICE_LIMIT = "rpsStreamAcquireServiceLimit"; + public static final int BKDL_RPS_STREAM_ACQUIRE_SERVICE_LIMIT_DEFAULT = -1; + public static final String BKDL_BPS_SOFT_SERVICE_LIMIT = "bpsSoftServiceLimit"; + public static final int BKDL_BPS_SOFT_SERVICE_LIMIT_DEFAULT = -1; + public static final String BKDL_BPS_HARD_SERVICE_LIMIT = "bpsHardServiceLimit"; + public static final int BKDL_BPS_HARD_SERVICE_LIMIT_DEFAULT = -1; + public static final String BKDL_BPS_STREAM_ACQUIRE_SERVICE_LIMIT = "bpsStreamAcquireServiceLimit"; + public static final int BKDL_BPS_STREAM_ACQUIRE_SERVICE_LIMIT_DEFAULT = -1; + + // Settings for Partitioning + + public static final String BKDL_MAX_ACQUIRED_PARTITIONS_PER_PROXY = "maxAcquiredPartitionsPerProxy"; + public static final int BKDL_MAX_ACQUIRED_PARTITIONS_PER_PROXY_DEFAULT = -1; + + public static final String BKDL_MAX_CACHED_PARTITIONS_PER_PROXY = "maxCachedPartitionsPerProxy"; + public static final int BKDL_MAX_CACHED_PARTITIONS_PER_PROXY_DEFAULT = -1; + + // + // Settings for Error Injection + // + public static final String BKDL_EI_INJECT_WRITE_DELAY = "eiInjectWriteDelay"; + public static final boolean BKDL_EI_INJECT_WRITE_DELAY_DEFAULT = false; + public static final String BKDL_EI_INJECTED_WRITE_DELAY_PERCENT = "eiInjectedWriteDelayPercent"; + public static final double BKDL_EI_INJECTED_WRITE_DELAY_PERCENT_DEFAULT = 0.0; + public static final String BKDL_EI_INJECTED_WRITE_DELAY_MS = "eiInjectedWriteDelayMs"; + public static final int BKDL_EI_INJECTED_WRITE_DELAY_MS_DEFAULT = 0; + public static final String BKDL_EI_INJECT_READAHEAD_STALL = "eiInjectReadAheadStall"; + public static final boolean BKDL_EI_INJECT_READAHEAD_STALL_DEFAULT = false; + public static final String BKDL_EI_INJECT_READAHEAD_DELAY = "eiInjectReadAheadDelay"; + public static final boolean BKDL_EI_INJECT_READAHEAD_DELAY_DEFAULT = false; + public static final String BKDL_EI_INJECT_MAX_READAHEAD_DELAY_MS = "eiInjectMaxReadAheadDelayMs"; + public static final int BKDL_EI_INJECT_MAX_READAHEAD_DELAY_MS_DEFAULT = 0; + public static final String BKDL_EI_INJECT_READAHEAD_DELAY_PERCENT = "eiInjectReadAheadDelayPercent"; + public static final int BKDL_EI_INJECT_READAHEAD_DELAY_PERCENT_DEFAULT = 10; + public static final String BKDL_EI_INJECT_READAHEAD_BROKEN_ENTRIES = "eiInjectReadAheadBrokenEntries"; + public static final boolean BKDL_EI_INJECT_READAHEAD_BROKEN_ENTRIES_DEFAULT = false; + + // Whitelisted stream-level configuration settings. + private static final Set<String> streamSettings = Sets.newHashSet( + BKDL_READER_POSITION_GAP_DETECTION_ENABLED, + BKDL_READER_IDLE_ERROR_THRESHOLD_MILLIS, + BKDL_READER_IDLE_WARN_THRESHOLD_MILLIS, + BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS, + BKDL_ENABLE_IMMEDIATE_FLUSH + ); + + /** + * Construct distributedlog configuration with default settings. + * It also loads the settings from system properties. + */ + public DistributedLogConfiguration() { + super(); + // add configuration for system properties + addConfiguration(new SystemConfiguration()); + } + + /** + * You can load configurations in precedence order. The first one takes + * precedence over any loaded later. + * + * @param confURL Configuration URL + */ + public void loadConf(URL confURL) throws ConfigurationException { + Configuration loadedConf = new PropertiesConfiguration(confURL); + addConfiguration(loadedConf); + } + + /** + * You can load configuration from other configuration + * + * @param baseConf Other Configuration + */ + public void loadConf(DistributedLogConfiguration baseConf) { + addConfiguration(baseConf); + } + + /** + * Load configuration from other configuration object + * + * @param otherConf Other configuration object + */ + public void loadConf(Configuration otherConf) { + addConfiguration(otherConf); + } + + /** + * Load whitelisted stream configuration from another configuration object + * + * @param streamConfiguration stream configuration overrides + */ + public void loadStreamConf(Optional<DistributedLogConfiguration> streamConfiguration) { + if (!streamConfiguration.isPresent()) { + return; + } + ArrayList<Object> ignoredSettings = new ArrayList<Object>(); + Iterator iterator = streamConfiguration.get().getKeys(); + while (iterator.hasNext()) { + Object setting = iterator.next(); + if (setting instanceof String && streamSettings.contains(setting)) { + String settingStr = (String) setting; + setProperty(settingStr, streamConfiguration.get().getProperty(settingStr)); + } else { + ignoredSettings.add(setting); + } + } + if (LOG.isWarnEnabled() && !ignoredSettings.isEmpty()) { + LOG.warn("invalid stream configuration override(s): {}", + StringUtils.join(ignoredSettings, ";")); + } + } + + // + // ZooKeeper Related Settings + // + + /** + * Get all properties as a string. + */ + public String getPropsAsString() { + Iterator iterator = getKeys(); + StringBuilder builder = new StringBuilder(); + boolean appendNewline = false; + while (iterator.hasNext()) { + Object key = iterator.next(); + if (key instanceof String) { + if (appendNewline) { + builder.append("\n"); + } + Object value = getProperty((String)key); + builder.append(key).append("=").append(value); + appendNewline = true; + } + } + return builder.toString(); + } + + /** + * Get digest id used for ZK acl. + * + * @return zk acl id. + */ + public String getZkAclId() { + return getString(BKDL_ZK_ACL_ID, BKDL_ZK_ACL_ID_DEFAULT); + } + + /** + * Set digest id to use for ZK acl. + * + * @param zkAclId acl id. + * @return distributedlog configuration + * @see #getZkAclId() + */ + public DistributedLogConfiguration setZkAclId(String zkAclId) { + setProperty(BKDL_ZK_ACL_ID, zkAclId); + return this; + } + + /** + * Get ZK Session timeout in seconds. + * <p> + * This is the session timeout applied for zookeeper client used by distributedlog. + * Use {@link #getBKClientZKSessionTimeoutMilliSeconds()} for zookeeper client used + * by bookkeeper client. + * + * @return zookeeeper session timeout in seconds. + * @deprecated use {@link #getZKSessionTimeoutMilliseconds()} + */ + public int getZKSessionTimeoutSeconds() { + return this.getInt(BKDL_ZK_SESSION_TIMEOUT_SECONDS, BKDL_ZK_SESSION_TIMEOUT_SECONDS_DEFAULT); + } + + /** + * Get ZK Session timeout in milliseconds. + * <p> + * This is the session timeout applied for zookeeper client used by distributedlog. + * Use {@link #getBKClientZKSessionTimeoutMilliSeconds()} for zookeeper client used + * by bookkeeper client. + * + * @return zk session timeout in milliseconds. + */ + public int getZKSessionTimeoutMilliseconds() { + return this.getInt(BKDL_ZK_SESSION_TIMEOUT_SECONDS, BKDL_ZK_SESSION_TIMEOUT_SECONDS_DEFAULT) * 1000; + } + + /** + * Set ZK Session Timeout in seconds. + * + * @param zkSessionTimeoutSeconds session timeout in seconds. + * @return distributed log configuration + * @see #getZKSessionTimeoutMilliseconds() + */ + public DistributedLogConfiguration setZKSessionTimeoutSeconds(int zkSessionTimeoutSeconds) { + setProperty(BKDL_ZK_SESSION_TIMEOUT_SECONDS, zkSessionTimeoutSeconds); + return this; + } + + /** + * Get zookeeper access rate limit. + * <p>The rate limiter is basically a guava {@link com.google.common.util.concurrent.RateLimiter}. + * It is rate limiting the requests that sent by zookeeper client. If the value is non-positive, + * the rate limiting is disable. By default it is disable (value = 0). + * + * @return zookeeper access rate, by default it is 0. + */ + public double getZKRequestRateLimit() { + return this.getDouble(BKDL_ZK_REQUEST_RATE_LIMIT, BKDL_ZK_REQUEST_RATE_LIMIT_DEFAULT); + } + + /** + * Set zookeeper access rate limit (rps). + * + * @param requestRateLimit + * zookeeper access rate limit + * @return distributedlog configuration + * @see #getZKRequestRateLimit() + */ + public DistributedLogConfiguration setZKRequestRateLimit(double requestRateLimit) { + setProperty(BKDL_ZK_REQUEST_RATE_LIMIT, requestRateLimit); + return this; + } + + /** + * Get num of retries per request for zookeeper client. + * <p>Retries only happen on retryable failures like session expired, + * session moved. for permanent failures, the request will fail immediately. + * The default value is 3. + * + * @return num of retries per request of zookeeper client. + */ + public int getZKNumRetries() { + return this.getInt(BKDL_ZK_NUM_RETRIES, BKDL_ZK_NUM_RETRIES_DEFAULT); + } + + /** + * Set num of retries per request for zookeeper client. + * + * @param zkNumRetries num of retries per request of zookeeper client. + * @return distributed log configuration + * @see #getZKNumRetries() + */ + public DistributedLogConfiguration setZKNumRetries(int zkNumRetries) { + setProperty(BKDL_ZK_NUM_RETRIES, zkNumRetries); + return this; + } + + /** + * Get the start backoff time of zookeeper operation retries, in milliseconds. + * <p>The retry time will increase in bound exponential way, and become flat + * after hit max backoff time ({@link #getZKRetryBackoffMaxMillis()}). + * The default start backoff time is 5000 milliseconds. + * + * @return start backoff time of zookeeper operation retries, in milliseconds. + * @see #getZKRetryBackoffMaxMillis() + */ + public int getZKRetryBackoffStartMillis() { + return this.getInt(BKDL_ZK_RETRY_BACKOFF_START_MILLIS, + BKDL_ZK_RETRY_BACKOFF_START_MILLIS_DEFAULT); + } + + /** + * Set the start backoff time of zookeeper operation retries, in milliseconds. + * + * @param zkRetryBackoffStartMillis start backoff time of zookeeper operation retries, + * in milliseconds. + * @return distributed log configuration + * @see #getZKRetryBackoffStartMillis() + */ + public DistributedLogConfiguration setZKRetryBackoffStartMillis(int zkRetryBackoffStartMillis) { + setProperty(BKDL_ZK_RETRY_BACKOFF_START_MILLIS, zkRetryBackoffStartMillis); + return this; + } + + /** + * Get the max backoff time of zookeeper operation retries, in milliseconds. + * <p>The retry time will increase in bound exponential way starting from + * {@link #getZKRetryBackoffStartMillis()}, and become flat after hit this max + * backoff time. + * The default max backoff time is 30000 milliseconds. + * + * @return max backoff time of zookeeper operation retries, in milliseconds. + * @see #getZKRetryBackoffStartMillis() + */ + public int getZKRetryBackoffMaxMillis() { + return this.getInt(BKDL_ZK_RETRY_BACKOFF_MAX_MILLIS, + BKDL_ZK_RETRY_BACKOFF_MAX_MILLIS_DEFAULT); + } + + /** + * Set the max backoff time of zookeeper operation retries, in milliseconds. + * + * @param zkRetryBackoffMaxMillis max backoff time of zookeeper operation retries, + * in milliseconds. + * @return distributed log configuration + * @see #getZKRetryBackoffMaxMillis() + */ + public DistributedLogConfiguration setZKRetryBackoffMaxMillis(int zkRetryBackoffMaxMillis) { + setProperty(BKDL_ZK_RETRY_BACKOFF_MAX_MILLIS, zkRetryBackoffMaxMillis); + return this; + } + + /** + * Get ZK client number of retry executor threads. + * By default it is 1. + * + * @return number of bookkeeper client worker threads. + */ + public int getZKClientNumberRetryThreads() { + return this.getInt(BKDL_ZKCLIENT_NUM_RETRY_THREADS, BKDL_ZKCLIENT_NUM_RETRY_THREADS_DEFAULT); + } + + /** + * Set ZK client number of retry executor threads. + * + * @param numThreads + * number of retry executor threads. + * @return distributedlog configuration. + * @see #getZKClientNumberRetryThreads() + */ + public DistributedLogConfiguration setZKClientNumberRetryThreads(int numThreads) { + setProperty(BKDL_ZKCLIENT_NUM_RETRY_THREADS, numThreads); + return this; + } + + // + // BookKeeper ZooKeeper Client Settings + // + + /** + * Get BK's zookeeper session timout in milliseconds. + * <p> + * This is the session timeout applied for zookeeper client used by bookkeeper client. + * Use {@link #getZKSessionTimeoutMilliseconds()} for zookeeper client used + * by distributedlog. + * + * @return Bk's zookeeper session timeout in milliseconds + */ + public int getBKClientZKSessionTimeoutMilliSeconds() { + return this.getInt(BKDL_BKCLIENT_ZK_SESSION_TIMEOUT, BKDL_BKCLIENT_ZK_SESSION_TIMEOUT_DEFAULT) * 1000; + } + + /** + * Set BK's zookeeper session timeout in seconds. + * + * @param sessionTimeout session timeout for the ZK Client used by BK Client, in seconds. + * @return distributed log configuration + * @see #getBKClientZKSessionTimeoutMilliSeconds() + */ + public DistributedLogConfiguration setBKClientZKSessionTimeout(int sessionTimeout) { + setProperty(BKDL_BKCLIENT_ZK_SESSION_TIMEOUT, sessionTimeout); + return this; + } + + /** + * Get zookeeper access rate limit for zookeeper client used in bookkeeper client. + * <p>The rate limiter is basically a guava {@link com.google.common.util.concurrent.RateLimiter}. + * It is rate limiting the requests that sent by zookeeper client. If the value is non-positive, + * the rate limiting is disable. By default it is disable (value = 0). + * + * @return zookeeper access rate limit for zookeeper client used in bookkeeper client. + * By default it is 0. + */ + public double getBKClientZKRequestRateLimit() { + return this.getDouble(BKDL_BKCLIENT_ZK_REQUEST_RATE_LIMIT, + BKDL_BKCLIENT_ZK_REQUEST_RATE_LIMIT_DEFAULT); + } + + /** + * Set zookeeper access rate limit for zookeeper client used in bookkeeper client. + * + * @param rateLimit + * zookeeper access rate limit + * @return distributedlog configuration. + * @see #getBKClientZKRequestRateLimit() + */ + public DistributedLogConfiguration setBKClientZKRequestRateLimit(double rateLimit) { + setProperty(BKDL_BKCLIENT_ZK_REQUEST_RATE_LIMIT, rateLimit); + return this; + } + + /** + * Get num of retries for zookeeper client that used by bookkeeper client. + * <p>Retries only happen on retryable failures like session expired, + * session moved. for permanent failures, the request will fail immediately. + * The default value is 3. Setting it to zero or negative will retry infinitely. + * + * @return num of retries of zookeeper client used by bookkeeper client. + */ + public int getBKClientZKNumRetries() { + int zkNumRetries = this.getInt(BKDL_BKCLIENT_ZK_NUM_RETRIES, BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT); + if (zkNumRetries <= 0) { + return Integer.MAX_VALUE; + } + return zkNumRetries; + } + + /** + * Get the start backoff time of zookeeper operation retries, in milliseconds. + * <p>The retry time will increase in bound exponential way, and become flat + * after hit max backoff time ({@link #getBKClientZKRetryBackoffMaxMillis()}. + * The default start backoff time is 5000 milliseconds. + * + * @return start backoff time of zookeeper operation retries, in milliseconds. + * @see #getBKClientZKRetryBackoffMaxMillis() + */ + public int getBKClientZKRetryBackoffStartMillis() { + return this.getInt(BKDL_BKCLIENT_ZK_RETRY_BACKOFF_START_MILLIS, + BKDL_BKCLIENT_ZK_RETRY_BACKOFF_START_MILLIS_DEFAULT); + } + + /** + * Get the max backoff time of zookeeper operation retries, in milliseconds. + * <p>The retry time will increase in bound exponential way starting from + * {@link #getBKClientZKRetryBackoffStartMillis()}, and become flat after + * hit this max backoff time. + * The default max backoff time is 30000 milliseconds. + * + * @return max backoff time of zookeeper operation retries, in milliseconds. + * @see #getBKClientZKRetryBackoffStartMillis() + */ + public int getBKClientZKRetryBackoffMaxMillis() { + return this.getInt(BKDL_BKCLIENT_ZK_RETRY_BACKOFF_MAX_MILLIS, + BKDL_BKCLIENT_ZK_RETRY_BACKOFF_MAX_MILLIS_DEFAULT); + } + + // + // BookKeeper Ensemble Placement Settings + // + + /** + * Get ensemble size of each log segment (ledger) will use. + * By default it is 3. + * <p> + * A log segment's data is stored in an ensemble of bookies in + * a stripping way. Each entry will be added in a <code>write-quorum</code> + * size of bookies. The add operation will complete once it receives + * responses from a <code>ack-quorum</code> size of bookies. The stripping + * is done in a round-robin way in bookkeeper. + * <p> + * For example, we configure the ensemble-size to 5, write-quorum-size to 3, + * and ack-quorum-size to 2. The data will be stored in following stripping way. + * <pre> + * | entry id | bk1 | bk2 | bk3 | bk4 | bk5 | + * | 0 | x | x | x | | | + * | 1 | | x | x | x | | + * | 2 | | | x | x | x | + * | 3 | x | | | x | x | + * | 4 | x | x | | | x | + * | 5 | x | x | x | | | + * </pre> + * <p> + * We don't recommend stripping within a log segment to increase bandwidth. + * We'd recommend to strip by `partition` in higher level of distributedlog + * to increase performance. so typically the ensemble size will set to be + * the same value as write quorum size. + * + * @return ensemble size + * @see #getWriteQuorumSize() + * @see #getAckQuorumSize() + */ + public int getEnsembleSize() { + return this.getInt(BKDL_BOOKKEEPER_ENSEMBLE_SIZE, + getInt(BKDL_BOOKKEEPER_ENSEMBLE_SIZE_OLD, + BKDL_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT)); + } + + /** + * Set ensemble size of each log segment (ledger) will use. + * + * @param ensembleSize ensemble size. + * @return distributed log configuration + * @see #getEnsembleSize() + */ + public DistributedLogConfiguration setEnsembleSize(int ensembleSize) { + setProperty(BKDL_BOOKKEEPER_ENSEMBLE_SIZE, ensembleSize); + return this; + } + + /** + * Get write quorum size of each log segment (ledger) will use. + * By default it is 3. + * + * @return write quorum size + * @see #getEnsembleSize() + */ + public int getWriteQuorumSize() { + return this.getInt(BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE, + getInt(BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE_OLD, + BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE_DEFAULT)); + } + + /** + * Set write quorum size of each log segment (ledger) will use. + * + * @param quorumSize + * quorum size. + * @return distributedlog configuration. + * @see #getWriteQuorumSize() + */ + public DistributedLogConfiguration setWriteQuorumSize(int quorumSize) { + setProperty(BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE, quorumSize); + return this; + } + + /** + * Get ack quorum size of each log segment (ledger) will use. + * By default it is 2. + * + * @return ack quorum size + * @see #getEnsembleSize() + */ + public int getAckQuorumSize() { + return this.getInt(BKDL_BOOKKEEPER_ACK_QUORUM_SIZE, + getInt(BKDL_BOOKKEEPER_ACK_QUORUM_SIZE_OLD, + BKDL_BOOKKEEPER_ACK_QUORUM_SIZE_DEFAULT)); + } + + /** + * Set ack quorum size of each log segment (ledger) will use. + * + * @param quorumSize + * quorum size. + * @return distributedlog configuration. + * @see #getAckQuorumSize() + */ + public DistributedLogConfiguration setAckQuorumSize(int quorumSize) { + setProperty(BKDL_BOOKKEEPER_ACK_QUORUM_SIZE, quorumSize); + return this; + } + + /** + * Get the quorum config for each log segment (ledger). + * + * @return quorum config that used by log segments + * @see #getEnsembleSize() + * @see #getWriteQuorumSize() + * @see #getAckQuorumSize() + */ + public QuorumConfig getQuorumConfig() { + return new QuorumConfig( + getEnsembleSize(), + getWriteQuorumSize(), + getAckQuorumSize()); + } + + /** + * Get if row aware ensemble placement is enabled. + * <p>If enabled, {@link DNSResolverForRows} will be used for dns resolution + * rather than {@link DNSResolverForRacks}, if no other dns resolver set via + * {@link #setEnsemblePlacementDnsResolverClass(Class)}. + * By default it is disable. + * + * @return true if row aware ensemble placement is enabled, otherwise false. + * @see #getEnsemblePlacementDnsResolverClass() + */ + public boolean getRowAwareEnsemblePlacementEnabled() { + return getBoolean(BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT, + getBoolean(BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT_OLD, + BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT_DEFAULT)); + } + + /** + * Set if we should enable row aware ensemble placement. + * + * @param enableRowAwareEnsemblePlacement + * enableRowAwareEnsemblePlacement + * @return distributedlog configuration. + * @see #getRowAwareEnsemblePlacementEnabled() + */ + public DistributedLogConfiguration setRowAwareEnsemblePlacementEnabled(boolean enableRowAwareEnsemblePlacement) { + setProperty(BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT, enableRowAwareEnsemblePlacement); + return this; + } + + /** + * Get the DNS resolver class for bookkeeper ensemble placement. + * <p>By default, {@link DNSResolverForRacks} will be used if + * {@link #getRowAwareEnsemblePlacementEnabled()} is disabled and + * {@link DNSResolverForRows} will be used if {@link #getRowAwareEnsemblePlacementEnabled()} + * is enabled. + * + * @return dns resolver class for bookkeeper ensemble placement. + * @throws ConfigurationException + * @see #getRowAwareEnsemblePlacementEnabled() + */ + public Class<? extends DNSToSwitchMapping> getEnsemblePlacementDnsResolverClass() + throws ConfigurationException { + Class<? extends DNSToSwitchMapping> defaultResolverCls; + if (getRowAwareEnsemblePlacementEnabled()) { + defaultResolverCls = DNSResolverForRows.class; + } else { + defaultResolverCls = DNSResolverForRacks.class; + } + return ReflectionUtils.getClass(this, BKDL_ENSEMBLE_PLACEMENT_DNS_RESOLVER_CLASS, + defaultResolverCls, DNSToSwitchMapping.class, defaultLoader); + } + + /** + * Set the DNS resolver class for bookkeeper ensemble placement. + * + * @param dnsResolverClass + * dns resolver class for bookkeeper ensemble placement. + * @return distributedlog configuration + * @see #getEnsemblePlacementDnsResolverClass() + */ + public DistributedLogConfiguration setEnsemblePlacementDnsResolverClass( + Class<? extends DNSToSwitchMapping> dnsResolverClass) { + setProperty(BKDL_ENSEMBLE_PLACEMENT_DNS_RESOLVER_CLASS, dnsResolverClass.getName()); + return this; + } + + /** + * Get mapping used to override the region mapping derived by the default resolver. + * <p>It is a string of pairs of host-region mappings (host:region) separated by semicolon. + * By default it is empty string. + * + * @return dns resolver overrides. + * @see #getEnsemblePlacementDnsResolverClass() + * @see DNSResolverForRacks + * @see DNSResolverForRows + */ + public String getBkDNSResolverOverrides() { + return getString(BKDL_BK_DNS_RESOLVER_OVERRIDES, BKDL_BK_DNS_RESOLVER_OVERRIDES_DEFAULT); + } + + /** + * Set mapping used to override the region mapping derived by the default resolver + * <p>It is a string of pairs of host-region mappings (host:region) separated by semicolon. + * By default it is empty string. + * + * @param overrides + * dns resolver overrides + * @return dl configuration. + * @see #getBkDNSResolverOverrides() + */ + public DistributedLogConfiguration setBkDNSResolverOverrides(String overrides) { + setProperty(BKDL_BK_DNS_RESOLVER_OVERRIDES, overrides); + return this; + } + + // + // BookKeeper General Settings + // + + /** + * Set password used by bookkeeper client for digestion. + * <p> + * NOTE: not recommend to change. will be derepcated in future. + * + * @param bkDigestPW BK password digest + * @return distributedlog configuration + */ + public DistributedLogConfiguration setBKDigestPW(String bkDigestPW) { + setProperty(BKDL_BOOKKEEPER_DIGEST_PW, bkDigestPW); + return this; + } + + /** + * Get password used by bookkeeper client for digestion. + * <p> + * NOTE: not recommend to change. will be deprecated in future. + * + * @return password used by bookkeeper client for digestion + * @see #setBKDigestPW(String) + */ + public String getBKDigestPW() { + return getString(BKDL_BOOKKEEPER_DIGEST_PW, BKDL_BOOKKEEPER_DIGEST_PW_DEFAULT); + } + + /** + * Get BK client number of i/o threads used by Netty. + * The default value equals DL's number worker threads. + * + * @return number of bookkeeper netty i/o threads. + * @see #getNumWorkerThreads() + */ + public int getBKClientNumberIOThreads() { + return this.getInt(BKDL_BKCLIENT_NUM_IO_THREADS, getNumWorkerThreads()); + } + + /** + * Set BK client number of i/o threads used by netty. + * + * @param numThreads + * number io threads. + * @return distributedlog configuration. + * @see #getBKClientNumberIOThreads() + */ + public DistributedLogConfiguration setBKClientNumberIOThreads(int numThreads) { + setProperty(BKDL_BKCLIENT_NUM_IO_THREADS, numThreads); + return this; + } + + /** + * Get the tick duration in milliseconds that used for timeout timer in bookkeeper client. + * By default it is 100. + * + * @return tick duration in milliseconds + * @see org.jboss.netty.util.HashedWheelTimer + */ + public long getTimeoutTimerTickDurationMs() { + return getLong(BKDL_TIMEOUT_TIMER_TICK_DURATION_MS, BKDL_TIMEOUT_TIMER_TICK_DURATION_MS_DEFAULT); + } + + /** + * Set the tick duration in milliseconds that used for timeout timer in bookkeeper client. + * + * @param tickDuration + * tick duration in milliseconds. + * @return distributed log configuration. + * @see #getTimeoutTimerTickDurationMs() + */ + public DistributedLogConfiguration setTimeoutTimerTickDurationMs(long tickDuration) { + setProperty(BKDL_TIMEOUT_TIMER_TICK_DURATION_MS, tickDuration); + return this; + } + + /** + * Get number of ticks that used for timeout timer in bookkeeper client. + * By default is 1024. + * + * @return number of ticks that used for timeout timer. + * @see org.jboss.netty.util.HashedWheelTimer + */ + public int getTimeoutTimerNumTicks() { + return getInt(BKDL_TIMEOUT_TIMER_NUM_TICKS, BKDL_TIMEOUT_TIMER_NUM_TICKS_DEFAULT); + } + + /** + * Set number of ticks that used for timeout timer in bookkeeper client. + * + * @param numTicks + * number of ticks that used for timeout timer. + * @return distributed log configuration. + * @see #getTimeoutTimerNumTicks() + */ + public DistributedLogConfiguration setTimeoutTimerNumTicks(int numTicks) { + setProperty(BKDL_TIMEOUT_TIMER_NUM_TICKS, numTicks); + return this; + } + + // + // Deprecated BookKeeper Settings + // + + /** + * Get BK client read timeout in seconds. + * <p> + * Please use {@link ClientConfiguration#getReadEntryTimeout()} + * instead of this setting. + * + * @return read timeout in seconds + * @deprecated + * @see ClientConfiguration#getReadEntryTimeout() + */ + public int getBKClientReadTimeout() { + return this.getInt(BKDL_BKCLIENT_READ_TIMEOUT, + BKDL_BKCLIENT_READ_TIMEOUT_DEFAULT); + } + + /** + * Set BK client read timeout in seconds. + * + * @param readTimeout read timeout in seconds. + * @return distributed log configuration + * @deprecated + * @see #getBKClientReadTimeout() + */ + public DistributedLogConfiguration setBKClientReadTimeout(int readTimeout) { + setProperty(BKDL_BKCLIENT_READ_TIMEOUT, readTimeout); + return this; + } + + /** + * Get BK client write timeout in seconds. + * <p> + * Please use {@link ClientConfiguration#getAddEntryTimeout()} + * instead of this setting. + * + * @return write timeout in seconds. + * @deprecated + * @see ClientConfiguration#getAddEntryTimeout() + */ + public int getBKClientWriteTimeout() { + return this.getInt(BKDL_BKCLIENT_WRITE_TIMEOUT, BKDL_BKCLIENT_WRITE_TIMEOUT_DEFAULT); + } + + /** + * Set BK client write timeout in seconds + * + * @param writeTimeout write timeout in seconds. + * @return distributed log configuration + * @deprecated + * @see #getBKClientWriteTimeout() + */ + public DistributedLogConfiguration setBKClientWriteTimeout(int writeTimeout) { + setProperty(BKDL_BKCLIENT_WRITE_TIMEOUT, writeTimeout); + return this; + } + + /** + * Get BK client number of worker threads. + * <p> + * Please use {@link ClientConfiguration#getNumWorkerThreads()} + * instead of this setting. + * + * @return number of bookkeeper client worker threads. + * @deprecated + * @see ClientConfiguration#getNumWorkerThreads() + */ + public int getBKClientNumberWorkerThreads() { + return this.getInt(BKDL_BKCLIENT_NUM_WORKER_THREADS, BKDL_BKCLEINT_NUM_WORKER_THREADS_DEFAULT); + } + + /** + * Set BK client number of worker threads. + * + * @param numThreads + * number worker threads. + * @return distributedlog configuration. + * @deprecated + * @see #getBKClientNumberWorkerThreads() + */ + public DistributedLogConfiguration setBKClientNumberWorkerThreads(int numThreads) { + setProperty(BKDL_BKCLIENT_NUM_WORKER_THREADS, numThreads); + return this; + } + + // + // DL Executor Settings + // + + /** + * Get the number of worker threads used by distributedlog namespace. + * By default it is the number of available processors. + * + * @return number of worker threads used by distributedlog namespace. + */ + public int getNumWorkerThreads() { + return getInt(BKDL_NUM_WORKER_THREADS, Runtime.getRuntime().availableProcessors()); + } + + /** + * Set the number of worker threads used by distributedlog namespace. + * + * @param numWorkerThreads + * number of worker threads used by distributedlog namespace. + * @return configuration + * @see #getNumWorkerThreads() + */ + public DistributedLogConfiguration setNumWorkerThreads(int numWorkerThreads) { + setProperty(BKDL_NUM_WORKER_THREADS, numWorkerThreads); + return this; + } + + /** + * Get the number of dedicated readahead worker threads used by distributedlog namespace. + * <p>If this value is non-positive, it would share the normal executor (see {@link #getNumWorkerThreads()} + * for readahead. otherwise, it would use a dedicated executor for readhead. By default, + * it is 0. + * + * @return number of dedicated readahead worker threads. + * @see #getNumWorkerThreads() + */ + @Deprecated + public int getNumReadAheadWorkerThreads() { + return getInt(BKDL_NUM_READAHEAD_WORKER_THREADS, 0); + } + + /** + * Set the number of dedicated readahead worker threads used by distributedlog namespace. + * + * @param numWorkerThreads + * number of dedicated readahead worker threads. + * @return configuration + * @see #getNumReadAheadWorkerThreads() + */ + @Deprecated + public DistributedLogConfiguration setNumReadAheadWorkerThreads(int numWorkerThreads) { + setProperty(BKDL_NUM_READAHEAD_WORKER_THREADS, numWorkerThreads); + return this; + } + + /** + * Get the number of lock state threads used by distributedlog namespace. + * By default it is 1. + * + * @return number of lock state threads used by distributedlog namespace. + */ + public int getNumLockStateThreads() { + return getInt(BKDL_NUM_LOCKSTATE_THREADS, 1); + } + + /** + * Set the number of lock state threads used by distributedlog manager factory. + * + * @param numLockStateThreads + * number of lock state threads used by distributedlog manager factory. + * @return configuration + * @see #getNumLockStateThreads() + */ + public DistributedLogConfiguration setNumLockStateThreads(int numLockStateThreads) { + setProperty(BKDL_NUM_LOCKSTATE_THREADS, numLockStateThreads); + return this; + } + + /** + * Get the number of resource release threads used by distributedlog namespace. + * By default it is 0 - the thread will be created dynamically by a executor service. + * The executor service is an unbounded pool. Application can use `total_tasks - completed_tasks` + * on monitoring the number of threads that are used for releasing resources. + * <p> + * The setting is only applied for v2 implementation. + * + * @see org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor + * @return number of resource release threads used by distributedlog namespace. + */ + public int getNumResourceReleaseThreads() { + return getInt(BKDL_NUM_RESOURCE_RELEASE_THREADS, 0); + } + + /** + * Set the number of resource release threads used by distributedlog manager factory. + * + * @param numResourceReleaseThreads + * number of resource release threads used by distributedlog manager factory. + * @return configuration + * @see #getNumResourceReleaseThreads() + */ + public DistributedLogConfiguration setNumResourceReleaseThreads(int numResourceReleaseThreads) { + setProperty(BKDL_NUM_RESOURCE_RELEASE_THREADS, numResourceReleaseThreads); + return this; + } + + /** + * Get timeout for shutting down schedulers in dl manager, in milliseconds. + * By default, it is 5 seconds. + * + * @return timeout for shutting down schedulers in dl manager, in miliseconds. + */ + public int getSchedulerShutdownTimeoutMs() { + return getInt(BKDL_SCHEDULER_SHUTDOWN_TIMEOUT_MS, BKDL_SCHEDULER_SHUTDOWN_TIMEOUT_MS_DEFAULT); + } + + /** + * Set timeout for shutting down schedulers in dl manager, in milliseconds. + * + * @param timeoutMs + * timeout for shutting down schedulers in dl manager, in milliseconds. + * @return dl configuration. + * @see #getSchedulerShutdownTimeoutMs() + */ + public DistributedLogConfiguration setSchedulerShutdownTimeoutMs(int timeoutMs) { + setProperty(BKDL_SCHEDULER_SHUTDOWN_TIMEOUT_MS, timeoutMs); + return this; + } + + /** + * Whether to use daemon thread for DL threads. + * By default it is false. + * + * @return true if use daemon threads, otherwise false. + */ + public boolean getUseDaemonThread() { + return getBoolean(BKDL_USE_DAEMON_THREAD, BKDL_USE_DAEMON_THREAD_DEFAULT); + } + + /** + * Set whether to use daemon thread for DL threads. + * + * @param daemon + * whether to use daemon thread for DL threads. + * @return distributedlog configuration + * @see #getUseDaemonThread() + */ + public DistributedLogConfiguration setUseDaemonThread(boolean daemon) { + setProperty(BKDL_USE_DAEMON_THREAD, daemon); + return this; + } + + // + // Metadata Settings + // + + /** + * Get DL ledger metadata output layout version. + * + * @return layout version + * @see org.apache.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion + */ + public int getDLLedgerMetadataLayoutVersion() { + return this.getInt(BKDL_LEDGER_METADATA_LAYOUT_VERSION, + getInt(BKDL_LEDGER_METADATA_LAYOUT_VERSION_OLD, + BKDL_LEDGER_METADATA_LAYOUT_VERSION_DEFAULT)); + } + + /** + * Set DL ledger metadata output layout version. + * + * @param layoutVersion layout version + * @return distributed log configuration + * @throws IllegalArgumentException if setting an unknown layout version. + * @see #getDLLedgerMetadataLayoutVersion() + */ + public DistributedLogConfiguration setDLLedgerMetadataLayoutVersion(int layoutVersion) + throws IllegalArgumentException { + if ((layoutVersion <= 0) || + (layoutVersion > LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION)) { + // Incorrect version specified + throw new IllegalArgumentException("Incorrect value for ledger metadata layout version"); + } + setProperty(BKDL_LEDGER_METADATA_LAYOUT_VERSION, layoutVersion); + return this; + } + + /** + * Get the setting for whether we should enforce the min ledger metadata version check. + * By default it is false. + * + * @return whether we should enforce the min ledger metadata version check + * @see org.apache.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion + */ + public boolean getDLLedgerMetadataSkipMinVersionCheck() { + return this.getBoolean(BKDL_LEDGER_METADATA_SKIP_MIN_VERSION_CHECK, + BKDL_LEDGER_METADATA_SKIP_MIN_VERSION_CHECK_DEFAULT); + } + + /** + * Set if we should skip the enforcement of min ledger metadata version. + * <p>NOTE: please be aware the side effects of skipping min ledger metadata + * version checking. + * + * @param skipMinVersionCheck whether we should enforce the min ledger metadata version check + * @return distributed log configuration + * @see #getDLLedgerMetadataSkipMinVersionCheck() + */ + public DistributedLogConfiguration setDLLedgerMetadataSkipMinVersionCheck(boolean skipMinVersionCheck) throws IllegalArgumentException { + setProperty(BKDL_LEDGER_METADATA_SKIP_MIN_VERSION_CHECK, skipMinVersionCheck); + return this; + } + + /** + * Get the value at which ledger sequence number should start for streams that are being + * upgraded and did not have ledger sequence number to start with or for newly created + * streams. By default, it is 1. + * <p>In most of the cases this value should not be changed. It is useful for backfilling + * in the case of migrating log segments whose metadata don't have log segment sequence number. + * + * @return first ledger sequence number + */ + public long getFirstLogSegmentSequenceNumber() { + return this.getLong(BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER, + getLong(BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER_OLD, + BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER_DEFAULT)); + } + + /** + * Set the value at which ledger sequence number should start for streams that are being + * upgraded and did not have ledger sequence number to start with or for newly created + * streams + * + * @param firstLogSegmentSequenceNumber first ledger sequence number + * @return distributed log configuration + * @see #getFirstLogSegmentSequenceNumber() + */ + public DistributedLogConfiguration setFirstLogSegmentSequenceNumber(long firstLogSegmentSequenceNumber) + throws IllegalArgumentException { + if (firstLogSegmentSequenceNumber <= 0) { + // Incorrect ledger sequence number specified + throw new IllegalArgumentException("Incorrect value for ledger sequence number"); + } + setProperty(BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER, firstLogSegmentSequenceNumber); + return this; + } + + /** + * Whether log segment sequence number validation is enabled? + * + * @return true if the log segment sequence number validation is enabled, otherwise false. + */ + public boolean isLogSegmentSequenceNumberValidationEnabled() { + return this.getBoolean(BKDL_LOGSEGMENT_SEQUENCE_NUMBER_VALIDATION_ENABLED, + BKDL_LOGSEGMENT_SEQUENCE_NUMBER_VALIDATION_ENABLED_DEFAULT); + } + + /** + * Whether log segment sequence number validation is enabled? + * + * @return true if the log segment sequence number validation is enabled, otherwise false. + */ + public DistributedLogConfiguration setLogSegmentSequenceNumberValidationEnabled(boolean enabled) { + setProperty(BKDL_LOGSEGMENT_SEQUENCE_NUMBER_VALIDATION_ENABLED, enabled); + return this; + } + + /** + * Whether we should publish record counts in the log records and metadata. + * <p>By default it is true. This is a legacy setting for log segment version 1. It + * should be considered removed. + * + * @return if record counts should be persisted + */ + public boolean getEnableRecordCounts() { + return getBoolean(BKDL_ENABLE_RECORD_COUNTS, BKDL_ENABLE_RECORD_COUNTS_DEFAULT); + } + + /** + * Set if we should publish record counts in the log records and metadata. + * + * @param enableRecordCounts enable record counts + * @return distributed log configuration + * @see #getEnableRecordCounts() + */ + public DistributedLogConfiguration setEnableRecordCounts(boolean enableRecordCounts) { + setProperty(BKDL_ENABLE_RECORD_COUNTS, enableRecordCounts); + return this; + } + + /** + * Whether sanity check txn id on starting log segments. + * <p>If it is enabled, DL writer would throw + * {@link org.apache.distributedlog.exceptions.TransactionIdOutOfOrderException} + * when it received a smaller transaction id than current maximum transaction id. + * + * @return true if should check txn id with max txn id, otherwise false. + */ + @Deprecated + public boolean getSanityCheckTxnID() { + return getBoolean(BKDL_MAXID_SANITYCHECK, BKDL_MAXID_SANITYCHECK_DEFAULT); + } + + /** + * Enable/Disable sanity check txn id. + * + * @param enabled + * enable/disable sanity check txn id. + * @return configuration. + * @see #getSanityCheckTxnID() + */ + @Deprecated + public DistributedLogConfiguration setSanityCheckTxnID(boolean enabled) { + setProperty(BKDL_MAXID_SANITYCHECK, enabled); + return this; + } + + /** + * Whether encode region id in log segment metadata. + * <p>In global DL use case, encoding region id in log segment medata would + * help understanding what region that a log segment is created. The region + * id field in log segment metadata would help for moniotring and troubleshooting. + * + * @return whether to encode region id in log segment metadata. + */ + public boolean getEncodeRegionIDInLogSegmentMetadata() { + return getBoolean(BKDL_ENCODE_REGION_ID_IN_VERSION, BKDL_ENCODE_REGION_ID_IN_VERSION_DEFAULT); + } + + /** + * Enable/Disable encoding region id in log segment metadata. + * + * @param enabled + * flag to enable/disable encoding region id in log segment metadata. + * @return configuration instance. + * @see #getEncodeRegionIDInLogSegmentMetadata() + */ + public DistributedLogConfiguration setEncodeRegionIDInLogSegmentMetadata(boolean enabled) { + setProperty(BKDL_ENCODE_REGION_ID_IN_VERSION, enabled); + return this; + } + + /** + * Get log segment name version. + * <p> + * <ul> + * <li>version 0: inprogress_(start_txid) | + * logrecs_(start_txid)_(end_txid)</li> + * <li>version 1: inprogress_(logsegment_sequence_number) | + * logrecs_(logsegment_sequence_number)</li> + * </ul> + * By default it is 1. + * + * @return log segment name verison. + */ + public int getLogSegmentNameVersion() { + return getInt(BKDL_LOGSEGMENT_NAME_VERSION, BKDL_LOGSEGMENT_NAME_VERSION_DEFAULT); + } + + /** + * Set log segment name version. + * + * @param version + * log segment name version. + * @return configuration object. + * @see #getLogSegmentNameVersion() + */ + public DistributedLogConfiguration setLogSegmentNameVersion(int version) { + setProperty(BKDL_LOGSEGMENT_NAME_VERSION, version); + return this; + } + + /** + * Get name of the unpartitioned stream. + * <p>It is a legacy setting. consider removing it in future. + * + * @return unpartitioned stream + */ + public String getUnpartitionedStreamName() { + return getString(BKDL_UNPARTITIONED_STREAM_NAME, BKDL_UNPARTITIONED_STREAM_NAME_DEFAULT); + } + + /** + * Set name of the unpartitioned stream + * + * @param streamName name of the unpartitioned stream + * @return distributedlog configuration + * @see #getUnpartitionedStreamName() + */ + public DistributedLogConfiguration setUnpartitionedStreamName(String streamName) { + setProperty(BKDL_UNPARTITIONED_STREAM_NAME, streamName); + return this; + } + + // + // LogSegment Cache Settings + // + + /** + * Get the log segment cache entry TTL in milliseconds. + * + * @return log segment cache ttl in milliseconds. + */ + public long getLogSegmentCacheTTLMs() { + return getLong(BKDL_LOGSEGMENT_CACHE_TTL_MS, BKDL_LOGSEGMENT_CACHE_MAX_SIZE_DEFAULT); + } + + /** + * Set the log segment cache entry TTL in milliseconds. + * + * @param ttlMs TTL in milliseconds + * @return distributedlog configuration + */ + public DistributedLogConfiguration setLogSegmentCacheTTLMs(long ttlMs) { + setProperty(BKDL_LOGSEGMENT_CACHE_TTL_MS, ttlMs); + return this; + } + + /** + * Get the maximum size of the log segment cache. + * + * @return maximum size of the log segment cache. + */ + public long getLogSegmentCacheMaxSize() { + return getLong(BKDL_LOGSEGMENT_CACHE_MAX_SIZE, BKDL_LOGSEGMENT_CACHE_MAX_SIZE_DEFAULT); + } + + /** + * Set the maximum size of the log segment cache. + * + * @param maxSize maximum size of the log segment cache. + * @return distributedlog configuration + */ + public DistributedLogConfiguration setLogSegmentCacheMaxSize(long maxSize) { + setProperty(BKDL_LOGSEGMENT_CACHE_MAX_SIZE, maxSize); + return this; + } + + /** + * Is log segment cache enabled? + * + * @return true if log segment cache is enabled; otherwise false + */ + public boolean isLogSegmentCacheEnabled() { + return getBoolean(BKDL_LOGSEGMENT_CACHE_ENABLED, BKDL_LOGSEGMENT_CACHE_ENABLED_DEFAULT); + } + + /** + * Enable/disable log segment cache. + * + * @return distributedlog configuration + */ + public DistributedLogConfiguration setLogSegmentCacheEnabled(boolean enabled) { + setProperty(BKDL_LOGSEGMENT_CACHE_ENABLED, enabled); + return this; + } + + // + // DL Writer General Settings + // + + /** + * Whether to create stream if not exists. By default it is true. + * + * @return true if it is abled to create stream if not exists. + */ + public boolean getCreateStreamIfNotExists() { + return getBoolean(BKDL_CREATE_STREAM_IF_NOT_EXISTS, + BKDL_CREATE_STREAM_IF_NOT_EXISTS_DEFAULT); + } + + /** + * Enable/Disable creating stream if not exists. + * + * @param enabled + * enable/disable sanity check txn id. + * @return distributed log configuration. + * @see #getCreateStreamIfNotExists() + */ + public DistributedLogConfiguration setCreateStreamIfNotExists(boolean enabled) { + setProperty(BKDL_CREATE_STREAM_IF_NOT_EXISTS, enabled); + return this; + } + + /** + * Get Log Flush timeout in seconds. + * <p>This is a setting used by DL writer on flushing data. It is typically used + * by synchronous writer and log segment writer. By default it is 30 seconds. + * + * @return log flush timeout in seconds. + */ + // @Deprecated + public int getLogFlushTimeoutSeconds() { + return this.getInt(BKDL_LOG_FLUSH_TIMEOUT, BKDL_LOG_FLUSH_TIMEOUT_DEFAULT); + } + + /** + * Set Log Flush Timeout in seconds. + * + * @param logFlushTimeoutSeconds log flush timeout. + * @return distributed log configuration + * @see #getLogFlushTimeoutSeconds() + */ + public DistributedLogConfiguration setLogFlushTimeoutSeconds(int logFlushTimeoutSeconds) { + setProperty(BKDL_LOG_FLUSH_TIMEOUT, logFlushTimeoutSeconds); + return this; + } + + /** + * The compression type to use while sending data to bookkeeper. + * + * @return compression type to use + * @see org.apache.distributedlog.io.CompressionCodec + */ + public String getCompressionType() { + return getString(BKDL_COMPRESSION_TYPE, BKDL_COMPRESSION_TYPE_DEFAULT); + } + + /** + * Set the compression type to use while sending data to bookkeeper. + * + * @param compressionType compression type + * @return distributedlog configuration + * @see #getCompressionType() + */ + public DistributedLogConfiguration setCompressionType(String compressionType) { + Preconditions.checkArgument(null != compressionType && !compressionType.isEmpty()); + setProperty(BKDL_COMPRESSION_TYPE, compressionType); + return this; + } + + /** + * Whether to fail immediately if the stream is not ready rather than queueing the request. + * <p>If it is enabled, it would fail the write request immediately if the stream isn't ready. + * Consider turning it on for the use cases that could retry writing to other streams + * (aka non-strict ordering guarantee). It would result fast failure hence the client would + * retry immediately. + * + * @return true if should fail fast. otherwise, false. + */ + public boolean getFailFastOnStreamNotReady() { + return getBoolean(BKDL_FAILFAST_ON_STREAM_NOT_READY, + BKDL_FAILFAST_ON_STREAM_NOT_READY_DEFAULT); + } + + /** + * Set the failfast on stream not ready flag. + * + * @param failFastOnStreamNotReady + * set failfast flag + * @return dl configuration. + * @see #getFailFastOnStreamNotReady() + */ + public DistributedLogConfiguration setFailFastOnStreamNotReady(boolean failFastOnStreamNotReady) { + setProperty(BKDL_FAILFAST_ON_STREAM_NOT_READY, failFastOnStreamNotReady); + return this; + } + + /** + * If this option is set, the log writer won't reset the segment writer if an error + * is encountered. + * + * @return true if we should disable automatic rolling + */ + public boolean getDisableRollingOnLogSegmentError() { + return getBoolean(BKDL_DISABLE_ROLLING_ON_LOG_SEGMENT_ERROR, + BKDL_DISABLE_ROLLING_ON_LOG_SEGMENT_ERROR_DEFAULT); + } + + /** + * Set the roll on segment error flag. + * + * @param disableRollingOnLogSegmentError + * set roll on error flag + * @return dl configuration. + * @see #getDisableRollingOnLogSegmentError() + */ + public DistributedLogConfiguration setDisableRollingOnLogSegmentError(boolean disableRollingOnLogSegmentError) { + setProperty(BKDL_DISABLE_ROLLING_ON_LOG_SEGMENT_ERROR, disableRollingOnLogSegmentError); + return this; + } + + // + // DL Durability Settings + // + + /** + * Check whether the durable write is enabled. + * <p>It is enabled by default. + * + * @return true if durable write is enabled. otherwise, false. + */ + public boolean isDurableWriteEnabled() { + return this.getBoolean(BKDL_IS_DURABLE_WRITE_ENABLED, BKDL_IS_DURABLE_WRITE_ENABLED_DEFAULT); + } + + /** + * Enable/Disable durable writes in writers. + * + * @param enabled + * flag to enable/disable durable writes in writers. + * @return distributedlog configuration + */ + public DistributedLogConfiguration setDurableWriteEnabled(boolean enabled) { + setProperty(BKDL_IS_DURABLE_WRITE_ENABLED, enabled); + return this; + } + + // + // DL Writer Transmit Settings + // + + /** + * Get output buffer size for DL writers, in bytes. + * <p>Large buffer will result in higher compression ratio and + * it would use the bandwidth more efficiently and improve throughput. + * Set it to 0 would ask DL writers to transmit the data immediately, + * which it could achieve low latency. + * <p>The default value is 1KB. + * + * @return buffer size in byes. + */ + public int getOutputBufferSize() { + return this.getInt(BKDL_OUTPUT_BUFFER_SIZE, + getInt(BKDL_OUTPUT_BUFFER_SIZE_OLD, BKDL_OUTPUT_BUFFER_SIZE_DEFAULT)); + } + + /** + * Set output buffer size for DL writers, in bytes. + * + * @param opBufferSize output buffer size. + * @return distributed log configuration + * @see #getOutputBufferSize() + */ + public DistributedLogConfiguration setOutputBufferSize(int opBufferSize) { + setProperty(BKDL_OUTPUT_BUFFER_SIZE, opBufferSize); + return this; + } + + /** + * Get Periodic Log Flush Frequency in milliseconds. + * <p>If the setting is set with a positive value, the data in output buffer + * will be flushed in this provided interval. The default value is 0. + * + * @return periodic flush frequency in milliseconds. + * @see #getOutputBufferSize() + */ + public int getPeriodicFlushFrequencyMilliSeconds() { + return this.getInt(BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS, + BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS_DEFAULT); + } + + /** + * Set Periodic Log Flush Frequency in milliseconds. + * + * @param flushFrequencyMs periodic flush frequency in milliseconds. + * @return distributed log configuration + * @see #getPeriodicFlushFrequencyMilliSeconds() + */ + public DistributedLogConfiguration setPeriodicFlushFrequencyMilliSeconds(int flushFrequencyMs) { + setProperty(BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS, flushFrequencyMs); + return this; + } + + /** + * Is immediate flush enabled. + * <p>If it is enabled, it would flush control record immediately after adding + * data completed. The default value is false. + * + * @return whether immediate flush is enabled + */ + public boolean getImmediateFlushEnabled() { + return getBoolean(BKDL_ENABLE_IMMEDIATE_FLUSH, BKDL_ENABLE_IMMEDIATE_FLUSH_DEFAULT); + } + + /** + * Enable/Disable immediate flush + * + * @param enabled + * flag to enable/disable immediate flush. + * @return configuration instance. + * @see #getImmediateFlushEnabled() + */ + public DistributedLogConfiguration setImmediateFlushEnabled(boolean enabled) { + setProperty(BKDL_ENABLE_IMMEDIATE_FLUSH, enabled); + return this; + } + + /** + * Get minimum delay between immediate flushes in milliseconds. + * <p>This setting only takes effects when {@link #getImmediateFlushEnabled()} + * is enabled. It torelants the bursty of traffic when immediate flush is enabled, + * which prevents sending too many control records to the bookkeeper. + * + * @return minimum delay between immediate flushes in milliseconds + * @see #getImmediateFlushEnabled() + */ + public int getMinDelayBetweenImmediateFlushMs() { + return this.getInt(BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS, BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS_DEFAULT); + } + + /** + * Set minimum delay between immediate flushes in milliseconds + * + * @param minDelayMs minimum delay between immediate flushes in milliseconds. + * @return distributed log configuration + * @see #getMinDelayBetweenImmediateFlushMs() + */ + public DistributedLogConfiguration setMinDelayBetweenImmediateFlushMs(int minDelayMs) { + setProperty(BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS, minDelayMs); + return this; + } + + /** + * Get Periodic Keep Alive Frequency in milliseconds. + * <p>If the setting is set with a positive value, it would periodically write a control record + * to keep the stream active. The default value is 0. + * + * @return periodic keep alive frequency in milliseconds. + */ + public int getPeriodicKeepAliveMilliSeconds() { + return this.getInt(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT); + } + + /** + * Set Periodic Keep Alive Frequency in milliseconds. + * + * @param keepAliveMs keep alive frequency in milliseconds. + * @return distributedlog configuration + * @see #getPeriodicKeepAliveMilliSeconds() + */ + public DistributedLogConfiguration setPeriodicKeepAliveMilliSeconds(int keepAliveMs) { + setProperty(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, keepAliveMs); + return this; + } + + // + // DL Retention/Truncation Settings + // + + /** + * Get log segment retention period in hours. + * The default value is 3 days. + * + * @return log segment retention period in hours + */ + public int getRetentionPeriodHours() { + return this.getInt(BKDL_RETENTION_PERIOD_IN_HOURS, + getInt(BKDL_RETENTION_PERIOD_IN_HOURS_OLD, + BKDL_RETENTION_PERIOD_IN_HOURS_DEFAULT)); + } + + /** + * Set log segment retention period in hours. + * + * @param retentionHours retention period in hours. + * @return distributed log configuration + */ + public DistributedLogConfiguration setRetentionPeriodHours(int retentionHours) { + setProperty(BKDL_RETENTION_PERIOD_IN_HOURS, retentionHours); + return this; + } + + /** + * Is truncation managed explicitly by the application. + * <p>If this is set then time based retention is only a hint to perform + * deferred cleanup. However we never remove a segment that has not been + * already marked truncated. + * <p>It is disabled by default. + * + * @return whether truncation managed explicitly by the application + * @see org.apache.distributedlog.LogSegmentMetadata.TruncationStatus + */ + public boolean getExplicitTruncationByApplication() { + return getBoolean(BKDL_EXPLICIT_TRUNCATION_BY_APPLICATION, + BKDL_EXPLICIT_TRUNCATION_BY_APPLICATION_DEFAULT); + } + + /** + * Enable/Disable whether truncation is managed explicitly by the application. + * + * @param enabled + * flag to enable/disable whether truncation is managed explicitly by the application. + * @return configuration instance. + */ + public DistributedLogConfiguration setExplicitTruncationByApplication(boolean enabled) { + setProperty(BKDL_EXPLICIT_TRUNCATION_BY_APPLICATION, enabled); + return this; + } + + // + // Log Segment Rolling Settings + // + + /** + * Get log segment rolling interval in minutes. + * <p>If the setting is set to a positive value, DL writer will roll log segments + * based on time. Otherwise, it will roll log segments based on size. + * <p>The default value is 2 hours. + * + * @return log segment rolling interval in minutes + * @see #getMaxLogS
<TRUNCATED>