http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
new file mode 100644
index 0000000..0cb608f
--- /dev/null
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
@@ -0,0 +1,3528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.distributedlog.bk.QuorumConfig;
+import org.apache.distributedlog.feature.DefaultFeatureProvider;
+import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.net.DNSResolverForRacks;
+import org.apache.distributedlog.net.DNSResolverForRows;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.SystemConfiguration;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * DistributedLog Configuration.
+ * <p>
+ * DistributedLog configuration is basically a properties-based configuration, which extends
+ * Apache commons {@link CompositeConfiguration}. All the DL settings are in camel case and
+ * prefixed with a meaningful component name. For example, `zkSessionTimeoutSeconds` means
+ * <i>SessionTimeoutSeconds</i> for component `zk`.
+ *
+ * <h3>BookKeeper Configuration</h3>
+ *
+ * BookKeeper client configuration settings could be loaded via DistributedLog 
configuration. All those
+ * settings are prefixed with <i>`bkc.`</i>. For example, <i>bkc.zkTimeout</i> 
in distributedlog configuration
+ * will be applied as <i>`zkTimeout`</i> in bookkeeper client configuration.
+ *
+ * <h3>How to load configuration</h3>
+ *
+ * The default distributedlog configuration is constructed by instantiating a new instance.
+ * This distributedlog configuration will automatically load the settings that are specified
+ * via {@link SystemConfiguration}.
+ *
+ * <pre>
+ *      DistributedLogConfiguration conf = new DistributedLogConfiguration();
+ * </pre>
+ *
+ * The recommended way is to load configuration from URL that points to a 
configuration file
+ * ({@link #loadConf(URL)}).
+ *
+ * <pre>
+ *      String configFile = "/path/to/distributedlog/conf/file";
+ *      DistributedLogConfiguration conf = new DistributedLogConfiguration();
+ *      conf.loadConf(new File(configFile).toURI().toURL());
+ * </pre>
+ *
+ * @see org.apache.bookkeeper.conf.ClientConfiguration
+ */
+public class DistributedLogConfiguration extends CompositeConfiguration {
+    static final Logger LOG = 
LoggerFactory.getLogger(DistributedLogConfiguration.class);
+
+    private static ClassLoader defaultLoader;
+
+    static {
+        defaultLoader = Thread.currentThread().getContextClassLoader();
+        if (null == defaultLoader) {
+            defaultLoader = DistributedLogConfiguration.class.getClassLoader();
+        }
+    }
+
+    //
+    // ZooKeeper Related Settings
+    //
+
+    // ACL id key; its default is null, i.e. no ACL id is configured out of the box.
+    public static final String BKDL_ZK_ACL_ID = "zkAclId";
+    public static final String BKDL_ZK_ACL_ID_DEFAULT = null;
+    // Session timeout; the key name states the unit (seconds).
+    public static final String BKDL_ZK_SESSION_TIMEOUT_SECONDS = "zkSessionTimeoutSeconds";
+    public static final int BKDL_ZK_SESSION_TIMEOUT_SECONDS_DEFAULT = 30;
+    public static final String BKDL_ZK_REQUEST_RATE_LIMIT = "zkRequestRateLimit";
+    public static final double BKDL_ZK_REQUEST_RATE_LIMIT_DEFAULT = 0;
+    public static final String BKDL_ZK_NUM_RETRIES = "zkNumRetries";
+    public static final int BKDL_ZK_NUM_RETRIES_DEFAULT = 3;
+    // Retry backoff bounds; the key names state the unit (milliseconds).
+    public static final String BKDL_ZK_RETRY_BACKOFF_START_MILLIS = "zkRetryStartBackoffMillis";
+    public static final int BKDL_ZK_RETRY_BACKOFF_START_MILLIS_DEFAULT = 5000;
+    public static final String BKDL_ZK_RETRY_BACKOFF_MAX_MILLIS = "zkRetryMaxBackoffMillis";
+    public static final int BKDL_ZK_RETRY_BACKOFF_MAX_MILLIS_DEFAULT = 30000;
+    public static final String BKDL_ZKCLIENT_NUM_RETRY_THREADS = "zkcNumRetryThreads";
+    public static final int BKDL_ZKCLIENT_NUM_RETRY_THREADS_DEFAULT = 1;
+
+    //
+    // BookKeeper Related Settings
+    //
+
+    // ZooKeeper settings used by the BookKeeper client ("bkcZK..." keys).
+    public static final String BKDL_BKCLIENT_ZK_SESSION_TIMEOUT = "bkcZKSessionTimeoutSeconds";
+    public static final int BKDL_BKCLIENT_ZK_SESSION_TIMEOUT_DEFAULT = 30;
+    public static final String BKDL_BKCLIENT_ZK_REQUEST_RATE_LIMIT = "bkcZKRequestRateLimit";
+    public static final double BKDL_BKCLIENT_ZK_REQUEST_RATE_LIMIT_DEFAULT = 0;
+    public static final String BKDL_BKCLIENT_ZK_NUM_RETRIES = "bkcZKNumRetries";
+    public static final int BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT = 3;
+    public static final String BKDL_BKCLIENT_ZK_RETRY_BACKOFF_START_MILLIS =
+            "bkcZKRetryStartBackoffMillis";
+    public static final int BKDL_BKCLIENT_ZK_RETRY_BACKOFF_START_MILLIS_DEFAULT = 5000;
+    public static final String BKDL_BKCLIENT_ZK_RETRY_BACKOFF_MAX_MILLIS =
+            "bkcZKRetryMaxBackoffMillis";
+    public static final int BKDL_BKCLIENT_ZK_RETRY_BACKOFF_MAX_MILLIS_DEFAULT = 30000;
+
+    // Bookkeeper ensemble placement settings.
+    // Each "*_OLD" constant is the deprecated, dash-separated spelling of the key above it.
+
+    // Bookkeeper ensemble size
+    public static final String BKDL_BOOKKEEPER_ENSEMBLE_SIZE = "bkcEnsembleSize";
+    // @Deprecated
+    public static final String BKDL_BOOKKEEPER_ENSEMBLE_SIZE_OLD = "ensemble-size";
+    public static final int BKDL_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT = 3;
+    // Bookkeeper write quorum size
+    public static final String BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE = "bkcWriteQuorumSize";
+    // @Deprecated
+    public static final String BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE_OLD = "write-quorum-size";
+    public static final int BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE_DEFAULT = 3;
+    // Bookkeeper ack quorum size
+    public static final String BKDL_BOOKKEEPER_ACK_QUORUM_SIZE = "bkcAckQuorumSize";
+    // @Deprecated
+    public static final String BKDL_BOOKKEEPER_ACK_QUORUM_SIZE_OLD = "ack-quorum-size";
+    public static final int BKDL_BOOKKEEPER_ACK_QUORUM_SIZE_DEFAULT = 2;
+    // Row-aware placement switch and its older key spelling
+    public static final String BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT = "bkRowAwareEnsemblePlacement";
+    public static final String BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT_OLD =
+            "row-aware-ensemble-placement";
+    public static final boolean BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT_DEFAULT = false;
+    // DNS resolver class used for placement; defaults to the rack-based resolver class name
+    public static final String BKDL_ENSEMBLE_PLACEMENT_DNS_RESOLVER_CLASS =
+            "bkEnsemblePlacementDnsResolverClass";
+    public static final String BKDL_ENSEMBLE_PLACEMENT_DNS_RESOLVER_CLASS_DEFAULT =
+            DNSResolverForRacks.class.getName();
+    public static final String BKDL_BK_DNS_RESOLVER_OVERRIDES = "dnsResolverOverrides";
+    public static final String BKDL_BK_DNS_RESOLVER_OVERRIDES_DEFAULT = "";
+
+    // General Settings
+
+    // @Deprecated
+    public static final String BKDL_BOOKKEEPER_DIGEST_PW = "digestPw";
+    public static final String BKDL_BOOKKEEPER_DIGEST_PW_DEFAULT = "";
+    // No "*_DEFAULT" constant accompanies this key in this group.
+    public static final String BKDL_BKCLIENT_NUM_IO_THREADS = "bkcNumIOThreads";
+    // Timeout timer tick duration; the constant name states the unit (ms) although the key does not.
+    public static final String BKDL_TIMEOUT_TIMER_TICK_DURATION_MS = "timerTickDuration";
+    public static final long BKDL_TIMEOUT_TIMER_TICK_DURATION_MS_DEFAULT = 100;
+    public static final String BKDL_TIMEOUT_TIMER_NUM_TICKS = "timerNumTicks";
+    public static final int BKDL_TIMEOUT_TIMER_NUM_TICKS_DEFAULT = 1024;
+
+    //
+    // Deprecated BookKeeper Settings (in favor of "bkc." style bookkeeper 
settings)
+    //
+
+    // Read/write timeouts of the bookkeeper client; the key names state the unit (seconds).
+    public static final String BKDL_BKCLIENT_READ_TIMEOUT = "bkcReadTimeoutSeconds";
+    public static final int BKDL_BKCLIENT_READ_TIMEOUT_DEFAULT = 10;
+    public static final String BKDL_BKCLIENT_WRITE_TIMEOUT = "bkcWriteTimeoutSeconds";
+    public static final int BKDL_BKCLIENT_WRITE_TIMEOUT_DEFAULT = 10;
+    public static final String BKDL_BKCLIENT_NUM_WORKER_THREADS = "bkcNumWorkerThreads";
+    // NOTE(review): "BKCLEINT" is a misspelling of "BKCLIENT"; the name is public and is
+    // therefore kept unchanged so existing references keep compiling.
+    public static final int BKDL_BKCLEINT_NUM_WORKER_THREADS_DEFAULT = 1;
+
+    //
+    // DL General Settings
+    //
+
+    // Executor Parameters
+    // The four thread-count keys have no "*_DEFAULT" constant in this group.
+    public static final String BKDL_NUM_WORKER_THREADS = "numWorkerThreads";
+    public static final String BKDL_NUM_READAHEAD_WORKER_THREADS = "numReadAheadWorkerThreads";
+    public static final String BKDL_NUM_LOCKSTATE_THREADS = "numLockStateThreads";
+    public static final String BKDL_NUM_RESOURCE_RELEASE_THREADS = "numResourceReleaseThreads";
+    public static final String BKDL_SCHEDULER_SHUTDOWN_TIMEOUT_MS = "schedulerShutdownTimeoutMs";
+    public static final int BKDL_SCHEDULER_SHUTDOWN_TIMEOUT_MS_DEFAULT = 5000;
+    public static final String BKDL_USE_DAEMON_THREAD = "useDaemonThread";
+    public static final boolean BKDL_USE_DAEMON_THREAD_DEFAULT = false;
+
+    // Metadata Parameters
+    // "*_OLD" constants carry the older, dash-separated spelling of the key declared above them.
+    public static final String BKDL_LEDGER_METADATA_LAYOUT_VERSION = "ledgerMetadataLayoutVersion";
+    public static final String BKDL_LEDGER_METADATA_LAYOUT_VERSION_OLD = "ledger-metadata-layout";
+    public static final int BKDL_LEDGER_METADATA_LAYOUT_VERSION_DEFAULT =
+            LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value;
+    public static final String BKDL_LEDGER_METADATA_SKIP_MIN_VERSION_CHECK =
+            "ledgerMetadataSkipMinVersionCheck";
+    public static final boolean BKDL_LEDGER_METADATA_SKIP_MIN_VERSION_CHECK_DEFAULT = false;
+    public static final String BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER =
+            "firstLogsegmentSequenceNumber";
+    public static final String BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER_OLD =
+            "first-logsegment-sequence-number";
+    public static final long BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER_DEFAULT =
+            DistributedLogConstants.FIRST_LOGSEGMENT_SEQNO;
+    public static final String BKDL_LOGSEGMENT_SEQUENCE_NUMBER_VALIDATION_ENABLED =
+            "logSegmentSequenceNumberValidationEnabled";
+    public static final boolean BKDL_LOGSEGMENT_SEQUENCE_NUMBER_VALIDATION_ENABLED_DEFAULT = true;
+    public static final String BKDL_ENABLE_RECORD_COUNTS = "enableRecordCounts";
+    public static final boolean BKDL_ENABLE_RECORD_COUNTS_DEFAULT = true;
+    public static final String BKDL_MAXID_SANITYCHECK = "maxIdSanityCheck";
+    public static final boolean BKDL_MAXID_SANITYCHECK_DEFAULT = true;
+    public static final String BKDL_ENCODE_REGION_ID_IN_VERSION = "encodeRegionIDInVersion";
+    public static final boolean BKDL_ENCODE_REGION_ID_IN_VERSION_DEFAULT = false;
+    // (@Deprecated)
+    public static final String BKDL_LOGSEGMENT_NAME_VERSION = "logSegmentNameVersion";
+    public static final int BKDL_LOGSEGMENT_NAME_VERSION_DEFAULT =
+            DistributedLogConstants.LOGSEGMENT_NAME_VERSION;
+    // (@Deprecated) Name for the default (non-partitioned) stream
+    public static final String BKDL_UNPARTITIONED_STREAM_NAME = "unpartitionedStreamName";
+    public static final String BKDL_UNPARTITIONED_STREAM_NAME_DEFAULT = "<default>";
+
+    // Log Segment Cache Parameters
+    public static final String BKDL_LOGSEGMENT_CACHE_TTL_MS = "logSegmentCacheTTLMs";
+    // 600000 ms == 10 minutes
+    public static final long BKDL_LOGSEGMENT_CACHE_TTL_MS_DEFAULT = 600000;
+    public static final String BKDL_LOGSEGMENT_CACHE_MAX_SIZE = "logSegmentCacheMaxSize";
+    public static final long BKDL_LOGSEGMENT_CACHE_MAX_SIZE_DEFAULT = 10000;
+    public static final String BKDL_LOGSEGMENT_CACHE_ENABLED = "logSegmentCacheEnabled";
+    public static final boolean BKDL_LOGSEGMENT_CACHE_ENABLED_DEFAULT = true;
+
+    //
+    // DL Writer Settings
+    //
+
+    // General Settings
+    public static final String BKDL_CREATE_STREAM_IF_NOT_EXISTS = "createStreamIfNotExists";
+    public static final boolean BKDL_CREATE_STREAM_IF_NOT_EXISTS_DEFAULT = true;
+    // Flush timeout; the key name states the unit (seconds).
+    public static final String BKDL_LOG_FLUSH_TIMEOUT = "logFlushTimeoutSeconds";
+    public static final int BKDL_LOG_FLUSH_TIMEOUT_DEFAULT = 30;
+    /**
+     * Compression type key. Configured string to CompressionCodec.Type mapping
+     * (see CompressionUtils):
+     * <pre>
+     *   none                                : NONE
+     *   lz4                                 : LZ4
+     *   any other instance of String.class  : UNKNOWN
+     * </pre>
+     */
+    public static final String BKDL_COMPRESSION_TYPE = "compressionType";
+    public static final String BKDL_COMPRESSION_TYPE_DEFAULT = "none";
+    public static final String BKDL_FAILFAST_ON_STREAM_NOT_READY = "failFastOnStreamNotReady";
+    public static final boolean BKDL_FAILFAST_ON_STREAM_NOT_READY_DEFAULT = false;
+    public static final String BKDL_DISABLE_ROLLING_ON_LOG_SEGMENT_ERROR =
+            "disableRollingOnLogSegmentError";
+    public static final boolean BKDL_DISABLE_ROLLING_ON_LOG_SEGMENT_ERROR_DEFAULT = false;
+
+    // Durability Settings: durable writes are enabled unless configured otherwise.
+    public static final String BKDL_IS_DURABLE_WRITE_ENABLED = "isDurableWriteEnabled";
+    public static final boolean BKDL_IS_DURABLE_WRITE_ENABLED_DEFAULT = true;
+
+    // Transmit Settings
+    public static final String BKDL_OUTPUT_BUFFER_SIZE = "writerOutputBufferSize";
+    // older, dash-separated spelling of the output buffer size key
+    public static final String BKDL_OUTPUT_BUFFER_SIZE_OLD = "output-buffer-size";
+    public static final int BKDL_OUTPUT_BUFFER_SIZE_DEFAULT = 1024;
+    // All flush / keep-alive intervals below default to 0 (milliseconds per their key names).
+    public static final String BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS =
+            "periodicFlushFrequencyMilliSeconds";
+    public static final int BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS_DEFAULT = 0;
+    public static final String BKDL_ENABLE_IMMEDIATE_FLUSH = "enableImmediateFlush";
+    public static final boolean BKDL_ENABLE_IMMEDIATE_FLUSH_DEFAULT = false;
+    public static final String BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS =
+            "minimumDelayBetweenImmediateFlushMilliSeconds";
+    public static final int BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS_DEFAULT = 0;
+    public static final String BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS =
+            "periodicKeepAliveMilliSeconds";
+    public static final int BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT = 0;
+
+    // Retention/Truncation Settings
+    // Retention period; the key and constant names state the unit (hours).
+    public static final String BKDL_RETENTION_PERIOD_IN_HOURS = "logSegmentRetentionHours";
+    // older key spelling for the same setting (note it is named "retention-size")
+    public static final String BKDL_RETENTION_PERIOD_IN_HOURS_OLD = "retention-size";
+    public static final int BKDL_RETENTION_PERIOD_IN_HOURS_DEFAULT = 72;
+    public static final String BKDL_EXPLICIT_TRUNCATION_BY_APPLICATION = "explicitTruncationByApp";
+    public static final boolean BKDL_EXPLICIT_TRUNCATION_BY_APPLICATION_DEFAULT = false;
+
+    // Log Segment Rolling Settings
+    // Rolling interval; the key and constant names state the unit (minutes).
+    public static final String BKDL_ROLLING_INTERVAL_IN_MINUTES = "logSegmentRollingMinutes";
+    public static final String BKDL_ROLLING_INTERVAL_IN_MINUTES_OLD = "rolling-interval";
+    public static final int BKDL_ROLLING_INTERVAL_IN_MINUTES_DEFAULT = 120;
+    public static final String BKDL_MAX_LOGSEGMENT_BYTES = "maxLogSegmentBytes";
+    // default 256MB
+    public static final int BKDL_MAX_LOGSEGMENT_BYTES_DEFAULT = 256 * 1024 * 1024;
+    public static final String BKDL_LOGSEGMENT_ROLLING_CONCURRENCY = "logSegmentRollingConcurrency";
+    public static final int BKDL_LOGSEGMENT_ROLLING_CONCURRENCY_DEFAULT = 1;
+
+    // Lock Settings
+    public static final String BKDL_WRITE_LOCK_ENABLED = "writeLockEnabled";
+    public static final boolean BKDL_WRITE_LOCK_ENABLED_DEFAULT = true;
+    // Lock timeouts; the key names state the unit (seconds).
+    public static final String BKDL_LOCK_TIMEOUT = "lockTimeoutSeconds";
+    public static final long BKDL_LOCK_TIMEOUT_DEFAULT = 30;
+    public static final String BKDL_LOCK_REACQUIRE_TIMEOUT = "lockReacquireTimeoutSeconds";
+    public static final long BKDL_LOCK_REACQUIRE_TIMEOUT_DEFAULT =
+            DistributedLogConstants.LOCK_REACQUIRE_TIMEOUT_DEFAULT;
+    public static final String BKDL_LOCK_OP_TIMEOUT = "lockOpTimeoutSeconds";
+    public static final long BKDL_LOCK_OP_TIMEOUT_DEFAULT =
+            DistributedLogConstants.LOCK_OP_TIMEOUT_DEFAULT;
+
+    // Ledger Allocator Settings
+    public static final String BKDL_ENABLE_LEDGER_ALLOCATOR_POOL = "enableLedgerAllocatorPool";
+    public static final boolean BKDL_ENABLE_LEDGER_ALLOCATOR_POOL_DEFAULT = false;
+    public static final String BKDL_LEDGER_ALLOCATOR_POOL_PATH = "ledgerAllocatorPoolPath";
+    public static final String BKDL_LEDGER_ALLOCATOR_POOL_PATH_DEFAULT =
+            DistributedLogConstants.ALLOCATION_POOL_NODE;
+    // The pool name has no default (null).
+    public static final String BKDL_LEDGER_ALLOCATOR_POOL_NAME = "ledgerAllocatorPoolName";
+    public static final String BKDL_LEDGER_ALLOCATOR_POOL_NAME_DEFAULT = null;
+    public static final String BKDL_LEDGER_ALLOCATOR_POOL_CORE_SIZE = "ledgerAllocatorPoolCoreSize";
+    public static final int BKDL_LEDGER_ALLOCATOR_POOL_CORE_SIZE_DEFAULT = 20;
+
+    // Write Limit Settings
+    // Both outstanding-write limits default to -1.
+    // NOTE(review): -1 presumably means "no limit"; confirm against the code that reads them.
+    public static final String BKDL_PER_WRITER_OUTSTANDING_WRITE_LIMIT =
+            "perWriterOutstandingWriteLimit";
+    public static final int BKDL_PER_WRITER_OUTSTANDING_WRITE_LIMIT_DEFAULT = -1;
+    public static final String BKDL_GLOBAL_OUTSTANDING_WRITE_LIMIT = "globalOutstandingWriteLimit";
+    public static final int BKDL_GLOBAL_OUTSTANDING_WRITE_LIMIT_DEFAULT = -1;
+    public static final String BKDL_OUTSTANDING_WRITE_LIMIT_DARKMODE =
+            "outstandingWriteLimitDarkmode";
+    public static final boolean BKDL_OUTSTANDING_WRITE_LIMIT_DARKMODE_DEFAULT = true;
+
+    //
+    // DL Reader Settings
+    //
+
+    // General Settings
+    public static final String BKDL_READLAC_OPTION = "readLACLongPoll";
+    // Per the original note, 3 mirrors
+    // BKLogPartitionReadHandler.ReadLACOption.READENTRYPIGGYBACK_SEQUENTIAL.value
+    public static final int BKDL_READLAC_OPTION_DEFAULT = 3;
+    public static final String BKDL_READLACLONGPOLL_TIMEOUT = "readLACLongPollTimeout";
+    public static final int BKDL_READLACLONGPOLL_TIMEOUT_DEFAULT = 1000;
+    public static final String BKDL_DESERIALIZE_RECORDSET_ON_READS = "deserializeRecordSetOnReads";
+    public static final boolean BKDL_DESERIALIZE_RECORDSET_ON_READS_DEFAULT = true;
+
+    // Idle reader settings
+    public static final String BKDL_READER_IDLE_WARN_THRESHOLD_MILLIS = 
"readerIdleWarnThresholdMillis";
+    public static final int BKDL_READER_IDLE_WARN_THRESHOLD_MILLIS_DEFAULT = 
120000;
+    public static final String BKDL_READER_IDLE_ERROR_THRESHOLD_MILLIS = 
"readerIdleErrorThresholdMillis";
+    public static final int BKDL_READER_IDLE_ERROR_THRESHOLD_MILLIS_DEFAULT = 
Integer.MAX_VALUE;
+
+    // Reader constraint settings
+    public static final String BKDL_READER_IGNORE_TRUNCATION_STATUS = "ignoreTruncationStatus";
+    public static final boolean BKDL_READER_IGNORE_TRUNCATION_STATUS_DEFAULT = false;
+    public static final String BKDL_READER_ALERT_POSITION_ON_TRUNCATED = "alertPositionOnTruncated";
+    public static final boolean BKDL_READER_ALERT_POSITION_ON_TRUNCATED_DEFAULT = true;
+    public static final String BKDL_READER_POSITION_GAP_DETECTION_ENABLED =
+            "positionGapDetectionEnabled";
+    public static final boolean BKDL_READER_POSITION_GAP_DETECTION_ENABLED_DEFAULT = false;
+
+    // Read ahead related parameters
+    // "*_OLD" constants are the earlier spellings of the key above them (leading capital letter).
+    public static final String BKDL_ENABLE_READAHEAD = "enableReadAhead";
+    public static final boolean BKDL_ENABLE_READAHEAD_DEFAULT = true;
+    public static final String BKDL_ENABLE_FORCEREAD = "enableForceRead";
+    public static final boolean BKDL_ENABLE_FORCEREAD_DEFAULT = true;
+    public static final String BKDL_READAHEAD_MAX_RECORDS = "readAheadMaxRecords";
+    public static final String BKDL_READAHEAD_MAX_RECORDS_OLD = "ReadAheadMaxEntries";
+    public static final int BKDL_READAHEAD_MAX_RECORDS_DEFAULT = 10;
+    public static final String BKDL_READAHEAD_BATCHSIZE = "readAheadBatchSize";
+    public static final String BKDL_READAHEAD_BATCHSIZE_OLD = "ReadAheadBatchSize";
+    public static final int BKDL_READAHEAD_BATCHSIZE_DEFAULT = 2;
+    public static final String BKDL_READAHEAD_WAITTIME = "readAheadWaitTime";
+    public static final String BKDL_READAHEAD_WAITTIME_OLD = "ReadAheadWaitTime";
+    public static final int BKDL_READAHEAD_WAITTIME_DEFAULT = 200;
+    public static final String BKDL_READAHEAD_WAITTIME_ON_ENDOFSTREAM =
+            "readAheadWaitTimeOnEndOfStream";
+    public static final String BKDL_READAHEAD_WAITTIME_ON_ENDOFSTREAM_OLD =
+            "ReadAheadWaitTimeOnEndOfStream";
+    public static final int BKDL_READAHEAD_WAITTIME_ON_ENDOFSTREAM_DEFAULT = 10000;
+    public static final String
+            BKDL_READAHEAD_NOSUCHLEDGER_EXCEPTION_ON_READLAC_ERROR_THRESHOLD_MILLIS =
+            "readAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis";
+    public static final int
+            BKDL_READAHEAD_NOSUCHLEDGER_EXCEPTION_ON_READLAC_ERROR_THRESHOLD_MILLIS_DEFAULT = 10000;
+    public static final String BKDL_READAHEAD_SKIP_BROKEN_ENTRIES = "readAheadSkipBrokenEntries";
+    public static final boolean BKDL_READAHEAD_SKIP_BROKEN_ENTRIES_DEFAULT = false;
+    // Prefetch window per log segment: starting count and upper bound
+    public static final String BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT =
+            "numPrefetchEntriesPerLogSegment";
+    public static final int BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT_DEFAULT = 4;
+    public static final String BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT =
+            "maxPrefetchEntriesPerLogSegment";
+    public static final int BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT_DEFAULT = 32;
+
+    // Scan Settings
+    // NOTE(review): the key string below ("...EachPerLastRecordScan") does not follow the
+    // "...PerReadLastRecordScan" pattern of its sibling key. It is reproduced unchanged
+    // because existing configuration files may depend on the exact spelling.
+    public static final String BKDL_FIRST_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN =
+            "firstNumEntriesEachPerLastRecordScan";
+    public static final int BKDL_FIRST_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN_DEFAULT = 2;
+    public static final String BKDL_MAX_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN =
+            "maxNumEntriesPerReadLastRecordScan";
+    public static final int BKDL_MAX_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN_DEFAULT = 16;
+
+    // Log Existence Settings: backoff bounds in milliseconds per the key names.
+    public static final String BKDL_CHECK_LOG_EXISTENCE_BACKOFF_START_MS =
+            "checkLogExistenceBackoffStartMillis";
+    public static final int BKDL_CHECK_LOG_EXISTENCE_BACKOFF_START_MS_DEFAULT = 200;
+    public static final String BKDL_CHECK_LOG_EXISTENCE_BACKOFF_MAX_MS =
+            "checkLogExistenceBackoffMaxMillis";
+    public static final int BKDL_CHECK_LOG_EXISTENCE_BACKOFF_MAX_MS_DEFAULT = 1000;
+
+    //
+    // Tracing/Stats Settings
+    //
+
+    // Tracing switches and latency warning thresholds.
+    public static final String BKDL_TRACE_READAHEAD_DELIVERY_LATENCY =
+            "traceReadAheadDeliveryLatency";
+    public static final boolean BKDL_TRACE_READAHEAD_DELIVERY_LATENCY_DEFAULT = false;
+    public static final String BKDL_METADATA_LATENCY_WARN_THRESHOLD_MS =
+            "metadataLatencyWarnThresholdMs";
+    public static final long BKDL_METADATA_LATENCY_WARN_THRESHOLD_MS_DEFAULT =
+            DistributedLogConstants.LATENCY_WARN_THRESHOLD_IN_MILLIS;
+    public static final String BKDL_DATA_LATENCY_WARN_THRESHOLD_MS = "dataLatencyWarnThresholdMs";
+    // The data threshold defaults to twice the shared latency warn threshold constant.
+    public static final long BKDL_DATA_LATENCY_WARN_THRESHOLD_MS_DEFAULT =
+            2 * DistributedLogConstants.LATENCY_WARN_THRESHOLD_IN_MILLIS;
+    public static final String BKDL_TRACE_READAHEAD_METADATA_CHANGES =
+            "traceReadAheadMetadataChanges";
+    // NOTE(review): "MEATDATA" is a misspelling of "METADATA"; the name is public and is
+    // therefore kept unchanged so existing references keep compiling.
+    public static final boolean BKDL_TRACE_READAHEAD_MEATDATA_CHANGES_DEFAULT = false;
+    // Task execution stats (modifier order aligned with the rest of the constants).
+    public static final String BKDL_ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats";
+    public static final boolean BKDL_ENABLE_TASK_EXECUTION_STATS_DEFAULT = false;
+    public static final String BKDL_TASK_EXECUTION_WARN_TIME_MICROS =
+            "taskExecutionWarnTimeMicros";
+    public static final long BKDL_TASK_EXECUTION_WARN_TIME_MICROS_DEFAULT = 100000;
+    public static final String BKDL_ENABLE_PERSTREAM_STAT = "enablePerStreamStat";
+    public static final boolean BKDL_ENABLE_PERSTREAM_STAT_DEFAULT = false;
+
+    //
+    // Settings for Feature Providers
+    //
+
+    // Key naming the feature provider class; no default constant is declared next to it.
+    public static final String BKDL_FEATURE_PROVIDER_CLASS = "featureProviderClass";
+
+    //
+    // Settings for Configuration Based Feature Provider
+    //
+
+    // Base config path defaults to "decider.conf"; the overlay path has no default (null).
+    public static final String BKDL_FILE_FEATURE_PROVIDER_BASE_CONFIG_PATH =
+            "fileFeatureProviderBaseConfigPath";
+    public static final String BKDL_FILE_FEATURE_PROVIDER_BASE_CONFIG_PATH_DEFAULT = "decider.conf";
+    public static final String BKDL_FILE_FEATURE_PROVIDER_OVERLAY_CONFIG_PATH =
+            "fileFeatureProviderOverlayConfigPath";
+    public static final String BKDL_FILE_FEATURE_PROVIDER_OVERLAY_CONFIG_PATH_DEFAULT = null;
+
+    //
+    // Settings for Namespaces
+    //
+
+    // Federated namespace switch and its tuning knobs.
+    public static final String BKDL_FEDERATED_NAMESPACE_ENABLED = "federatedNamespaceEnabled";
+    public static final boolean BKDL_FEDERATED_NAMESPACE_ENABLED_DEFAULT = false;
+    public static final String BKDL_FEDERATED_MAX_LOGS_PER_SUBNAMESPACE =
+            "federatedMaxLogsPerSubnamespace";
+    public static final int BKDL_FEDERATED_MAX_LOGS_PER_SUBNAMESPACE_DEFAULT = 15000;
+    public static final String BKDL_FEDERATED_CHECK_EXISTENCE_WHEN_CACHE_MISS =
+            "federatedCheckExistenceWhenCacheMiss";
+    public static final boolean BKDL_FEDERATED_CHECK_EXISTENCE_WHEN_CACHE_MISS_DEFAULT = true;
+
+    // Settings for Configurations
+
+    // Reload interval; the key name states the unit (seconds).
+    public static final String BKDL_DYNAMIC_CONFIG_RELOAD_INTERVAL_SEC =
+            "dynamicConfigReloadIntervalSec";
+    public static final int BKDL_DYNAMIC_CONFIG_RELOAD_INTERVAL_SEC_DEFAULT = 60;
+    // The router default is held as a class-name string, not as a class reference.
+    public static final String BKDL_STREAM_CONFIG_ROUTER_CLASS = "streamConfigRouterClass";
+    public static final String BKDL_STREAM_CONFIG_ROUTER_CLASS_DEFAULT =
+            "org.apache.distributedlog.service.config.IdentityConfigRouter";
+
+    // Settings for RateLimit (used by distributedlog-service)
+    // Every limit in this group defaults to -1.
+    // NOTE(review): -1 presumably disables the limit; confirm against the consumer.
+
+    // bytes-per-second ("bps") write limits
+    public static final String BKDL_BPS_SOFT_WRITE_LIMIT = "bpsSoftWriteLimit";
+    public static final int BKDL_BPS_SOFT_WRITE_LIMIT_DEFAULT = -1;
+    public static final String BKDL_BPS_HARD_WRITE_LIMIT = "bpsHardWriteLimit";
+    public static final int BKDL_BPS_HARD_WRITE_LIMIT_DEFAULT = -1;
+    // requests-per-second ("rps") write limits
+    public static final String BKDL_RPS_SOFT_WRITE_LIMIT = "rpsSoftWriteLimit";
+    public static final int BKDL_RPS_SOFT_WRITE_LIMIT_DEFAULT = -1;
+    public static final String BKDL_RPS_HARD_WRITE_LIMIT = "rpsHardWriteLimit";
+    public static final int BKDL_RPS_HARD_WRITE_LIMIT_DEFAULT = -1;
+
+    // Rate and resource limits: per shard
+    // Every limit in this group defaults to -1.
+
+    public static final String BKDL_RPS_SOFT_SERVICE_LIMIT = "rpsSoftServiceLimit";
+    public static final int BKDL_RPS_SOFT_SERVICE_LIMIT_DEFAULT = -1;
+    public static final String BKDL_RPS_HARD_SERVICE_LIMIT = "rpsHardServiceLimit";
+    public static final int BKDL_RPS_HARD_SERVICE_LIMIT_DEFAULT = -1;
+    public static final String BKDL_RPS_STREAM_ACQUIRE_SERVICE_LIMIT =
+            "rpsStreamAcquireServiceLimit";
+    public static final int BKDL_RPS_STREAM_ACQUIRE_SERVICE_LIMIT_DEFAULT = -1;
+    public static final String BKDL_BPS_SOFT_SERVICE_LIMIT = "bpsSoftServiceLimit";
+    public static final int BKDL_BPS_SOFT_SERVICE_LIMIT_DEFAULT = -1;
+    public static final String BKDL_BPS_HARD_SERVICE_LIMIT = "bpsHardServiceLimit";
+    public static final int BKDL_BPS_HARD_SERVICE_LIMIT_DEFAULT = -1;
+    public static final String BKDL_BPS_STREAM_ACQUIRE_SERVICE_LIMIT =
+            "bpsStreamAcquireServiceLimit";
+    public static final int BKDL_BPS_STREAM_ACQUIRE_SERVICE_LIMIT_DEFAULT = -1;
+
+    // Settings for Partitioning
+    // Both per-proxy partition caps default to -1.
+
+    public static final String BKDL_MAX_ACQUIRED_PARTITIONS_PER_PROXY =
+            "maxAcquiredPartitionsPerProxy";
+    public static final int BKDL_MAX_ACQUIRED_PARTITIONS_PER_PROXY_DEFAULT = -1;
+
+    public static final String BKDL_MAX_CACHED_PARTITIONS_PER_PROXY = "maxCachedPartitionsPerProxy";
+    public static final int BKDL_MAX_CACHED_PARTITIONS_PER_PROXY_DEFAULT = -1;
+
+    //
+    // Settings for Error Injection
+    //
+    // Write-delay injection: switch, percentage and delay in milliseconds.
+    public static final String BKDL_EI_INJECT_WRITE_DELAY = "eiInjectWriteDelay";
+    public static final boolean BKDL_EI_INJECT_WRITE_DELAY_DEFAULT = false;
+    public static final String BKDL_EI_INJECTED_WRITE_DELAY_PERCENT = "eiInjectedWriteDelayPercent";
+    public static final double BKDL_EI_INJECTED_WRITE_DELAY_PERCENT_DEFAULT = 0.0;
+    public static final String BKDL_EI_INJECTED_WRITE_DELAY_MS = "eiInjectedWriteDelayMs";
+    public static final int BKDL_EI_INJECTED_WRITE_DELAY_MS_DEFAULT = 0;
+    // Read-ahead injection: stall, delay and broken-entry switches.
+    public static final String BKDL_EI_INJECT_READAHEAD_STALL = "eiInjectReadAheadStall";
+    public static final boolean BKDL_EI_INJECT_READAHEAD_STALL_DEFAULT = false;
+    public static final String BKDL_EI_INJECT_READAHEAD_DELAY = "eiInjectReadAheadDelay";
+    public static final boolean BKDL_EI_INJECT_READAHEAD_DELAY_DEFAULT = false;
+    public static final String BKDL_EI_INJECT_MAX_READAHEAD_DELAY_MS =
+            "eiInjectMaxReadAheadDelayMs";
+    public static final int BKDL_EI_INJECT_MAX_READAHEAD_DELAY_MS_DEFAULT = 0;
+    // NOTE(review): this percentage default is an int (10) while the write-delay percentage
+    // above is a double (0.0); whether both use the same scale is not visible here.
+    public static final String BKDL_EI_INJECT_READAHEAD_DELAY_PERCENT =
+            "eiInjectReadAheadDelayPercent";
+    public static final int BKDL_EI_INJECT_READAHEAD_DELAY_PERCENT_DEFAULT = 10;
+    public static final String BKDL_EI_INJECT_READAHEAD_BROKEN_ENTRIES =
+            "eiInjectReadAheadBrokenEntries";
+    public static final boolean BKDL_EI_INJECT_READAHEAD_BROKEN_ENTRIES_DEFAULT = false;
+
+    // Whitelisted stream-level configuration settings.
+    // loadStreamConf copies a key from a stream-level override only when the key is
+    // contained in this set; every other key is collected and reported in a warning.
+    private static final Set<String> streamSettings = Sets.newHashSet(
+        BKDL_READER_POSITION_GAP_DETECTION_ENABLED,
+        BKDL_READER_IDLE_ERROR_THRESHOLD_MILLIS,
+        BKDL_READER_IDLE_WARN_THRESHOLD_MILLIS,
+        BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS,
+        BKDL_ENABLE_IMMEDIATE_FLUSH
+    );
+
+    /**
+     * Construct distributedlog configuration with default settings.
+     * It also loads the settings from system properties.
+     */
+    public DistributedLogConfiguration() {
+        super();
+        // add configuration for system properties
+        addConfiguration(new SystemConfiguration());
+    }
+
+    /**
+     * Load the properties found at <code>confURL</code> and add them as a further
+     * configuration source.
+     *
+     * <p>Configurations can be loaded in precedence order: the first one takes
+     * precedence over any loaded later.
+     *
+     * @param confURL Configuration URL
+     * @throws ConfigurationException propagated from the {@link PropertiesConfiguration}
+     *         constructor when the properties cannot be loaded
+     */
+    public void loadConf(URL confURL) throws ConfigurationException {
+        addConfiguration(new PropertiesConfiguration(confURL));
+    }
+
+    /**
+     * Add another distributedlog configuration as a configuration source of this one.
+     *
+     * @param baseConf Other Configuration
+     */
+    public void loadConf(DistributedLogConfiguration baseConf) {
+        this.addConfiguration(baseConf);
+    }
+
+    /**
+     * Add an arbitrary configuration object as a configuration source of this one.
+     *
+     * @param otherConf Other configuration object
+     */
+    public void loadConf(Configuration otherConf) {
+        this.addConfiguration(otherConf);
+    }
+
+    /**
+     * Load whitelisted stream configuration from another configuration object
+     *
+     * @param streamConfiguration stream configuration overrides
+     */
+    public void loadStreamConf(Optional<DistributedLogConfiguration> 
streamConfiguration) {
+        if (!streamConfiguration.isPresent()) {
+            return;
+        }
+        ArrayList<Object> ignoredSettings = new ArrayList<Object>();
+        Iterator iterator = streamConfiguration.get().getKeys();
+        while (iterator.hasNext()) {
+            Object setting = iterator.next();
+            if (setting instanceof String && streamSettings.contains(setting)) 
{
+                String settingStr = (String) setting;
+                setProperty(settingStr, 
streamConfiguration.get().getProperty(settingStr));
+            } else {
+                ignoredSettings.add(setting);
+            }
+        }
+        if (LOG.isWarnEnabled() && !ignoredSettings.isEmpty()) {
+            LOG.warn("invalid stream configuration override(s): {}",
+                StringUtils.join(ignoredSettings, ";"));
+        }
+    }
+
+    //
+    // ZooKeeper Related Settings
+    //
+
+    /**
+     * Render every string-keyed property as {@code key=value}, one per line.
+     *
+     * <p>Lines are separated by a newline; there is no trailing newline.
+     * Keys that are not strings are skipped.
+     *
+     * @return all properties as a string
+     */
+    public String getPropsAsString() {
+        StringBuilder props = new StringBuilder();
+        // empty before the first entry, a newline afterwards
+        String separator = "";
+        for (Iterator<?> keys = getKeys(); keys.hasNext(); ) {
+            Object key = keys.next();
+            if (!(key instanceof String)) {
+                continue;
+            }
+            props.append(separator).append(key).append("=").append(getProperty((String) key));
+            separator = "\n";
+        }
+        return props.toString();
+    }
+
+    /**
+     * Get the digest id used for the ZK acl.
+     *
+     * @return zk acl id, or {@code BKDL_ZK_ACL_ID_DEFAULT} when none is configured.
+     */
+    public String getZkAclId() {
+        final String aclId = this.getString(BKDL_ZK_ACL_ID, BKDL_ZK_ACL_ID_DEFAULT);
+        return aclId;
+    }
+
+    /**
+     * Configure the digest id to use for the ZK acl.
+     *
+     * @param zkAclId acl id.
+     * @return this configuration, to allow call chaining
+     * @see #getZkAclId()
+     */
+    public DistributedLogConfiguration setZkAclId(String zkAclId) {
+        this.setProperty(BKDL_ZK_ACL_ID, zkAclId);
+        return this;
+    }
+
+    /**
+     * Get ZK Session timeout in seconds.
+     * <p>
+     * This is the session timeout applied for zookeeper client used by distributedlog.
+     * Use {@link #getBKClientZKSessionTimeoutMilliSeconds()} for zookeeper client used
+     * by bookkeeper client.
+     *
+     * @return zookeeper session timeout in seconds.
+     * @deprecated use {@link #getZKSessionTimeoutMilliseconds()}
+     */
+    @Deprecated
+    public int getZKSessionTimeoutSeconds() {
+        return this.getInt(BKDL_ZK_SESSION_TIMEOUT_SECONDS, BKDL_ZK_SESSION_TIMEOUT_SECONDS_DEFAULT);
+    }
+
+    /**
+     * Get ZK Session timeout in milliseconds.
+     * <p>
+     * This is the session timeout applied for zookeeper client used by 
distributedlog.
+     * Use {@link #getBKClientZKSessionTimeoutMilliSeconds()} for zookeeper 
client used
+     * by bookkeeper client.
+     *
+     * @return zk session timeout in milliseconds.
+     */
+    public int getZKSessionTimeoutMilliseconds() {
+        return this.getInt(BKDL_ZK_SESSION_TIMEOUT_SECONDS, 
BKDL_ZK_SESSION_TIMEOUT_SECONDS_DEFAULT) * 1000;
+    }
+
+    /**
+     * Configure the ZK session timeout, expressed in seconds.
+     *
+     * @param zkSessionTimeoutSeconds session timeout in seconds.
+     * @return this configuration, to allow call chaining
+     * @see #getZKSessionTimeoutMilliseconds()
+     */
+    public DistributedLogConfiguration setZKSessionTimeoutSeconds(int zkSessionTimeoutSeconds) {
+        this.setProperty(BKDL_ZK_SESSION_TIMEOUT_SECONDS, Integer.valueOf(zkSessionTimeoutSeconds));
+        return this;
+    }
+
+    /**
+     * Get zookeeper access rate limit.
+     * <p>The rate limiter is basically a guava
+     * {@link com.google.common.util.concurrent.RateLimiter}. It limits the rate of
+     * requests sent by the zookeeper client. If the value is non-positive, rate
+     * limiting is disabled. By default it is disabled (value = 0).
+     *
+     * @return zookeeper access rate, by default it is 0.
+     */
+    public double getZKRequestRateLimit() {
+        final double rateLimit = getDouble(BKDL_ZK_REQUEST_RATE_LIMIT, BKDL_ZK_REQUEST_RATE_LIMIT_DEFAULT);
+        return rateLimit;
+    }
+
+    /**
+     * Configure the zookeeper access rate limit (rps).
+     *
+     * @param requestRateLimit
+     *          zookeeper access rate limit
+     * @return this configuration, to allow call chaining
+     * @see #getZKRequestRateLimit()
+     */
+    public DistributedLogConfiguration setZKRequestRateLimit(double requestRateLimit) {
+        this.setProperty(BKDL_ZK_REQUEST_RATE_LIMIT, Double.valueOf(requestRateLimit));
+        return this;
+    }
+
+    /**
+     * Get the number of retries per request for the zookeeper client.
+     * <p>Retries only happen on retryable failures like session expired or
+     * session moved. For permanent failures, the request fails immediately.
+     * The default value is 3.
+     *
+     * @return num of retries per request of zookeeper client.
+     */
+    public int getZKNumRetries() {
+        final int numRetries = getInt(BKDL_ZK_NUM_RETRIES, BKDL_ZK_NUM_RETRIES_DEFAULT);
+        return numRetries;
+    }
+
+    /**
+     * Configure the number of retries per request for the zookeeper client.
+     *
+     * @param zkNumRetries num of retries per request of zookeeper client.
+     * @return this configuration, to allow call chaining
+     * @see #getZKNumRetries()
+     */
+    public DistributedLogConfiguration setZKNumRetries(int zkNumRetries) {
+        this.setProperty(BKDL_ZK_NUM_RETRIES, Integer.valueOf(zkNumRetries));
+        return this;
+    }
+
+    /**
+     * Get the start backoff time of zookeeper operation retries, in milliseconds.
+     * <p>The retry time increases in a bounded exponential way and becomes flat
+     * once it hits the max backoff time ({@link #getZKRetryBackoffMaxMillis()}).
+     * The default start backoff time is 5000 milliseconds.
+     *
+     * @return start backoff time of zookeeper operation retries, in milliseconds.
+     * @see #getZKRetryBackoffMaxMillis()
+     */
+    public int getZKRetryBackoffStartMillis() {
+        final int backoffStartMs =
+                getInt(BKDL_ZK_RETRY_BACKOFF_START_MILLIS, BKDL_ZK_RETRY_BACKOFF_START_MILLIS_DEFAULT);
+        return backoffStartMs;
+    }
+
+    /**
+     * Configure the start backoff time of zookeeper operation retries, in milliseconds.
+     *
+     * @param zkRetryBackoffStartMillis start backoff time of zookeeper operation retries,
+     *                                  in milliseconds.
+     * @return this configuration, to allow call chaining
+     * @see #getZKRetryBackoffStartMillis()
+     */
+    public DistributedLogConfiguration setZKRetryBackoffStartMillis(int zkRetryBackoffStartMillis) {
+        this.setProperty(BKDL_ZK_RETRY_BACKOFF_START_MILLIS, Integer.valueOf(zkRetryBackoffStartMillis));
+        return this;
+    }
+
+    /**
+     * Get the max backoff time of zookeeper operation retries, in milliseconds.
+     * <p>The retry time increases in a bounded exponential way starting from
+     * {@link #getZKRetryBackoffStartMillis()} and becomes flat once it hits this
+     * max backoff time.
+     * The default max backoff time is 30000 milliseconds.
+     *
+     * @return max backoff time of zookeeper operation retries, in milliseconds.
+     * @see #getZKRetryBackoffStartMillis()
+     */
+    public int getZKRetryBackoffMaxMillis() {
+        final int backoffMaxMs =
+                getInt(BKDL_ZK_RETRY_BACKOFF_MAX_MILLIS, BKDL_ZK_RETRY_BACKOFF_MAX_MILLIS_DEFAULT);
+        return backoffMaxMs;
+    }
+
+    /**
+     * Configure the max backoff time of zookeeper operation retries, in milliseconds.
+     *
+     * @param zkRetryBackoffMaxMillis max backoff time of zookeeper operation retries,
+     *                                in milliseconds.
+     * @return this configuration, to allow call chaining
+     * @see #getZKRetryBackoffMaxMillis()
+     */
+    public DistributedLogConfiguration setZKRetryBackoffMaxMillis(int zkRetryBackoffMaxMillis) {
+        this.setProperty(BKDL_ZK_RETRY_BACKOFF_MAX_MILLIS, Integer.valueOf(zkRetryBackoffMaxMillis));
+        return this;
+    }
+
+    /**
+     * Get the ZK client's number of retry executor threads.
+     * By default it is 1.
+     *
+     * @return number of ZK client retry executor threads.
+     */
+    public int getZKClientNumberRetryThreads() {
+        final int retryThreads =
+                getInt(BKDL_ZKCLIENT_NUM_RETRY_THREADS, BKDL_ZKCLIENT_NUM_RETRY_THREADS_DEFAULT);
+        return retryThreads;
+    }
+
+    /**
+     * Configure the ZK client's number of retry executor threads.
+     *
+     * @param numThreads
+     *          number of retry executor threads.
+     * @return this configuration, to allow call chaining
+     * @see #getZKClientNumberRetryThreads()
+     */
+    public DistributedLogConfiguration setZKClientNumberRetryThreads(int numThreads) {
+        this.setProperty(BKDL_ZKCLIENT_NUM_RETRY_THREADS, Integer.valueOf(numThreads));
+        return this;
+    }
+
+    //
+    // BookKeeper ZooKeeper Client Settings
+    //
+
+    /**
+     * Get BK's zookeeper session timout in milliseconds.
+     * <p>
+     * This is the session timeout applied for zookeeper client used by 
bookkeeper client.
+     * Use {@link #getZKSessionTimeoutMilliseconds()} for zookeeper client used
+     * by distributedlog.
+     *
+     * @return Bk's zookeeper session timeout in milliseconds
+     */
+    public int getBKClientZKSessionTimeoutMilliSeconds() {
+        return this.getInt(BKDL_BKCLIENT_ZK_SESSION_TIMEOUT, 
BKDL_BKCLIENT_ZK_SESSION_TIMEOUT_DEFAULT) * 1000;
+    }
+
+    /**
+     * Configure BK's zookeeper session timeout, expressed in seconds.
+     *
+     * @param sessionTimeout session timeout for the ZK Client used by BK Client, in seconds.
+     * @return this configuration, to allow call chaining
+     * @see #getBKClientZKSessionTimeoutMilliSeconds()
+     */
+    public DistributedLogConfiguration setBKClientZKSessionTimeout(int sessionTimeout) {
+        this.setProperty(BKDL_BKCLIENT_ZK_SESSION_TIMEOUT, Integer.valueOf(sessionTimeout));
+        return this;
+    }
+
+    /**
+     * Get the zookeeper access rate limit for the zookeeper client used in the bookkeeper client.
+     * <p>The rate limiter is basically a guava
+     * {@link com.google.common.util.concurrent.RateLimiter}. It limits the rate of
+     * requests sent by the zookeeper client. If the value is non-positive, rate
+     * limiting is disabled. By default it is disabled (value = 0).
+     *
+     * @return zookeeper access rate limit for zookeeper client used in bookkeeper client.
+     * By default it is 0.
+     */
+    public double getBKClientZKRequestRateLimit() {
+        final double rateLimit =
+                getDouble(BKDL_BKCLIENT_ZK_REQUEST_RATE_LIMIT, BKDL_BKCLIENT_ZK_REQUEST_RATE_LIMIT_DEFAULT);
+        return rateLimit;
+    }
+
+    /**
+     * Configure the zookeeper access rate limit for the zookeeper client used in the bookkeeper client.
+     *
+     * @param rateLimit
+     *          zookeeper access rate limit
+     * @return this configuration, to allow call chaining
+     * @see #getBKClientZKRequestRateLimit()
+     */
+    public DistributedLogConfiguration setBKClientZKRequestRateLimit(double rateLimit) {
+        this.setProperty(BKDL_BKCLIENT_ZK_REQUEST_RATE_LIMIT, Double.valueOf(rateLimit));
+        return this;
+    }
+
+    /**
+     * Get the number of retries for the zookeeper client used by the bookkeeper client.
+     * <p>Retries only happen on retryable failures like session expired or
+     * session moved. For permanent failures, the request fails immediately.
+     * The default value is 3. A configured value of zero or less is reported as
+     * {@link Integer#MAX_VALUE}, i.e. retry infinitely.
+     *
+     * @return num of retries of zookeeper client used by bookkeeper client.
+     */
+    public int getBKClientZKNumRetries() {
+        final int configured = getInt(BKDL_BKCLIENT_ZK_NUM_RETRIES, BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT);
+        // non-positive means "no limit"
+        return configured <= 0 ? Integer.MAX_VALUE : configured;
+    }
+
+    /**
+     * Get the start backoff time of zookeeper operation retries, in milliseconds.
+     * <p>The retry time increases in a bounded exponential way and becomes flat
+     * once it hits the max backoff time ({@link #getBKClientZKRetryBackoffMaxMillis()}).
+     * The default start backoff time is 5000 milliseconds.
+     *
+     * @return start backoff time of zookeeper operation retries, in milliseconds.
+     * @see #getBKClientZKRetryBackoffMaxMillis()
+     */
+    public int getBKClientZKRetryBackoffStartMillis() {
+        final int backoffStartMs = getInt(
+                BKDL_BKCLIENT_ZK_RETRY_BACKOFF_START_MILLIS,
+                BKDL_BKCLIENT_ZK_RETRY_BACKOFF_START_MILLIS_DEFAULT);
+        return backoffStartMs;
+    }
+
+    /**
+     * Get the max backoff time of zookeeper operation retries, in milliseconds.
+     * <p>The retry time increases in a bounded exponential way starting from
+     * {@link #getBKClientZKRetryBackoffStartMillis()} and becomes flat once it
+     * hits this max backoff time.
+     * The default max backoff time is 30000 milliseconds.
+     *
+     * @return max backoff time of zookeeper operation retries, in milliseconds.
+     * @see #getBKClientZKRetryBackoffStartMillis()
+     */
+    public int getBKClientZKRetryBackoffMaxMillis() {
+        final int backoffMaxMs = getInt(
+                BKDL_BKCLIENT_ZK_RETRY_BACKOFF_MAX_MILLIS,
+                BKDL_BKCLIENT_ZK_RETRY_BACKOFF_MAX_MILLIS_DEFAULT);
+        return backoffMaxMs;
+    }
+
+    //
+    // BookKeeper Ensemble Placement Settings
+    //
+
+    /**
+     * Get the ensemble size that each log segment (ledger) will use.
+     * By default it is 3.
+     * <p>
+     * A log segment's data is stored in an ensemble of bookies in
+     * a striping way. Each entry is added to a <code>write-quorum</code>
+     * size of bookies. The add operation completes once it receives
+     * responses from an <code>ack-quorum</code> size of bookies. The striping
+     * is done in a round-robin way in bookkeeper.
+     * <p>
+     * For example, with ensemble-size 5, write-quorum-size 3 and
+     * ack-quorum-size 2, the data is striped as follows.
+     * <pre>
+     * | entry id | bk1 | bk2 | bk3 | bk4 | bk5 |
+     * |     0    |  x  |  x  |  x  |     |     |
+     * |     1    |     |  x  |  x  |  x  |     |
+     * |     2    |     |     |  x  |  x  |  x  |
+     * |     3    |  x  |     |     |  x  |  x  |
+     * |     4    |  x  |  x  |     |     |  x  |
+     * |     5    |  x  |  x  |  x  |     |     |
+     * </pre>
+     * <p>
+     * Striping within a log segment to increase bandwidth is not recommended.
+     * Striping by `partition` at a higher level of distributedlog is the
+     * recommended way to increase performance, so typically the ensemble size
+     * is set to the same value as the write quorum size.
+     * <p>
+     * The current key is consulted first, then the old key, then the default.
+     *
+     * @return ensemble size
+     * @see #getWriteQuorumSize()
+     * @see #getAckQuorumSize()
+     */
+    public int getEnsembleSize() {
+        // value under the old key, or the default when that is unset too
+        final int legacyOrDefault =
+                getInt(BKDL_BOOKKEEPER_ENSEMBLE_SIZE_OLD, BKDL_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
+        return getInt(BKDL_BOOKKEEPER_ENSEMBLE_SIZE, legacyOrDefault);
+    }
+
+    /**
+     * Configure the ensemble size that each log segment (ledger) will use.
+     *
+     * @param ensembleSize ensemble size.
+     * @return this configuration, to allow call chaining
+     * @see #getEnsembleSize()
+     */
+    public DistributedLogConfiguration setEnsembleSize(int ensembleSize) {
+        this.setProperty(BKDL_BOOKKEEPER_ENSEMBLE_SIZE, Integer.valueOf(ensembleSize));
+        return this;
+    }
+
+    /**
+     * Get the write quorum size that each log segment (ledger) will use.
+     * By default it is 3.
+     * <p>
+     * The current key is consulted first, then the old key, then the default.
+     *
+     * @return write quorum size
+     * @see #getEnsembleSize()
+     */
+    public int getWriteQuorumSize() {
+        // value under the old key, or the default when that is unset too
+        final int legacyOrDefault =
+                getInt(BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE_OLD, BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE_DEFAULT);
+        return getInt(BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE, legacyOrDefault);
+    }
+
+    /**
+     * Configure the write quorum size that each log segment (ledger) will use.
+     *
+     * @param quorumSize
+     *          quorum size.
+     * @return this configuration, to allow call chaining
+     * @see #getWriteQuorumSize()
+     */
+    public DistributedLogConfiguration setWriteQuorumSize(int quorumSize) {
+        this.setProperty(BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE, Integer.valueOf(quorumSize));
+        return this;
+    }
+
+    /**
+     * Get the ack quorum size that each log segment (ledger) will use.
+     * By default it is 2.
+     * <p>
+     * The current key is consulted first, then the old key, then the default.
+     *
+     * @return ack quorum size
+     * @see #getEnsembleSize()
+     */
+    public int getAckQuorumSize() {
+        // value under the old key, or the default when that is unset too
+        final int legacyOrDefault =
+                getInt(BKDL_BOOKKEEPER_ACK_QUORUM_SIZE_OLD, BKDL_BOOKKEEPER_ACK_QUORUM_SIZE_DEFAULT);
+        return getInt(BKDL_BOOKKEEPER_ACK_QUORUM_SIZE, legacyOrDefault);
+    }
+
+    /**
+     * Configure the ack quorum size that each log segment (ledger) will use.
+     *
+     * @param quorumSize
+     *          quorum size.
+     * @return this configuration, to allow call chaining
+     * @see #getAckQuorumSize()
+     */
+    public DistributedLogConfiguration setAckQuorumSize(int quorumSize) {
+        this.setProperty(BKDL_BOOKKEEPER_ACK_QUORUM_SIZE, Integer.valueOf(quorumSize));
+        return this;
+    }
+
+    /**
+     * Bundle the ensemble, write quorum and ack quorum sizes into one quorum config.
+     *
+     * @return quorum config used by log segments (ledgers)
+     * @see #getEnsembleSize()
+     * @see #getWriteQuorumSize()
+     * @see #getAckQuorumSize()
+     */
+    public QuorumConfig getQuorumConfig() {
+        final int ensembleSize = getEnsembleSize();
+        final int writeQuorumSize = getWriteQuorumSize();
+        final int ackQuorumSize = getAckQuorumSize();
+        return new QuorumConfig(ensembleSize, writeQuorumSize, ackQuorumSize);
+    }
+
+    /**
+     * Check whether row aware ensemble placement is enabled.
+     * <p>If enabled, {@link DNSResolverForRows} is used for dns resolution
+     * rather than {@link DNSResolverForRacks}, unless another dns resolver is set via
+     * {@link #setEnsemblePlacementDnsResolverClass(Class)}.
+     * By default it is disabled.
+     * <p>
+     * The current key is consulted first, then the old key, then the default.
+     *
+     * @return true if row aware ensemble placement is enabled, otherwise false.
+     * @see #getEnsemblePlacementDnsResolverClass()
+     */
+    public boolean getRowAwareEnsemblePlacementEnabled() {
+        // value under the old key, or the default when that is unset too
+        final boolean legacyOrDefault =
+                getBoolean(BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT_OLD, BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT_DEFAULT);
+        return getBoolean(BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT, legacyOrDefault);
+    }
+
+    /**
+     * Configure whether row aware ensemble placement is enabled.
+     *
+     * @param enableRowAwareEnsemblePlacement
+     *          true to enable row aware ensemble placement
+     * @return this configuration, to allow call chaining
+     * @see #getRowAwareEnsemblePlacementEnabled()
+     */
+    public DistributedLogConfiguration setRowAwareEnsemblePlacementEnabled(boolean enableRowAwareEnsemblePlacement) {
+        this.setProperty(BKDL_ROW_AWARE_ENSEMBLE_PLACEMENT, Boolean.valueOf(enableRowAwareEnsemblePlacement));
+        return this;
+    }
+
+    /**
+     * Get the DNS resolver class for bookkeeper ensemble placement.
+     * <p>When no resolver class is configured, the fallback is
+     * {@link DNSResolverForRows} if {@link #getRowAwareEnsemblePlacementEnabled()}
+     * is enabled and {@link DNSResolverForRacks} otherwise.
+     *
+     * @return dns resolver class for bookkeeper ensemble placement.
+     * @throws ConfigurationException as propagated from {@code ReflectionUtils.getClass}
+     * @see #getRowAwareEnsemblePlacementEnabled()
+     */
+    public Class<? extends DNSToSwitchMapping> getEnsemblePlacementDnsResolverClass()
+            throws ConfigurationException {
+        // rack-based resolution unless row awareness is switched on
+        Class<? extends DNSToSwitchMapping> fallbackCls = DNSResolverForRacks.class;
+        if (getRowAwareEnsemblePlacementEnabled()) {
+            fallbackCls = DNSResolverForRows.class;
+        }
+        return ReflectionUtils.getClass(
+                this,
+                BKDL_ENSEMBLE_PLACEMENT_DNS_RESOLVER_CLASS,
+                fallbackCls,
+                DNSToSwitchMapping.class,
+                defaultLoader);
+    }
+
+    /**
+     * Configure the DNS resolver class for bookkeeper ensemble placement.
+     * <p>The class is stored by its name.
+     *
+     * @param dnsResolverClass
+     *          dns resolver class for bookkeeper ensemble placement.
+     * @return this configuration, to allow call chaining
+     * @see #getEnsemblePlacementDnsResolverClass()
+     */
+    public DistributedLogConfiguration setEnsemblePlacementDnsResolverClass(
+            Class<? extends DNSToSwitchMapping> dnsResolverClass) {
+        final String resolverClassName = dnsResolverClass.getName();
+        this.setProperty(BKDL_ENSEMBLE_PLACEMENT_DNS_RESOLVER_CLASS, resolverClassName);
+        return this;
+    }
+
+    /**
+     * Get the mapping used to override the region mapping derived by the default resolver.
+     * <p>It is a string of host-region pairs (host:region) separated by semicolons.
+     * By default it is an empty string.
+     *
+     * @return dns resolver overrides.
+     * @see #getEnsemblePlacementDnsResolverClass()
+     * @see DNSResolverForRacks
+     * @see DNSResolverForRows
+     */
+    public String getBkDNSResolverOverrides() {
+        final String overrides =
+                this.getString(BKDL_BK_DNS_RESOLVER_OVERRIDES, BKDL_BK_DNS_RESOLVER_OVERRIDES_DEFAULT);
+        return overrides;
+    }
+
+    /**
+     * Configure the mapping used to override the region mapping derived by the default resolver.
+     * <p>It is a string of host-region pairs (host:region) separated by semicolons.
+     * By default it is an empty string.
+     *
+     * @param overrides
+     *          dns resolver overrides
+     * @return this configuration, to allow call chaining
+     * @see #getBkDNSResolverOverrides()
+     */
+    public DistributedLogConfiguration setBkDNSResolverOverrides(String overrides) {
+        this.setProperty(BKDL_BK_DNS_RESOLVER_OVERRIDES, overrides);
+        return this;
+    }
+
+    //
+    // BookKeeper General Settings
+    //
+
+    /**
+     * Configure the password used by the bookkeeper client for digestion.
+     * <p>
+     * NOTE: changing it is not recommended. It will be deprecated in future.
+     *
+     * @param bkDigestPW BK password digest
+     * @return this configuration, to allow call chaining
+     */
+    public DistributedLogConfiguration setBKDigestPW(String bkDigestPW) {
+        this.setProperty(BKDL_BOOKKEEPER_DIGEST_PW, bkDigestPW);
+        return this;
+    }
+
+    /**
+     * Get the password used by the bookkeeper client for digestion.
+     * <p>
+     * NOTE: changing it is not recommended. It will be deprecated in future.
+     *
+     * @return password used by bookkeeper client for digestion
+     * @see #setBKDigestPW(String)
+     */
+    public String getBKDigestPW() {
+        final String digestPw = this.getString(BKDL_BOOKKEEPER_DIGEST_PW, BKDL_BOOKKEEPER_DIGEST_PW_DEFAULT);
+        return digestPw;
+    }
+
+    /**
+     * Get the BK client's number of i/o threads used by Netty.
+     * The default value equals DL's number of worker threads.
+     *
+     * @return number of bookkeeper netty i/o threads.
+     * @see #getNumWorkerThreads()
+     */
+    public int getBKClientNumberIOThreads() {
+        // fall back to the DL worker thread count when nothing is configured
+        final int fallback = getNumWorkerThreads();
+        return getInt(BKDL_BKCLIENT_NUM_IO_THREADS, fallback);
+    }
+
+    /**
+     * Configure the BK client's number of i/o threads used by netty.
+     *
+     * @param numThreads
+     *          number io threads.
+     * @return this configuration, to allow call chaining
+     * @see #getBKClientNumberIOThreads()
+     */
+    public DistributedLogConfiguration setBKClientNumberIOThreads(int numThreads) {
+        this.setProperty(BKDL_BKCLIENT_NUM_IO_THREADS, Integer.valueOf(numThreads));
+        return this;
+    }
+
+    /**
+     * Get the tick duration, in milliseconds, used for the timeout timer in the bookkeeper client.
+     * By default it is 100.
+     *
+     * @return tick duration in milliseconds
+     * @see org.jboss.netty.util.HashedWheelTimer
+     */
+    public long getTimeoutTimerTickDurationMs() {
+        final long tickDurationMs =
+                this.getLong(BKDL_TIMEOUT_TIMER_TICK_DURATION_MS, BKDL_TIMEOUT_TIMER_TICK_DURATION_MS_DEFAULT);
+        return tickDurationMs;
+    }
+
+    /**
+     * Configure the tick duration, in milliseconds, used for the timeout timer in the bookkeeper client.
+     *
+     * @param tickDuration
+     *          tick duration in milliseconds.
+     * @return this configuration, to allow call chaining
+     * @see #getTimeoutTimerTickDurationMs()
+     */
+    public DistributedLogConfiguration setTimeoutTimerTickDurationMs(long tickDuration) {
+        this.setProperty(BKDL_TIMEOUT_TIMER_TICK_DURATION_MS, Long.valueOf(tickDuration));
+        return this;
+    }
+
+    /**
+     * Get the number of ticks used for the timeout timer in the bookkeeper client.
+     * By default it is 1024.
+     *
+     * @return number of ticks that used for timeout timer.
+     * @see org.jboss.netty.util.HashedWheelTimer
+     */
+    public int getTimeoutTimerNumTicks() {
+        final int numTicks = this.getInt(BKDL_TIMEOUT_TIMER_NUM_TICKS, BKDL_TIMEOUT_TIMER_NUM_TICKS_DEFAULT);
+        return numTicks;
+    }
+
+    /**
+     * Configure the number of ticks used for the timeout timer in the bookkeeper client.
+     *
+     * @param numTicks
+     *          number of ticks that used for timeout timer.
+     * @return this configuration, to allow call chaining
+     * @see #getTimeoutTimerNumTicks()
+     */
+    public DistributedLogConfiguration setTimeoutTimerNumTicks(int numTicks) {
+        this.setProperty(BKDL_TIMEOUT_TIMER_NUM_TICKS, Integer.valueOf(numTicks));
+        return this;
+    }
+
+    //
+    // Deprecated BookKeeper Settings
+    //
+
+    /**
+     * Get BK client read timeout in seconds.
+     *
+     * @return read timeout in seconds
+     * @deprecated Please use {@link ClientConfiguration#getReadEntryTimeout()}
+     *             instead of this setting.
+     * @see ClientConfiguration#getReadEntryTimeout()
+     */
+    @Deprecated
+    public int getBKClientReadTimeout() {
+        return this.getInt(BKDL_BKCLIENT_READ_TIMEOUT,
+                BKDL_BKCLIENT_READ_TIMEOUT_DEFAULT);
+    }
+
+    /**
+     * Set BK client read timeout in seconds.
+     *
+     * @param readTimeout read timeout in seconds.
+     * @return distributed log configuration
+     * @deprecated the setting is deprecated; see {@link #getBKClientReadTimeout()}
+     *             for the recommended replacement.
+     * @see #getBKClientReadTimeout()
+     */
+    @Deprecated
+    public DistributedLogConfiguration setBKClientReadTimeout(int readTimeout) {
+        setProperty(BKDL_BKCLIENT_READ_TIMEOUT, readTimeout);
+        return this;
+    }
+
+    /**
+     * Get BK client write timeout in seconds.
+     *
+     * @return write timeout in seconds.
+     * @deprecated Please use {@link ClientConfiguration#getAddEntryTimeout()}
+     *             instead of this setting.
+     * @see ClientConfiguration#getAddEntryTimeout()
+     */
+    @Deprecated
+    public int getBKClientWriteTimeout() {
+        return this.getInt(BKDL_BKCLIENT_WRITE_TIMEOUT, BKDL_BKCLIENT_WRITE_TIMEOUT_DEFAULT);
+    }
+
+    /**
+     * Set BK client write timeout in seconds.
+     *
+     * @param writeTimeout write timeout in seconds.
+     * @return distributed log configuration
+     * @deprecated the setting is deprecated; see {@link #getBKClientWriteTimeout()}
+     *             for the recommended replacement.
+     * @see #getBKClientWriteTimeout()
+     */
+    @Deprecated
+    public DistributedLogConfiguration setBKClientWriteTimeout(int writeTimeout) {
+        setProperty(BKDL_BKCLIENT_WRITE_TIMEOUT, writeTimeout);
+        return this;
+    }
+
+    /**
+     * Get BK client number of worker threads.
+     *
+     * @return number of bookkeeper client worker threads.
+     * @deprecated Please use {@link ClientConfiguration#getNumWorkerThreads()}
+     *             instead of this setting.
+     * @see ClientConfiguration#getNumWorkerThreads()
+     */
+    @Deprecated
+    public int getBKClientNumberWorkerThreads() {
+        return this.getInt(BKDL_BKCLIENT_NUM_WORKER_THREADS, BKDL_BKCLEINT_NUM_WORKER_THREADS_DEFAULT);
+    }
+
+    /**
+     * Set BK client number of worker threads.
+     *
+     * @param numThreads
+     *          number worker threads.
+     * @return distributedlog configuration.
+     * @deprecated the setting is deprecated; see {@link #getBKClientNumberWorkerThreads()}
+     *             for the recommended replacement.
+     * @see #getBKClientNumberWorkerThreads()
+     */
+    @Deprecated
+    public DistributedLogConfiguration setBKClientNumberWorkerThreads(int numThreads) {
+        setProperty(BKDL_BKCLIENT_NUM_WORKER_THREADS, numThreads);
+        return this;
+    }
+
+    //
+    // DL Executor Settings
+    //
+
+    /**
+     * Get the number of worker threads used by the distributedlog namespace.
+     * By default it is the number of available processors.
+     *
+     * @return number of worker threads used by distributedlog namespace.
+     */
+    public int getNumWorkerThreads() {
+        // the processor count is the fallback when the key is not configured
+        final int availableProcessors = Runtime.getRuntime().availableProcessors();
+        return this.getInt(BKDL_NUM_WORKER_THREADS, availableProcessors);
+    }
+
+    /**
+     * Configure the number of worker threads used by the distributedlog namespace.
+     *
+     * @param numWorkerThreads
+     *          number of worker threads used by distributedlog namespace.
+     * @return this configuration, to allow call chaining
+     * @see #getNumWorkerThreads()
+     */
+    public DistributedLogConfiguration setNumWorkerThreads(int numWorkerThreads) {
+        this.setProperty(BKDL_NUM_WORKER_THREADS, Integer.valueOf(numWorkerThreads));
+        return this;
+    }
+
+    /**
+     * Get the number of dedicated readahead worker threads used by the distributedlog namespace.
+     * <p>If this value is non-positive, readahead shares the normal executor
+     * (see {@link #getNumWorkerThreads()}); otherwise a dedicated executor is
+     * used for readahead. By default, it is 0.
+     *
+     * @return number of dedicated readahead worker threads.
+     * @see #getNumWorkerThreads()
+     */
+    @Deprecated
+    public int getNumReadAheadWorkerThreads() {
+        final int readAheadThreads = this.getInt(BKDL_NUM_READAHEAD_WORKER_THREADS, 0);
+        return readAheadThreads;
+    }
+
+    /**
+     * Configure the number of dedicated readahead worker threads used by the distributedlog namespace.
+     *
+     * @param numWorkerThreads
+     *          number of dedicated readahead worker threads.
+     * @return this configuration, to allow call chaining
+     * @see #getNumReadAheadWorkerThreads()
+     */
+    @Deprecated
+    public DistributedLogConfiguration setNumReadAheadWorkerThreads(int numWorkerThreads) {
+        this.setProperty(BKDL_NUM_READAHEAD_WORKER_THREADS, Integer.valueOf(numWorkerThreads));
+        return this;
+    }
+
+    /**
+     * Get the number of lock state threads used by the distributedlog namespace.
+     * By default it is 1.
+     *
+     * @return number of lock state threads used by distributedlog namespace.
+     */
+    public int getNumLockStateThreads() {
+        final int lockStateThreads = this.getInt(BKDL_NUM_LOCKSTATE_THREADS, 1);
+        return lockStateThreads;
+    }
+
+    /**
+     * Configure the number of lock state threads used by the distributedlog manager factory.
+     *
+     * @param numLockStateThreads
+     *          number of lock state threads used by distributedlog manager factory.
+     * @return this configuration, to allow call chaining
+     * @see #getNumLockStateThreads()
+     */
+    public DistributedLogConfiguration setNumLockStateThreads(int numLockStateThreads) {
+        this.setProperty(BKDL_NUM_LOCKSTATE_THREADS, Integer.valueOf(numLockStateThreads));
+        return this;
+    }
+
+    /**
+     * Get the number of resource release threads used by the distributedlog namespace.
+     * By default it is 0 - threads are created dynamically by an executor service.
+     * The executor service is an unbounded pool. Applications can use
+     * `total_tasks - completed_tasks` to monitor the number of threads that are
+     * used for releasing resources.
+     * <p>
+     * The setting is only applied for v2 implementation.
+     *
+     * @see org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor
+     * @return number of resource release threads used by distributedlog namespace.
+     */
+    public int getNumResourceReleaseThreads() {
+        final int releaseThreads = this.getInt(BKDL_NUM_RESOURCE_RELEASE_THREADS, 0);
+        return releaseThreads;
+    }
+
+    /**
+     * Configure the number of resource release threads used by the distributedlog manager factory.
+     *
+     * @param numResourceReleaseThreads
+     *          number of resource release threads used by distributedlog manager factory.
+     * @return this configuration, to allow call chaining
+     * @see #getNumResourceReleaseThreads()
+     */
+    public DistributedLogConfiguration setNumResourceReleaseThreads(int numResourceReleaseThreads) {
+        this.setProperty(BKDL_NUM_RESOURCE_RELEASE_THREADS, Integer.valueOf(numResourceReleaseThreads));
+        return this;
+    }
+
+    /**
+     * Get the timeout, in milliseconds, for shutting down schedulers in dl manager.
+     * By default, it is 5 seconds.
+     *
+     * @return timeout for shutting down schedulers in dl manager, in milliseconds.
+     */
+    public int getSchedulerShutdownTimeoutMs() {
+        final int shutdownTimeoutMs =
+            getInt(BKDL_SCHEDULER_SHUTDOWN_TIMEOUT_MS, BKDL_SCHEDULER_SHUTDOWN_TIMEOUT_MS_DEFAULT);
+        return shutdownTimeoutMs;
+    }
+
+    /**
+     * Configure the timeout, in milliseconds, for shutting down schedulers in dl manager.
+     *
+     * @param timeoutMs
+     *         timeout for shutting down schedulers in dl manager, in milliseconds.
+     * @return this configuration, so that calls can be chained.
+     * @see #getSchedulerShutdownTimeoutMs()
+     */
+    public DistributedLogConfiguration setSchedulerShutdownTimeoutMs(int timeoutMs) {
+        this.setProperty(BKDL_SCHEDULER_SHUTDOWN_TIMEOUT_MS, timeoutMs);
+        return this;
+    }
+
+    /**
+     * Check whether DL threads should be created as daemon threads.
+     * By default it is false.
+     *
+     * @return true if daemon threads are used, otherwise false.
+     */
+    public boolean getUseDaemonThread() {
+        final boolean useDaemonThread =
+            getBoolean(BKDL_USE_DAEMON_THREAD, BKDL_USE_DAEMON_THREAD_DEFAULT);
+        return useDaemonThread;
+    }
+
+    /**
+     * Choose whether DL threads should be created as daemon threads.
+     *
+     * @param daemon
+     *          whether to use daemon thread for DL threads.
+     * @return this configuration, so that calls can be chained.
+     * @see #getUseDaemonThread()
+     */
+    public DistributedLogConfiguration setUseDaemonThread(boolean daemon) {
+        this.setProperty(BKDL_USE_DAEMON_THREAD, daemon);
+        return this;
+    }
+
+    //
+    // Metadata Settings
+    //
+
+    /**
+     * Get DL ledger metadata output layout version.
+     * <p>The current key is consulted first; when it is unset the old key is used,
+     * and when that is unset too the built-in default applies.
+     *
+     * @return layout version
+     * @see org.apache.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion
+     */
+    public int getDLLedgerMetadataLayoutVersion() {
+        // Resolve the fallback (old key, then default) before consulting the current key.
+        final int fallbackVersion = getInt(BKDL_LEDGER_METADATA_LAYOUT_VERSION_OLD,
+                BKDL_LEDGER_METADATA_LAYOUT_VERSION_DEFAULT);
+        return getInt(BKDL_LEDGER_METADATA_LAYOUT_VERSION, fallbackVersion);
+    }
+
+    /**
+     * Set DL ledger metadata output layout version.
+     * <p>Accepted values are strictly positive and no greater than
+     * {@code LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION}.
+     *
+     * @param layoutVersion layout version
+     * @return this configuration, so that calls can be chained.
+     * @throws IllegalArgumentException if setting an unknown layout version.
+     * @see #getDLLedgerMetadataLayoutVersion()
+     */
+    public DistributedLogConfiguration setDLLedgerMetadataLayoutVersion(int layoutVersion)
+            throws IllegalArgumentException {
+        final boolean isKnownVersion = (layoutVersion > 0)
+            && (layoutVersion <= LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION);
+        if (!isKnownVersion) {
+            // Reject versions outside of the supported range.
+            throw new IllegalArgumentException("Incorrect value for ledger metadata layout version");
+        }
+        this.setProperty(BKDL_LEDGER_METADATA_LAYOUT_VERSION, layoutVersion);
+        return this;
+    }
+
+    /**
+     * Get the setting for whether the enforcement of the min ledger metadata
+     * version check should be skipped.
+     * By default it is false.
+     *
+     * @return true if the min ledger metadata version check is skipped, otherwise false.
+     * @see org.apache.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion
+     */
+    public boolean getDLLedgerMetadataSkipMinVersionCheck() {
+        final boolean skipMinVersionCheck = getBoolean(
+                BKDL_LEDGER_METADATA_SKIP_MIN_VERSION_CHECK,
+                BKDL_LEDGER_METADATA_SKIP_MIN_VERSION_CHECK_DEFAULT);
+        return skipMinVersionCheck;
+    }
+
+    /**
+     * Set if we should skip the enforcement of min ledger metadata version.
+     * <p>NOTE: please be aware of the side effects of skipping min ledger metadata
+     * version checking.
+     *
+     * @param skipMinVersionCheck whether the min ledger metadata version check should be skipped
+     * @return this configuration, so that calls can be chained.
+     * @see #getDLLedgerMetadataSkipMinVersionCheck()
+     */
+    public DistributedLogConfiguration setDLLedgerMetadataSkipMinVersionCheck(boolean skipMinVersionCheck) throws IllegalArgumentException {
+        // The flag is stored as given; no validation is performed here.
+        this.setProperty(BKDL_LEDGER_METADATA_SKIP_MIN_VERSION_CHECK, skipMinVersionCheck);
+        return this;
+    }
+
+    /**
+     * Get the value at which ledger sequence number should start, either for streams
+     * that are being upgraded and did not have a ledger sequence number to start with,
+     * or for newly created streams. By default, it is 1.
+     * <p>In most cases this value should not be changed. It is useful for backfilling
+     * when migrating log segments whose metadata don't have a log segment sequence number.
+     * <p>The current key is consulted first; when it is unset the old key is used,
+     * and when that is unset too the built-in default applies.
+     *
+     * @return first ledger sequence number
+     */
+    public long getFirstLogSegmentSequenceNumber() {
+        // Resolve the fallback (old key, then default) before consulting the current key.
+        final long fallbackSequenceNumber = getLong(BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER_OLD,
+                BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER_DEFAULT);
+        return getLong(BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER, fallbackSequenceNumber);
+    }
+
+    /**
+     * Set the value at which ledger sequence number should start for streams 
that are being
+     * upgraded and did not have ledger sequence number to start with or for 
newly created
+     * streams
+     *
+     * @param firstLogSegmentSequenceNumber first ledger sequence number
+     * @return distributed log configuration
+     * @see #getFirstLogSegmentSequenceNumber()
+     */
+    public DistributedLogConfiguration setFirstLogSegmentSequenceNumber(long 
firstLogSegmentSequenceNumber)
+            throws IllegalArgumentException {
+        if (firstLogSegmentSequenceNumber <= 0) {
+            // Incorrect ledger sequence number specified
+            throw new IllegalArgumentException("Incorrect value for ledger 
sequence number");
+        }
+        setProperty(BKDL_FIRST_LOGSEGMENT_SEQUENCE_NUMBER, 
firstLogSegmentSequenceNumber);
+        return this;
+    }
+
+    /**
+     * Check whether log segment sequence number validation is enabled.
+     *
+     * @return true if the log segment sequence number validation is enabled, otherwise false.
+     */
+    public boolean isLogSegmentSequenceNumberValidationEnabled() {
+        final boolean validationEnabled = getBoolean(
+                BKDL_LOGSEGMENT_SEQUENCE_NUMBER_VALIDATION_ENABLED,
+                BKDL_LOGSEGMENT_SEQUENCE_NUMBER_VALIDATION_ENABLED_DEFAULT);
+        return validationEnabled;
+    }
+
+    /**
+     * Enable/Disable log segment sequence number validation.
+     *
+     * @param enabled
+     *          flag to enable/disable log segment sequence number validation.
+     * @return this configuration, so that calls can be chained.
+     * @see #isLogSegmentSequenceNumberValidationEnabled()
+     */
+    public DistributedLogConfiguration setLogSegmentSequenceNumberValidationEnabled(boolean enabled) {
+        this.setProperty(BKDL_LOGSEGMENT_SEQUENCE_NUMBER_VALIDATION_ENABLED, enabled);
+        return this;
+    }
+
+    /**
+     * Check whether record counts should be published in the log records and metadata.
+     * <p>By default it is true. This is a legacy setting for log segment version 1.
+     * It should be considered removed.
+     *
+     * @return if record counts should be persisted
+     */
+    public boolean getEnableRecordCounts() {
+        final boolean recordCountsEnabled =
+            getBoolean(BKDL_ENABLE_RECORD_COUNTS, BKDL_ENABLE_RECORD_COUNTS_DEFAULT);
+        return recordCountsEnabled;
+    }
+
+    /**
+     * Choose whether record counts should be published in the log records and metadata.
+     *
+     * @param enableRecordCounts enable record counts
+     * @return this configuration, so that calls can be chained.
+     * @see #getEnableRecordCounts()
+     */
+    public DistributedLogConfiguration setEnableRecordCounts(boolean enableRecordCounts) {
+        this.setProperty(BKDL_ENABLE_RECORD_COUNTS, enableRecordCounts);
+        return this;
+    }
+
+    /**
+     * Check whether txn ids are sanity checked on starting log segments.
+     * <p>If it is enabled, DL writer would throw
+     * {@link org.apache.distributedlog.exceptions.TransactionIdOutOfOrderException}
+     * when it receives a transaction id smaller than the current maximum transaction id.
+     *
+     * @return true if should check txn id with max txn id, otherwise false.
+     */
+    @Deprecated
+    public boolean getSanityCheckTxnID() {
+        final boolean sanityCheckEnabled =
+            getBoolean(BKDL_MAXID_SANITYCHECK, BKDL_MAXID_SANITYCHECK_DEFAULT);
+        return sanityCheckEnabled;
+    }
+
+    /**
+     * Enable/Disable the txn id sanity check.
+     *
+     * @param enabled
+     *          flag to enable/disable the txn id sanity check.
+     * @return this configuration, so that calls can be chained.
+     * @see #getSanityCheckTxnID()
+     */
+    @Deprecated
+    public DistributedLogConfiguration setSanityCheckTxnID(boolean enabled) {
+        this.setProperty(BKDL_MAXID_SANITYCHECK, enabled);
+        return this;
+    }
+
+    /**
+     * Check whether the region id is encoded in log segment metadata.
+     * <p>In the global DL use case, encoding the region id in log segment metadata
+     * helps to understand in which region a log segment was created. The region
+     * id field in log segment metadata helps with monitoring and troubleshooting.
+     *
+     * @return whether to encode region id in log segment metadata.
+     */
+    public boolean getEncodeRegionIDInLogSegmentMetadata() {
+        final boolean encodeRegionId = getBoolean(
+                BKDL_ENCODE_REGION_ID_IN_VERSION, BKDL_ENCODE_REGION_ID_IN_VERSION_DEFAULT);
+        return encodeRegionId;
+    }
+
+    /**
+     * Enable/Disable encoding of the region id in log segment metadata.
+     *
+     * @param enabled
+     *          flag to enable/disable encoding region id in log segment metadata.
+     * @return this configuration, so that calls can be chained.
+     * @see #getEncodeRegionIDInLogSegmentMetadata()
+     */
+    public DistributedLogConfiguration setEncodeRegionIDInLogSegmentMetadata(boolean enabled) {
+        this.setProperty(BKDL_ENCODE_REGION_ID_IN_VERSION, enabled);
+        return this;
+    }
+
+    /**
+     * Get the version of the log segment naming scheme.
+     * <p>
+     * <ul>
+     * <li>version 0: inprogress_(start_txid) |
+     * logrecs_(start_txid)_(end_txid)</li>
+     * <li>version 1: inprogress_(logsegment_sequence_number) |
+     * logrecs_(logsegment_sequence_number)</li>
+     * </ul>
+     * By default it is 1.
+     *
+     * @return log segment name version.
+     */
+    public int getLogSegmentNameVersion() {
+        final int nameVersion =
+            getInt(BKDL_LOGSEGMENT_NAME_VERSION, BKDL_LOGSEGMENT_NAME_VERSION_DEFAULT);
+        return nameVersion;
+    }
+
+    /**
+     * Configure the version of the log segment naming scheme.
+     *
+     * @param version
+     *          log segment name version.
+     * @return this configuration, so that calls can be chained.
+     * @see #getLogSegmentNameVersion()
+     */
+    public DistributedLogConfiguration setLogSegmentNameVersion(int version) {
+        this.setProperty(BKDL_LOGSEGMENT_NAME_VERSION, version);
+        return this;
+    }
+
+    /**
+     * Get the name of the unpartitioned stream.
+     * <p>It is a legacy setting. Consider removing it in future.
+     *
+     * @return unpartitioned stream
+     */
+    public String getUnpartitionedStreamName() {
+        final String streamName =
+            getString(BKDL_UNPARTITIONED_STREAM_NAME, BKDL_UNPARTITIONED_STREAM_NAME_DEFAULT);
+        return streamName;
+    }
+
+    /**
+     * Configure the name of the unpartitioned stream.
+     *
+     * @param streamName name of the unpartitioned stream
+     * @return this configuration, so that calls can be chained.
+     * @see #getUnpartitionedStreamName()
+     */
+    public DistributedLogConfiguration setUnpartitionedStreamName(String streamName) {
+        this.setProperty(BKDL_UNPARTITIONED_STREAM_NAME, streamName);
+        return this;
+    }
+
+    //
+    // LogSegment Cache Settings
+    //
+
+    /**
+     * Get the log segment cache entry TTL in milliseconds.
+     * <p>When the TTL key is not configured, the TTL default is returned.
+     *
+     * @return log segment cache ttl in milliseconds.
+     */
+    public long getLogSegmentCacheTTLMs() {
+        // The fallback must be the TTL default; the max-size default is a cache
+        // entry count, not a duration in milliseconds.
+        return getLong(BKDL_LOGSEGMENT_CACHE_TTL_MS, BKDL_LOGSEGMENT_CACHE_TTL_MS_DEFAULT);
+    }
+
+    /**
+     * Configure the log segment cache entry TTL in milliseconds.
+     *
+     * @param ttlMs TTL in milliseconds
+     * @return this configuration, so that calls can be chained.
+     * @see #getLogSegmentCacheTTLMs()
+     */
+    public DistributedLogConfiguration setLogSegmentCacheTTLMs(long ttlMs) {
+        this.setProperty(BKDL_LOGSEGMENT_CACHE_TTL_MS, ttlMs);
+        return this;
+    }
+
+    /**
+     * Get the upper bound on the size of the log segment cache.
+     *
+     * @return maximum size of the log segment cache.
+     */
+    public long getLogSegmentCacheMaxSize() {
+        final long cacheMaxSize =
+            getLong(BKDL_LOGSEGMENT_CACHE_MAX_SIZE, BKDL_LOGSEGMENT_CACHE_MAX_SIZE_DEFAULT);
+        return cacheMaxSize;
+    }
+
+    /**
+     * Configure the upper bound on the size of the log segment cache.
+     *
+     * @param maxSize maximum size of the log segment cache.
+     * @return this configuration, so that calls can be chained.
+     * @see #getLogSegmentCacheMaxSize()
+     */
+    public DistributedLogConfiguration setLogSegmentCacheMaxSize(long maxSize) {
+        this.setProperty(BKDL_LOGSEGMENT_CACHE_MAX_SIZE, maxSize);
+        return this;
+    }
+
+    /**
+     * Check whether the log segment cache is enabled.
+     *
+     * @return true if log segment cache is enabled; otherwise false
+     */
+    public boolean isLogSegmentCacheEnabled() {
+        final boolean cacheEnabled =
+            getBoolean(BKDL_LOGSEGMENT_CACHE_ENABLED, BKDL_LOGSEGMENT_CACHE_ENABLED_DEFAULT);
+        return cacheEnabled;
+    }
+
+    /**
+     * Enable/disable the log segment cache.
+     *
+     * @param enabled
+     *          flag to enable/disable the log segment cache.
+     * @return this configuration, so that calls can be chained.
+     * @see #isLogSegmentCacheEnabled()
+     */
+    public DistributedLogConfiguration setLogSegmentCacheEnabled(boolean enabled) {
+        this.setProperty(BKDL_LOGSEGMENT_CACHE_ENABLED, enabled);
+        return this;
+    }
+
+    //
+    // DL Writer General Settings
+    //
+
+    /**
+     * Check whether a stream should be created if it does not exist.
+     * By default it is true.
+     *
+     * @return true if it is enabled to create stream if not exists.
+     */
+    public boolean getCreateStreamIfNotExists() {
+        final boolean createIfNotExists = getBoolean(
+                BKDL_CREATE_STREAM_IF_NOT_EXISTS, BKDL_CREATE_STREAM_IF_NOT_EXISTS_DEFAULT);
+        return createIfNotExists;
+    }
+
+    /**
+     * Enable/Disable creating a stream if it does not exist.
+     *
+     * @param enabled
+     *          flag to enable/disable creating stream if not exists.
+     * @return this configuration, so that calls can be chained.
+     * @see #getCreateStreamIfNotExists()
+     */
+    public DistributedLogConfiguration setCreateStreamIfNotExists(boolean enabled) {
+        this.setProperty(BKDL_CREATE_STREAM_IF_NOT_EXISTS, enabled);
+        return this;
+    }
+
+    /**
+     * Get the log flush timeout in seconds.
+     * <p>This is a setting used by DL writer on flushing data. It is typically used
+     * by synchronous writer and log segment writer. By default it is 30 seconds.
+     *
+     * @return log flush timeout in seconds.
+     */
+    // @Deprecated
+    public int getLogFlushTimeoutSeconds() {
+        final int flushTimeoutSeconds =
+            getInt(BKDL_LOG_FLUSH_TIMEOUT, BKDL_LOG_FLUSH_TIMEOUT_DEFAULT);
+        return flushTimeoutSeconds;
+    }
+
+    /**
+     * Configure the log flush timeout in seconds.
+     *
+     * @param logFlushTimeoutSeconds log flush timeout, in seconds.
+     * @return this configuration, so that calls can be chained.
+     * @see #getLogFlushTimeoutSeconds()
+     */
+    public DistributedLogConfiguration setLogFlushTimeoutSeconds(int logFlushTimeoutSeconds) {
+        this.setProperty(BKDL_LOG_FLUSH_TIMEOUT, logFlushTimeoutSeconds);
+        return this;
+    }
+
+    /**
+     * Get the compression type used while sending data to bookkeeper.
+     *
+     * @return compression type to use
+     * @see org.apache.distributedlog.io.CompressionCodec
+     */
+    public String getCompressionType() {
+        final String compressionType =
+            getString(BKDL_COMPRESSION_TYPE, BKDL_COMPRESSION_TYPE_DEFAULT);
+        return compressionType;
+    }
+
+    /**
+     * Configure the compression type used while sending data to bookkeeper.
+     *
+     * @param compressionType compression type; must be neither null nor empty.
+     * @return this configuration, so that calls can be chained.
+     * @throws IllegalArgumentException if {@code compressionType} is null or empty.
+     * @see #getCompressionType()
+     */
+    public DistributedLogConfiguration setCompressionType(String compressionType) {
+        final boolean hasCompressionType = compressionType != null && !compressionType.isEmpty();
+        // Refuse to store a missing or blank-length compression type.
+        Preconditions.checkArgument(hasCompressionType);
+        this.setProperty(BKDL_COMPRESSION_TYPE, compressionType);
+        return this;
+    }
+
+    /**
+     * Check whether to fail immediately if the stream is not ready rather than
+     * queueing the request.
+     * <p>If it is enabled, the write request fails immediately if the stream isn't
+     * ready. Consider turning it on for the use cases that could retry writing to
+     * other streams (aka non-strict ordering guarantee). It results in a fast failure,
+     * hence the client would retry immediately.
+     *
+     * @return true if should fail fast. otherwise, false.
+     */
+    public boolean getFailFastOnStreamNotReady() {
+        final boolean failFast = getBoolean(
+                BKDL_FAILFAST_ON_STREAM_NOT_READY, BKDL_FAILFAST_ON_STREAM_NOT_READY_DEFAULT);
+        return failFast;
+    }
+
+    /**
+     * Configure the failfast on stream not ready flag.
+     *
+     * @param failFastOnStreamNotReady
+     *        whether to fail fast when the stream is not ready.
+     * @return this configuration, so that calls can be chained.
+     * @see #getFailFastOnStreamNotReady()
+     */
+    public DistributedLogConfiguration setFailFastOnStreamNotReady(boolean failFastOnStreamNotReady) {
+        this.setProperty(BKDL_FAILFAST_ON_STREAM_NOT_READY, failFastOnStreamNotReady);
+        return this;
+    }
+
+    /**
+     * Check whether rolling on log segment error is disabled.
+     * <p>If this option is set, the log writer won't reset the segment writer if an
+     * error is encountered.
+     *
+     * @return true if we should disable automatic rolling
+     */
+    public boolean getDisableRollingOnLogSegmentError() {
+        final boolean rollingDisabled = getBoolean(
+                BKDL_DISABLE_ROLLING_ON_LOG_SEGMENT_ERROR,
+                BKDL_DISABLE_ROLLING_ON_LOG_SEGMENT_ERROR_DEFAULT);
+        return rollingDisabled;
+    }
+
+    /**
+     * Configure the flag that disables rolling on log segment error.
+     *
+     * @param disableRollingOnLogSegmentError
+     *        whether rolling on log segment error should be disabled.
+     * @return this configuration, so that calls can be chained.
+     * @see #getDisableRollingOnLogSegmentError()
+     */
+    public DistributedLogConfiguration setDisableRollingOnLogSegmentError(boolean disableRollingOnLogSegmentError) {
+        this.setProperty(BKDL_DISABLE_ROLLING_ON_LOG_SEGMENT_ERROR, disableRollingOnLogSegmentError);
+        return this;
+    }
+
+    //
+    // DL Durability Settings
+    //
+
+    /**
+     * Check whether durable writes are enabled.
+     * <p>It is enabled by default.
+     *
+     * @return true if durable write is enabled. otherwise, false.
+     */
+    public boolean isDurableWriteEnabled() {
+        final boolean durableWriteEnabled =
+            getBoolean(BKDL_IS_DURABLE_WRITE_ENABLED, BKDL_IS_DURABLE_WRITE_ENABLED_DEFAULT);
+        return durableWriteEnabled;
+    }
+
+    /**
+     * Enable/Disable durable writes in writers.
+     *
+     * @param enabled
+     *          flag to enable/disable durable writes in writers.
+     * @return this configuration, so that calls can be chained.
+     * @see #isDurableWriteEnabled()
+     */
+    public DistributedLogConfiguration setDurableWriteEnabled(boolean enabled) {
+        this.setProperty(BKDL_IS_DURABLE_WRITE_ENABLED, enabled);
+        return this;
+    }
+
+    //
+    // DL Writer Transmit Settings
+    //
+
+    /**
+     * Get the output buffer size for DL writers, in bytes.
+     * <p>A large buffer will result in a higher compression ratio, use the
+     * bandwidth more efficiently and improve throughput.
+     * Setting it to 0 asks DL writers to transmit the data immediately,
+     * which could achieve low latency.
+     * <p>The default value is 1KB.
+     * <p>The current key is consulted first; when it is unset the old key is used,
+     * and when that is unset too the built-in default applies.
+     *
+     * @return buffer size in bytes.
+     */
+    public int getOutputBufferSize() {
+        // Resolve the fallback (old key, then default) before consulting the current key.
+        final int fallbackBufferSize =
+            getInt(BKDL_OUTPUT_BUFFER_SIZE_OLD, BKDL_OUTPUT_BUFFER_SIZE_DEFAULT);
+        return getInt(BKDL_OUTPUT_BUFFER_SIZE, fallbackBufferSize);
+    }
+
+    /**
+     * Configure the output buffer size for DL writers, in bytes.
+     *
+     * @param opBufferSize output buffer size.
+     * @return this configuration, so that calls can be chained.
+     * @see #getOutputBufferSize()
+     */
+    public DistributedLogConfiguration setOutputBufferSize(int opBufferSize) {
+        this.setProperty(BKDL_OUTPUT_BUFFER_SIZE, opBufferSize);
+        return this;
+    }
+
+    /**
+     * Get the periodic log flush frequency in milliseconds.
+     * <p>If the setting is set with a positive value, the data in the output buffer
+     * will be flushed at this provided interval. The default value is 0.
+     *
+     * @return periodic flush frequency in milliseconds.
+     * @see #getOutputBufferSize()
+     */
+    public int getPeriodicFlushFrequencyMilliSeconds() {
+        final int flushFrequencyMs = getInt(
+                BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS,
+                BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS_DEFAULT);
+        return flushFrequencyMs;
+    }
+
+    /**
+     * Configure the periodic log flush frequency in milliseconds.
+     *
+     * @param flushFrequencyMs periodic flush frequency in milliseconds.
+     * @return this configuration, so that calls can be chained.
+     * @see #getPeriodicFlushFrequencyMilliSeconds()
+     */
+    public DistributedLogConfiguration setPeriodicFlushFrequencyMilliSeconds(int flushFrequencyMs) {
+        this.setProperty(BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS, flushFrequencyMs);
+        return this;
+    }
+
+    /**
+     * Check whether immediate flush is enabled.
+     * <p>If it is enabled, a control record is flushed immediately after adding
+     * data completed. The default value is false.
+     *
+     * @return whether immediate flush is enabled
+     */
+    public boolean getImmediateFlushEnabled() {
+        final boolean immediateFlushEnabled =
+            getBoolean(BKDL_ENABLE_IMMEDIATE_FLUSH, BKDL_ENABLE_IMMEDIATE_FLUSH_DEFAULT);
+        return immediateFlushEnabled;
+    }
+
+    /**
+     * Enable/Disable immediate flush.
+     *
+     * @param enabled
+     *          flag to enable/disable immediate flush.
+     * @return this configuration, so that calls can be chained.
+     * @see #getImmediateFlushEnabled()
+     */
+    public DistributedLogConfiguration setImmediateFlushEnabled(boolean enabled) {
+        this.setProperty(BKDL_ENABLE_IMMEDIATE_FLUSH, enabled);
+        return this;
+    }
+
+    /**
+     * Get the minimum delay between immediate flushes in milliseconds.
+     * <p>This setting only takes effect when {@link #getImmediateFlushEnabled()}
+     * is enabled. It tolerates bursts of traffic when immediate flush is enabled,
+     * which prevents sending too many control records to the bookkeeper.
+     *
+     * @return minimum delay between immediate flushes in milliseconds
+     * @see #getImmediateFlushEnabled()
+     */
+    public int getMinDelayBetweenImmediateFlushMs() {
+        final int minDelayMs = getInt(
+                BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS,
+                BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS_DEFAULT);
+        return minDelayMs;
+    }
+
+    /**
+     * Configure the minimum delay between immediate flushes in milliseconds.
+     *
+     * @param minDelayMs minimum delay between immediate flushes in milliseconds.
+     * @return this configuration, so that calls can be chained.
+     * @see #getMinDelayBetweenImmediateFlushMs()
+     */
+    public DistributedLogConfiguration setMinDelayBetweenImmediateFlushMs(int minDelayMs) {
+        this.setProperty(BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS, minDelayMs);
+        return this;
+    }
+
+    /**
+     * Get the periodic keep alive frequency in milliseconds.
+     * <p>If the setting is set with a positive value, a control record is written
+     * periodically to keep the stream active. The default value is 0.
+     *
+     * @return periodic keep alive frequency in milliseconds.
+     */
+    public int getPeriodicKeepAliveMilliSeconds() {
+        final int keepAliveMs = getInt(
+                BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS,
+                BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT);
+        return keepAliveMs;
+    }
+
+    /**
+     * Configure the periodic keep alive frequency in milliseconds.
+     *
+     * @param keepAliveMs keep alive frequency in milliseconds.
+     * @return this configuration, so that calls can be chained.
+     * @see #getPeriodicKeepAliveMilliSeconds()
+     */
+    public DistributedLogConfiguration setPeriodicKeepAliveMilliSeconds(int keepAliveMs) {
+        this.setProperty(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, keepAliveMs);
+        return this;
+    }
+
+    //
+    // DL Retention/Truncation Settings
+    //
+
+    /**
+     * Get the log segment retention period in hours.
+     * The default value is 3 days.
+     * <p>The current key is consulted first; when it is unset the old key is used,
+     * and when that is unset too the built-in default applies.
+     *
+     * @return log segment retention period in hours
+     */
+    public int getRetentionPeriodHours() {
+        // Resolve the fallback (old key, then default) before consulting the current key.
+        final int fallbackRetentionHours = getInt(BKDL_RETENTION_PERIOD_IN_HOURS_OLD,
+                BKDL_RETENTION_PERIOD_IN_HOURS_DEFAULT);
+        return getInt(BKDL_RETENTION_PERIOD_IN_HOURS, fallbackRetentionHours);
+    }
+
+    /**
+     * Configure the log segment retention period in hours.
+     *
+     * @param retentionHours retention period in hours.
+     * @return this configuration, so that calls can be chained.
+     * @see #getRetentionPeriodHours()
+     */
+    public DistributedLogConfiguration setRetentionPeriodHours(int retentionHours) {
+        this.setProperty(BKDL_RETENTION_PERIOD_IN_HOURS, retentionHours);
+        return this;
+    }
+
+    /**
+     * Check whether truncation is managed explicitly by the application.
+     * <p>If this is set then time based retention is only a hint to perform
+     * deferred cleanup. However we never remove a segment that has not been
+     * already marked truncated.
+     * <p>It is disabled by default.
+     *
+     * @return whether truncation managed explicitly by the application
+     * @see org.apache.distributedlog.LogSegmentMetadata.TruncationStatus
+     */
+    public boolean getExplicitTruncationByApplication() {
+        final boolean explicitTruncation = getBoolean(
+                BKDL_EXPLICIT_TRUNCATION_BY_APPLICATION,
+                BKDL_EXPLICIT_TRUNCATION_BY_APPLICATION_DEFAULT);
+        return explicitTruncation;
+    }
+
+    /**
+     * Enable/Disable whether truncation is managed explicitly by the application.
+     *
+     * @param enabled
+     *          flag to enable/disable whether truncation is managed explicitly by the application.
+     * @return this configuration, so that calls can be chained.
+     * @see #getExplicitTruncationByApplication()
+     */
+    public DistributedLogConfiguration setExplicitTruncationByApplication(boolean enabled) {
+        this.setProperty(BKDL_EXPLICIT_TRUNCATION_BY_APPLICATION, enabled);
+        return this;
+    }
+
+    //
+    // Log Segment Rolling Settings
+    //
+
+    /**
+     * Get log segment rolling interval in minutes.
+     * <p>If the setting is set to a positive value, DL writer will roll log 
segments
+     * based on time. Otherwise, it will roll log segments based on size.
+     * <p>The default value is 2 hours.
+     *
+     * @return log segment rolling interval in minutes
+     * @see #getMaxLogS

<TRUNCATED>

Reply via email to