http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
deleted file mode 100644
index 8d3c418..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.base.Optional;
-import com.twitter.distributedlog.ZooKeeperClient.Credentials;
-import com.twitter.distributedlog.ZooKeeperClient.DigestCredentials;
-import com.twitter.distributedlog.exceptions.AlreadyClosedException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.net.NetUtils;
-import com.twitter.distributedlog.util.ConfUtils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Throw;
-import org.apache.bookkeeper.client.AsyncCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy;
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.net.DNSToSwitchMapping;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.RetryPolicy;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.zookeeper.KeeperException;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * BookKeeper Client wrapper over {@link BookKeeper}.
- *
- * <h3>Metrics</h3>
- * <ul>
- * <li> bookkeeper operation stats are exposed under current scope by {@link 
BookKeeper}
- * </ul>
- */
-public class BookKeeperClient {
-    static final Logger LOG = LoggerFactory.getLogger(BookKeeperClient.class);
-
-    // Parameters to build bookkeeper client
-    private final DistributedLogConfiguration conf;
-    private final String name;
-    private final String zkServers;
-    private final String ledgersPath;
-    private final byte[] passwd;
-    private final ClientSocketChannelFactory channelFactory;
-    private final HashedWheelTimer requestTimer;
-    private final StatsLogger statsLogger;
-
-    // bookkeeper client state
-    private boolean closed = false;
-    private BookKeeper bkc = null;
-    private ZooKeeperClient zkc;
-    private final boolean ownZK;
-    // feature provider
-    private final Optional<FeatureProvider> featureProvider;
-
-    @SuppressWarnings("deprecation")
-    private synchronized void commonInitialization(
-            DistributedLogConfiguration conf, String ledgersPath,
-            ClientSocketChannelFactory channelFactory, StatsLogger 
statsLogger, HashedWheelTimer requestTimer)
-        throws IOException, InterruptedException, KeeperException {
-        ClientConfiguration bkConfig = new ClientConfiguration();
-        bkConfig.setAddEntryTimeout(conf.getBKClientWriteTimeout());
-        bkConfig.setReadTimeout(conf.getBKClientReadTimeout());
-        bkConfig.setZkLedgersRootPath(ledgersPath);
-        bkConfig.setZkTimeout(conf.getBKClientZKSessionTimeoutMilliSeconds());
-        bkConfig.setNumWorkerThreads(conf.getBKClientNumberWorkerThreads());
-        
bkConfig.setEnsemblePlacementPolicy(RegionAwareEnsemblePlacementPolicy.class);
-        bkConfig.setZkRequestRateLimit(conf.getBKClientZKRequestRateLimit());
-        
bkConfig.setProperty(RegionAwareEnsemblePlacementPolicy.REPP_DISALLOW_BOOKIE_PLACEMENT_IN_REGION_FEATURE_NAME,
-                
DistributedLogConstants.DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME);
-        // reload configuration from dl configuration with settings prefixed 
with 'bkc.'
-        ConfUtils.loadConfiguration(bkConfig, conf, "bkc.");
-
-        Class<? extends DNSToSwitchMapping> dnsResolverCls;
-        try {
-            dnsResolverCls = conf.getEnsemblePlacementDnsResolverClass();
-        } catch (ConfigurationException e) {
-            LOG.error("Failed to load bk dns resolver : ", e);
-            throw new IOException("Failed to load bk dns resolver : ", e);
-        }
-        final DNSToSwitchMapping dnsResolver =
-                NetUtils.getDNSResolver(dnsResolverCls, 
conf.getBkDNSResolverOverrides());
-
-        this.bkc = BookKeeper.newBuilder()
-            .config(bkConfig)
-            .zk(zkc.get())
-            .channelFactory(channelFactory)
-            .statsLogger(statsLogger)
-            .dnsResolver(dnsResolver)
-            .requestTimer(requestTimer)
-            .featureProvider(featureProvider.orNull())
-            .build();
-    }
-
-    BookKeeperClient(DistributedLogConfiguration conf,
-                     String name,
-                     String zkServers,
-                     ZooKeeperClient zkc,
-                     String ledgersPath,
-                     ClientSocketChannelFactory channelFactory,
-                     HashedWheelTimer requestTimer,
-                     StatsLogger statsLogger,
-                     Optional<FeatureProvider> featureProvider) {
-        this.conf = conf;
-        this.name = name;
-        this.zkServers = zkServers;
-        this.ledgersPath = ledgersPath;
-        this.passwd = conf.getBKDigestPW().getBytes(UTF_8);
-        this.channelFactory = channelFactory;
-        this.requestTimer = requestTimer;
-        this.statsLogger = statsLogger;
-        this.featureProvider = featureProvider;
-        this.ownZK = null == zkc;
-        if (null != zkc) {
-            // reference the passing zookeeper client
-            this.zkc = zkc;
-        }
-    }
-
-    private synchronized void initialize() throws IOException {
-        if (null != this.bkc) {
-            return;
-        }
-        if (null == this.zkc) {
-            int zkSessionTimeout = 
conf.getBKClientZKSessionTimeoutMilliSeconds();
-            RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
-                        conf.getBKClientZKRetryBackoffStartMillis(),
-                        conf.getBKClientZKRetryBackoffMaxMillis(), 
conf.getBKClientZKNumRetries());
-            Credentials credentials = Credentials.NONE;
-            if (conf.getZkAclId() != null) {
-                credentials = new DigestCredentials(conf.getZkAclId(), 
conf.getZkAclId());
-            }
-
-            this.zkc = new ZooKeeperClient(name + ":zk", zkSessionTimeout, 2 * 
zkSessionTimeout, zkServers,
-                                           retryPolicy, 
statsLogger.scope("bkc_zkc"), conf.getZKClientNumberRetryThreads(),
-                                           
conf.getBKClientZKRequestRateLimit(), credentials);
-        }
-
-        try {
-            commonInitialization(conf, ledgersPath, channelFactory, 
statsLogger, requestTimer);
-        } catch (InterruptedException e) {
-            throw new DLInterruptedException("Interrupted on creating 
bookkeeper client " + name + " : ", e);
-        } catch (KeeperException e) {
-            throw new ZKException("Error on creating bookkeeper client " + 
name + " : ", e);
-        }
-
-        if (ownZK) {
-            LOG.info("BookKeeper Client created {} with its own ZK Client : 
ledgersPath = {}, numRetries = {}, " +
-                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, 
dnsResolver = {}",
-                    new Object[] { name, ledgersPath,
-                    conf.getBKClientZKNumRetries(), 
conf.getBKClientZKSessionTimeoutMilliSeconds(),
-                    conf.getBKClientZKRetryBackoffStartMillis(), 
conf.getBKClientZKRetryBackoffMaxMillis(),
-                    conf.getBkDNSResolverOverrides() });
-        } else {
-            LOG.info("BookKeeper Client created {} with shared zookeeper 
client : ledgersPath = {}, numRetries = {}, " +
-                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, 
dnsResolver = {}",
-                    new Object[] { name, ledgersPath,
-                    conf.getZKNumRetries(), 
conf.getZKSessionTimeoutMilliseconds(),
-                    conf.getZKRetryBackoffStartMillis(), 
conf.getZKRetryBackoffMaxMillis(),
-                    conf.getBkDNSResolverOverrides() });
-        }
-    }
-
-
-    public synchronized BookKeeper get() throws IOException {
-        checkClosedOrInError();
-        if (null == bkc) {
-            initialize();
-        }
-        return bkc;
-    }
-
-    // Util functions
-    public Future<LedgerHandle> createLedger(int ensembleSize,
-                                             int writeQuorumSize,
-                                             int ackQuorumSize) {
-        BookKeeper bk;
-        try {
-            bk = get();
-        } catch (IOException ioe) {
-            return Future.exception(ioe);
-        }
-        final Promise<LedgerHandle> promise = new Promise<LedgerHandle>();
-        bk.asyncCreateLedger(ensembleSize, writeQuorumSize, ackQuorumSize,
-                BookKeeper.DigestType.CRC32, passwd, new 
AsyncCallback.CreateCallback() {
-                    @Override
-                    public void createComplete(int rc, LedgerHandle lh, Object 
ctx) {
-                        if (BKException.Code.OK == rc) {
-                            promise.updateIfEmpty(new 
Return<LedgerHandle>(lh));
-                        } else {
-                            promise.updateIfEmpty(new 
Throw<LedgerHandle>(BKException.create(rc)));
-                        }
-                    }
-                }, null);
-        return promise;
-    }
-
-    public Future<Void> deleteLedger(long lid,
-                                     final boolean ignoreNonExistentLedger) {
-        BookKeeper bk;
-        try {
-            bk = get();
-        } catch (IOException ioe) {
-            return Future.exception(ioe);
-        }
-        final Promise<Void> promise = new Promise<Void>();
-        bk.asyncDeleteLedger(lid, new AsyncCallback.DeleteCallback() {
-            @Override
-            public void deleteComplete(int rc, Object ctx) {
-                if (BKException.Code.OK == rc) {
-                    promise.updateIfEmpty(new Return<Void>(null));
-                } else if (BKException.Code.NoSuchLedgerExistsException == rc) 
{
-                    if (ignoreNonExistentLedger) {
-                        promise.updateIfEmpty(new Return<Void>(null));
-                    } else {
-                        promise.updateIfEmpty(new 
Throw<Void>(BKException.create(rc)));
-                    }
-                } else {
-                    promise.updateIfEmpty(new 
Throw<Void>(BKException.create(rc)));
-                }
-            }
-        }, null);
-        return promise;
-    }
-
-    public void close() {
-        BookKeeper bkcToClose;
-        ZooKeeperClient zkcToClose;
-        synchronized (this) {
-            if (closed) {
-                return;
-            }
-            closed = true;
-            bkcToClose = bkc;
-            zkcToClose = zkc;
-        }
-
-        LOG.info("BookKeeper Client closed {}", name);
-        if (null != bkcToClose) {
-            try {
-                bkcToClose.close();
-            } catch (InterruptedException e) {
-                LOG.warn("Interrupted on closing bookkeeper client {} : ", 
name, e);
-                Thread.currentThread().interrupt();
-            } catch (BKException e) {
-                LOG.warn("Error on closing bookkeeper client {} : ", name, e);
-            }
-        }
-        if (null != zkcToClose) {
-            if (ownZK) {
-                zkcToClose.close();
-            }
-        }
-    }
-
-    public synchronized void checkClosedOrInError() throws 
AlreadyClosedException {
-        if (closed) {
-            LOG.error("BookKeeper Client {} is already closed", name);
-            throw new AlreadyClosedException("BookKeeper Client " + name + " 
is already closed");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClientBuilder.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClientBuilder.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClientBuilder.java
deleted file mode 100644
index cad1096..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClientBuilder.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.apache.bookkeeper.feature.FeatureProvider;
-
-import org.apache.bookkeeper.feature.Feature;
-
-/**
- * Builder to build bookkeeper client.
- */
-public class BookKeeperClientBuilder {
-
-    /**
-     * Create a bookkeeper client builder to build bookkeeper clients.
-     *
-     * @return bookkeeper client builder.
-     */
-    public static BookKeeperClientBuilder newBuilder() {
-        return new BookKeeperClientBuilder();
-    }
-
-    // client name
-    private String name = null;
-    // dl config
-    private DistributedLogConfiguration dlConfig = null;
-    // bookkeeper settings
-    // zookeeper client
-    private ZooKeeperClient zkc = null;
-    // or zookeeper servers
-    private String zkServers = null;
-    // ledgers path
-    private String ledgersPath = null;
-    // statsLogger
-    private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
-    // client channel factory
-    private ClientSocketChannelFactory channelFactory = null;
-    // request timer
-    private HashedWheelTimer requestTimer = null;
-    // feature provider
-    private Optional<FeatureProvider> featureProvider = Optional.absent();
-
-    // Cached BookKeeper Client
-    private BookKeeperClient cachedClient = null;
-
-    /**
-     * Private bookkeeper builder.
-     */
-    private BookKeeperClientBuilder() {}
-
-    /**
-     * Set client name.
-     *
-     * @param name
-     *          client name.
-     * @return builder
-     */
-    public synchronized BookKeeperClientBuilder name(String name) {
-        this.name = name;
-        return this;
-    }
-
-    /**
-     * <i>dlConfig</i> used to configure bookkeeper client.
-     *
-     * @param dlConfig
-     *          distributedlog config.
-     * @return builder.
-     */
-    public synchronized BookKeeperClientBuilder 
dlConfig(DistributedLogConfiguration dlConfig) {
-        this.dlConfig = dlConfig;
-        return this;
-    }
-
-    /**
-     * Set the zkc used to build bookkeeper client. If a zookeeper client is 
provided in this
-     * method, bookkeeper client will use it rather than creating a brand new 
one.
-     *
-     * @param zkc
-     *          zookeeper client.
-     * @return builder
-     * @see #zkServers(String)
-     */
-    public synchronized BookKeeperClientBuilder zkc(ZooKeeperClient zkc) {
-        this.zkc = zkc;
-        return this;
-    }
-
-    /**
-     * Set the zookeeper servers that bookkeeper client would connect to. If 
no zookeeper client
-     * is provided by {@link #zkc(ZooKeeperClient)}, bookkeeper client will 
use the given string
-     * to create a brand new zookeeper client.
-     *
-     * @param zkServers
-     *          zookeeper servers that bookkeeper client would connect to.
-     * @return builder
-     * @see #zkc(ZooKeeperClient)
-     */
-    public synchronized BookKeeperClientBuilder zkServers(String zkServers) {
-        this.zkServers = zkServers;
-        return this;
-    }
-
-    /**
-     * Set the ledgers path that bookkeeper client is going to access.
-     *
-     * @param ledgersPath
-     *          ledgers path
-     * @return builder
-     * @see 
org.apache.bookkeeper.conf.ClientConfiguration#getZkLedgersRootPath()
-     */
-    public synchronized BookKeeperClientBuilder ledgersPath(String 
ledgersPath) {
-        this.ledgersPath = ledgersPath;
-        return this;
-    }
-
-    /**
-     * Build BookKeeper client using existing <i>bkc</i> client.
-     *
-     * @param bkc
-     *          bookkeeper client.
-     * @return builder
-     */
-    public synchronized BookKeeperClientBuilder bkc(BookKeeperClient bkc) {
-        this.cachedClient = bkc;
-        return this;
-    }
-
-    /**
-     * Build BookKeeper client using existing <i>channelFactory</i>.
-     *
-     * @param channelFactory
-     *          Channel Factory used to build bookkeeper client.
-     * @return bookkeeper client builder.
-     */
-    public synchronized BookKeeperClientBuilder 
channelFactory(ClientSocketChannelFactory channelFactory) {
-        this.channelFactory = channelFactory;
-        return this;
-    }
-
-    /**
-     * Build BookKeeper client using existing <i>request timer</i>.
-     *
-     * @param requestTimer
-     *          HashedWheelTimer used to build bookkeeper client.
-     * @return bookkeeper client builder.
-     */
-    public synchronized BookKeeperClientBuilder requestTimer(HashedWheelTimer 
requestTimer) {
-        this.requestTimer = requestTimer;
-        return this;
-    }
-
-    /**
-     * Build BookKeeper Client using given stats logger <i>statsLogger</i>.
-     *
-     * @param statsLogger
-     *          stats logger to report stats
-     * @return builder.
-     */
-    public synchronized BookKeeperClientBuilder statsLogger(StatsLogger 
statsLogger) {
-        this.statsLogger = statsLogger;
-        return this;
-    }
-
-    public synchronized BookKeeperClientBuilder 
featureProvider(Optional<FeatureProvider> featureProvider) {
-        this.featureProvider = featureProvider;
-        return this;
-    }
-
-    private void validateParameters() {
-        Preconditions.checkNotNull(name, "Missing client name.");
-        Preconditions.checkNotNull(dlConfig, "Missing DistributedLog 
Configuration.");
-        Preconditions.checkArgument(null == zkc || null == zkServers, "Missing 
zookeeper setting.");
-        Preconditions.checkNotNull(ledgersPath, "Missing Ledgers Root Path.");
-    }
-
-    public synchronized BookKeeperClient build() {
-        if (null == cachedClient) {
-            cachedClient = buildClient();
-        }
-        return cachedClient;
-    }
-
-    private BookKeeperClient buildClient() {
-        validateParameters();
-        return new BookKeeperClient(dlConfig, name, zkServers, zkc, 
ledgersPath, channelFactory, requestTimer, statsLogger, featureProvider);
-    }
-}

Reply via email to