http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java deleted file mode 100644 index 8d3c418..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java +++ /dev/null @@ -1,289 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.google.common.base.Optional; -import com.twitter.distributedlog.ZooKeeperClient.Credentials; -import com.twitter.distributedlog.ZooKeeperClient.DigestCredentials; -import com.twitter.distributedlog.exceptions.AlreadyClosedException; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.exceptions.ZKException; -import com.twitter.distributedlog.net.NetUtils; -import com.twitter.distributedlog.util.ConfUtils; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import com.twitter.util.Return; -import com.twitter.util.Throw; -import org.apache.bookkeeper.client.AsyncCallback; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy; -import org.apache.bookkeeper.conf.ClientConfiguration; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.net.DNSToSwitchMapping; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; -import org.apache.bookkeeper.zookeeper.RetryPolicy; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.zookeeper.KeeperException; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.util.HashedWheelTimer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -import static com.google.common.base.Charsets.UTF_8; - -/** - * BookKeeper Client wrapper over {@link BookKeeper}. - * - * <h3>Metrics</h3> - * <ul> - * <li> bookkeeper operation stats are exposed under current scope by {@link BookKeeper} - * </ul> - */ -public class BookKeeperClient { - static final Logger LOG = LoggerFactory.getLogger(BookKeeperClient.class); - - // Parameters to build bookkeeper client - private final DistributedLogConfiguration conf; - private final String name; - private final String zkServers; - private final String ledgersPath; - private final byte[] passwd; - private final ClientSocketChannelFactory channelFactory; - private final HashedWheelTimer requestTimer; - private final StatsLogger statsLogger; - - // bookkeeper client state - private boolean closed = false; - private BookKeeper bkc = null; - private ZooKeeperClient zkc; - private final boolean ownZK; - // feature provider - private final Optional<FeatureProvider> featureProvider; - - @SuppressWarnings("deprecation") - private synchronized void commonInitialization( - DistributedLogConfiguration conf, String ledgersPath, - ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer requestTimer) - throws IOException, InterruptedException, KeeperException { - ClientConfiguration bkConfig = new ClientConfiguration(); - bkConfig.setAddEntryTimeout(conf.getBKClientWriteTimeout()); - bkConfig.setReadTimeout(conf.getBKClientReadTimeout()); - bkConfig.setZkLedgersRootPath(ledgersPath); - bkConfig.setZkTimeout(conf.getBKClientZKSessionTimeoutMilliSeconds()); - bkConfig.setNumWorkerThreads(conf.getBKClientNumberWorkerThreads()); - bkConfig.setEnsemblePlacementPolicy(RegionAwareEnsemblePlacementPolicy.class); - bkConfig.setZkRequestRateLimit(conf.getBKClientZKRequestRateLimit()); - bkConfig.setProperty(RegionAwareEnsemblePlacementPolicy.REPP_DISALLOW_BOOKIE_PLACEMENT_IN_REGION_FEATURE_NAME, - DistributedLogConstants.DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME); - // reload configuration from dl configuration with settings prefixed with 'bkc.' - ConfUtils.loadConfiguration(bkConfig, conf, "bkc."); - - Class<? extends DNSToSwitchMapping> dnsResolverCls; - try { - dnsResolverCls = conf.getEnsemblePlacementDnsResolverClass(); - } catch (ConfigurationException e) { - LOG.error("Failed to load bk dns resolver : ", e); - throw new IOException("Failed to load bk dns resolver : ", e); - } - final DNSToSwitchMapping dnsResolver = - NetUtils.getDNSResolver(dnsResolverCls, conf.getBkDNSResolverOverrides()); - - this.bkc = BookKeeper.newBuilder() - .config(bkConfig) - .zk(zkc.get()) - .channelFactory(channelFactory) - .statsLogger(statsLogger) - .dnsResolver(dnsResolver) - .requestTimer(requestTimer) - .featureProvider(featureProvider.orNull()) - .build(); - } - - BookKeeperClient(DistributedLogConfiguration conf, - String name, - String zkServers, - ZooKeeperClient zkc, - String ledgersPath, - ClientSocketChannelFactory channelFactory, - HashedWheelTimer requestTimer, - StatsLogger statsLogger, - Optional<FeatureProvider> featureProvider) { - this.conf = conf; - this.name = name; - this.zkServers = zkServers; - this.ledgersPath = ledgersPath; - this.passwd = conf.getBKDigestPW().getBytes(UTF_8); - this.channelFactory = channelFactory; - this.requestTimer = requestTimer; - this.statsLogger = statsLogger; - this.featureProvider = featureProvider; - this.ownZK = null == zkc; - if (null != zkc) { - // reference the passing zookeeper client - this.zkc = zkc; - } - } - - private synchronized void initialize() throws IOException { - if (null != this.bkc) { - return; - } - if (null == this.zkc) { - int zkSessionTimeout = conf.getBKClientZKSessionTimeoutMilliSeconds(); - RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy( - conf.getBKClientZKRetryBackoffStartMillis(), - conf.getBKClientZKRetryBackoffMaxMillis(), conf.getBKClientZKNumRetries()); - Credentials credentials = Credentials.NONE; - if (conf.getZkAclId() != null) { - credentials = new DigestCredentials(conf.getZkAclId(), conf.getZkAclId()); - } - - this.zkc = new ZooKeeperClient(name + ":zk", zkSessionTimeout, 2 * zkSessionTimeout, zkServers, - retryPolicy, statsLogger.scope("bkc_zkc"), conf.getZKClientNumberRetryThreads(), - conf.getBKClientZKRequestRateLimit(), credentials); - } - - try { - commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer); - } catch (InterruptedException e) { - throw new DLInterruptedException("Interrupted on creating bookkeeper client " + name + " : ", e); - } catch (KeeperException e) { - throw new ZKException("Error on creating bookkeeper client " + name + " : ", e); - } - - if (ownZK) { - LOG.info("BookKeeper Client created {} with its own ZK Client : ledgersPath = {}, numRetries = {}, " + - "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}", - new Object[] { name, ledgersPath, - conf.getBKClientZKNumRetries(), conf.getBKClientZKSessionTimeoutMilliSeconds(), - conf.getBKClientZKRetryBackoffStartMillis(), conf.getBKClientZKRetryBackoffMaxMillis(), - conf.getBkDNSResolverOverrides() }); - } else { - LOG.info("BookKeeper Client created {} with shared zookeeper client : ledgersPath = {}, numRetries = {}, " + - "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}", - new Object[] { name, ledgersPath, - conf.getZKNumRetries(), conf.getZKSessionTimeoutMilliseconds(), - conf.getZKRetryBackoffStartMillis(), conf.getZKRetryBackoffMaxMillis(), - conf.getBkDNSResolverOverrides() }); - } - } - - - public synchronized BookKeeper get() throws IOException { - checkClosedOrInError(); - if (null == bkc) { - initialize(); - } - return bkc; - } - - // Util functions - public Future<LedgerHandle> createLedger(int ensembleSize, - int writeQuorumSize, - int ackQuorumSize) { - BookKeeper bk; - try { - bk = get(); - } catch (IOException ioe) { - return Future.exception(ioe); - } - final Promise<LedgerHandle> promise = new Promise<LedgerHandle>(); - bk.asyncCreateLedger(ensembleSize, writeQuorumSize, ackQuorumSize, - BookKeeper.DigestType.CRC32, passwd, new AsyncCallback.CreateCallback() { - @Override - public void createComplete(int rc, LedgerHandle lh, Object ctx) { - if (BKException.Code.OK == rc) { - promise.updateIfEmpty(new Return<LedgerHandle>(lh)); - } else { - promise.updateIfEmpty(new Throw<LedgerHandle>(BKException.create(rc))); - } - } - }, null); - return promise; - } - - public Future<Void> deleteLedger(long lid, - final boolean ignoreNonExistentLedger) { - BookKeeper bk; - try { - bk = get(); - } catch (IOException ioe) { - return Future.exception(ioe); - } - final Promise<Void> promise = new Promise<Void>(); - bk.asyncDeleteLedger(lid, new AsyncCallback.DeleteCallback() { - @Override - public void deleteComplete(int rc, Object ctx) { - if (BKException.Code.OK == rc) { - promise.updateIfEmpty(new Return<Void>(null)); - } else if (BKException.Code.NoSuchLedgerExistsException == rc) { - if (ignoreNonExistentLedger) { - promise.updateIfEmpty(new Return<Void>(null)); - } else { - promise.updateIfEmpty(new Throw<Void>(BKException.create(rc))); - } - } else { - promise.updateIfEmpty(new Throw<Void>(BKException.create(rc))); - } - } - }, null); - return promise; - } - - public void close() { - BookKeeper bkcToClose; - ZooKeeperClient zkcToClose; - synchronized (this) { - if (closed) { - return; - } - closed = true; - bkcToClose = bkc; - zkcToClose = zkc; - } - - LOG.info("BookKeeper Client closed {}", name); - if (null != bkcToClose) { - try { - bkcToClose.close(); - } catch (InterruptedException e) { - LOG.warn("Interrupted on closing bookkeeper client {} : ", name, e); - Thread.currentThread().interrupt(); - } catch (BKException e) { - LOG.warn("Error on closing bookkeeper client {} : ", name, e); - } - } - if (null != zkcToClose) { - if (ownZK) { - zkcToClose.close(); - } - } - } - - public synchronized void checkClosedOrInError() throws AlreadyClosedException { - if (closed) { - LOG.error("BookKeeper Client {} is already closed", name); - throw new AlreadyClosedException("BookKeeper Client " + name + " is already closed"); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClientBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClientBuilder.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClientBuilder.java deleted file mode 100644 index cad1096..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClientBuilder.java +++ /dev/null @@ -1,209 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.util.HashedWheelTimer; -import org.apache.bookkeeper.feature.FeatureProvider; - -import org.apache.bookkeeper.feature.Feature; - -/** - * Builder to build bookkeeper client. - */ -public class BookKeeperClientBuilder { - - /** - * Create a bookkeeper client builder to build bookkeeper clients. - * - * @return bookkeeper client builder. - */ - public static BookKeeperClientBuilder newBuilder() { - return new BookKeeperClientBuilder(); - } - - // client name - private String name = null; - // dl config - private DistributedLogConfiguration dlConfig = null; - // bookkeeper settings - // zookeeper client - private ZooKeeperClient zkc = null; - // or zookeeper servers - private String zkServers = null; - // ledgers path - private String ledgersPath = null; - // statsLogger - private StatsLogger statsLogger = NullStatsLogger.INSTANCE; - // client channel factory - private ClientSocketChannelFactory channelFactory = null; - // request timer - private HashedWheelTimer requestTimer = null; - // feature provider - private Optional<FeatureProvider> featureProvider = Optional.absent(); - - // Cached BookKeeper Client - private BookKeeperClient cachedClient = null; - - /** - * Private bookkeeper builder. - */ - private BookKeeperClientBuilder() {} - - /** - * Set client name. - * - * @param name - * client name. - * @return builder - */ - public synchronized BookKeeperClientBuilder name(String name) { - this.name = name; - return this; - } - - /** - * <i>dlConfig</i> used to configure bookkeeper client. - * - * @param dlConfig - * distributedlog config. - * @return builder. - */ - public synchronized BookKeeperClientBuilder dlConfig(DistributedLogConfiguration dlConfig) { - this.dlConfig = dlConfig; - return this; - } - - /** - * Set the zkc used to build bookkeeper client. If a zookeeper client is provided in this - * method, bookkeeper client will use it rather than creating a brand new one. - * - * @param zkc - * zookeeper client. - * @return builder - * @see #zkServers(String) - */ - public synchronized BookKeeperClientBuilder zkc(ZooKeeperClient zkc) { - this.zkc = zkc; - return this; - } - - /** - * Set the zookeeper servers that bookkeeper client would connect to. If no zookeeper client - * is provided by {@link #zkc(ZooKeeperClient)}, bookkeeper client will use the given string - * to create a brand new zookeeper client. - * - * @param zkServers - * zookeeper servers that bookkeeper client would connect to. - * @return builder - * @see #zkc(ZooKeeperClient) - */ - public synchronized BookKeeperClientBuilder zkServers(String zkServers) { - this.zkServers = zkServers; - return this; - } - - /** - * Set the ledgers path that bookkeeper client is going to access. - * - * @param ledgersPath - * ledgers path - * @return builder - * @see org.apache.bookkeeper.conf.ClientConfiguration#getZkLedgersRootPath() - */ - public synchronized BookKeeperClientBuilder ledgersPath(String ledgersPath) { - this.ledgersPath = ledgersPath; - return this; - } - - /** - * Build BookKeeper client using existing <i>bkc</i> client. - * - * @param bkc - * bookkeeper client. - * @return builder - */ - public synchronized BookKeeperClientBuilder bkc(BookKeeperClient bkc) { - this.cachedClient = bkc; - return this; - } - - /** - * Build BookKeeper client using existing <i>channelFactory</i>. - * - * @param channelFactory - * Channel Factory used to build bookkeeper client. - * @return bookkeeper client builder. - */ - public synchronized BookKeeperClientBuilder channelFactory(ClientSocketChannelFactory channelFactory) { - this.channelFactory = channelFactory; - return this; - } - - /** - * Build BookKeeper client using existing <i>request timer</i>. - * - * @param requestTimer - * HashedWheelTimer used to build bookkeeper client. - * @return bookkeeper client builder. - */ - public synchronized BookKeeperClientBuilder requestTimer(HashedWheelTimer requestTimer) { - this.requestTimer = requestTimer; - return this; - } - - /** - * Build BookKeeper Client using given stats logger <i>statsLogger</i>. - * - * @param statsLogger - * stats logger to report stats - * @return builder. - */ - public synchronized BookKeeperClientBuilder statsLogger(StatsLogger statsLogger) { - this.statsLogger = statsLogger; - return this; - } - - public synchronized BookKeeperClientBuilder featureProvider(Optional<FeatureProvider> featureProvider) { - this.featureProvider = featureProvider; - return this; - } - - private void validateParameters() { - Preconditions.checkNotNull(name, "Missing client name."); - Preconditions.checkNotNull(dlConfig, "Missing DistributedLog Configuration."); - Preconditions.checkArgument(null == zkc || null == zkServers, "Missing zookeeper setting."); - Preconditions.checkNotNull(ledgersPath, "Missing Ledgers Root Path."); - } - - public synchronized BookKeeperClient build() { - if (null == cachedClient) { - cachedClient = buildClient(); - } - return cachedClient; - } - - private BookKeeperClient buildClient() { - validateParameters(); - return new BookKeeperClient(dlConfig, name, zkServers, zkc, ledgersPath, channelFactory, requestTimer, statsLogger, featureProvider); - } -}