http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java new file mode 100644 index 0000000..a7b17f4 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java @@ -0,0 +1,289 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import com.google.common.base.Optional; +import org.apache.distributedlog.ZooKeeperClient.Credentials; +import org.apache.distributedlog.ZooKeeperClient.DigestCredentials; +import org.apache.distributedlog.exceptions.AlreadyClosedException; +import org.apache.distributedlog.exceptions.DLInterruptedException; +import org.apache.distributedlog.exceptions.ZKException; +import org.apache.distributedlog.net.NetUtils; +import org.apache.distributedlog.util.ConfUtils; +import com.twitter.util.Future; +import com.twitter.util.Promise; +import com.twitter.util.Return; +import com.twitter.util.Throw; +import org.apache.bookkeeper.client.AsyncCallback; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.net.DNSToSwitchMapping; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; +import org.apache.bookkeeper.zookeeper.RetryPolicy; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.zookeeper.KeeperException; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; +import org.jboss.netty.util.HashedWheelTimer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static com.google.common.base.Charsets.UTF_8; + +/** + * BookKeeper Client wrapper over {@link BookKeeper}. + * + * <h3>Metrics</h3> + * <ul> + * <li> bookkeeper operation stats are exposed under current scope by {@link BookKeeper} + * </ul> + */ +public class BookKeeperClient { + static final Logger LOG = LoggerFactory.getLogger(BookKeeperClient.class); + + // Parameters to build bookkeeper client + private final DistributedLogConfiguration conf; + private final String name; + private final String zkServers; + private final String ledgersPath; + private final byte[] passwd; + private final ClientSocketChannelFactory channelFactory; + private final HashedWheelTimer requestTimer; + private final StatsLogger statsLogger; + + // bookkeeper client state + private boolean closed = false; + private BookKeeper bkc = null; + private ZooKeeperClient zkc; + private final boolean ownZK; + // feature provider + private final Optional<FeatureProvider> featureProvider; + + @SuppressWarnings("deprecation") + private synchronized void commonInitialization( + DistributedLogConfiguration conf, String ledgersPath, + ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer requestTimer) + throws IOException, InterruptedException, KeeperException { + ClientConfiguration bkConfig = new ClientConfiguration(); + bkConfig.setAddEntryTimeout(conf.getBKClientWriteTimeout()); + bkConfig.setReadTimeout(conf.getBKClientReadTimeout()); + bkConfig.setZkLedgersRootPath(ledgersPath); + bkConfig.setZkTimeout(conf.getBKClientZKSessionTimeoutMilliSeconds()); + bkConfig.setNumWorkerThreads(conf.getBKClientNumberWorkerThreads()); + bkConfig.setEnsemblePlacementPolicy(RegionAwareEnsemblePlacementPolicy.class); + bkConfig.setZkRequestRateLimit(conf.getBKClientZKRequestRateLimit()); + bkConfig.setProperty(RegionAwareEnsemblePlacementPolicy.REPP_DISALLOW_BOOKIE_PLACEMENT_IN_REGION_FEATURE_NAME, + DistributedLogConstants.DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME); + // reload configuration from dl configuration with settings prefixed with 'bkc.' + ConfUtils.loadConfiguration(bkConfig, conf, "bkc."); + + Class<? extends DNSToSwitchMapping> dnsResolverCls; + try { + dnsResolverCls = conf.getEnsemblePlacementDnsResolverClass(); + } catch (ConfigurationException e) { + LOG.error("Failed to load bk dns resolver : ", e); + throw new IOException("Failed to load bk dns resolver : ", e); + } + final DNSToSwitchMapping dnsResolver = + NetUtils.getDNSResolver(dnsResolverCls, conf.getBkDNSResolverOverrides()); + + this.bkc = BookKeeper.newBuilder() + .config(bkConfig) + .zk(zkc.get()) + .channelFactory(channelFactory) + .statsLogger(statsLogger) + .dnsResolver(dnsResolver) + .requestTimer(requestTimer) + .featureProvider(featureProvider.orNull()) + .build(); + } + + BookKeeperClient(DistributedLogConfiguration conf, + String name, + String zkServers, + ZooKeeperClient zkc, + String ledgersPath, + ClientSocketChannelFactory channelFactory, + HashedWheelTimer requestTimer, + StatsLogger statsLogger, + Optional<FeatureProvider> featureProvider) { + this.conf = conf; + this.name = name; + this.zkServers = zkServers; + this.ledgersPath = ledgersPath; + this.passwd = conf.getBKDigestPW().getBytes(UTF_8); + this.channelFactory = channelFactory; + this.requestTimer = requestTimer; + this.statsLogger = statsLogger; + this.featureProvider = featureProvider; + this.ownZK = null == zkc; + if (null != zkc) { + // reference the passing zookeeper client + this.zkc = zkc; + } + } + + private synchronized void initialize() throws IOException { + if (null != this.bkc) { + return; + } + if (null == this.zkc) { + int zkSessionTimeout = conf.getBKClientZKSessionTimeoutMilliSeconds(); + RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy( + conf.getBKClientZKRetryBackoffStartMillis(), + conf.getBKClientZKRetryBackoffMaxMillis(), conf.getBKClientZKNumRetries()); + Credentials credentials = Credentials.NONE; + if (conf.getZkAclId() != null) { + credentials = new DigestCredentials(conf.getZkAclId(), conf.getZkAclId()); + } + + this.zkc = new ZooKeeperClient(name + ":zk", zkSessionTimeout, 2 * zkSessionTimeout, zkServers, + retryPolicy, statsLogger.scope("bkc_zkc"), conf.getZKClientNumberRetryThreads(), + conf.getBKClientZKRequestRateLimit(), credentials); + } + + try { + commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer); + } catch (InterruptedException e) { + throw new DLInterruptedException("Interrupted on creating bookkeeper client " + name + " : ", e); + } catch (KeeperException e) { + throw new ZKException("Error on creating bookkeeper client " + name + " : ", e); + } + + if (ownZK) { + LOG.info("BookKeeper Client created {} with its own ZK Client : ledgersPath = {}, numRetries = {}, " + + "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}", + new Object[] { name, ledgersPath, + conf.getBKClientZKNumRetries(), conf.getBKClientZKSessionTimeoutMilliSeconds(), + conf.getBKClientZKRetryBackoffStartMillis(), conf.getBKClientZKRetryBackoffMaxMillis(), + conf.getBkDNSResolverOverrides() }); + } else { + LOG.info("BookKeeper Client created {} with shared zookeeper client : ledgersPath = {}, numRetries = {}, " + + "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}", + new Object[] { name, ledgersPath, + conf.getZKNumRetries(), conf.getZKSessionTimeoutMilliseconds(), + conf.getZKRetryBackoffStartMillis(), conf.getZKRetryBackoffMaxMillis(), + conf.getBkDNSResolverOverrides() }); + } + } + + + public synchronized BookKeeper get() throws IOException { + checkClosedOrInError(); + if (null == bkc) { + initialize(); + } + return bkc; + } + + // Util functions + public Future<LedgerHandle> createLedger(int ensembleSize, + int writeQuorumSize, + int ackQuorumSize) { + BookKeeper bk; + try { + bk = get(); + } catch (IOException ioe) { + return Future.exception(ioe); + } + final Promise<LedgerHandle> promise = new Promise<LedgerHandle>(); + bk.asyncCreateLedger(ensembleSize, writeQuorumSize, ackQuorumSize, + BookKeeper.DigestType.CRC32, passwd, new AsyncCallback.CreateCallback() { + @Override + public void createComplete(int rc, LedgerHandle lh, Object ctx) { + if (BKException.Code.OK == rc) { + promise.updateIfEmpty(new Return<LedgerHandle>(lh)); + } else { + promise.updateIfEmpty(new Throw<LedgerHandle>(BKException.create(rc))); + } + } + }, null); + return promise; + } + + public Future<Void> deleteLedger(long lid, + final boolean ignoreNonExistentLedger) { + BookKeeper bk; + try { + bk = get(); + } catch (IOException ioe) { + return Future.exception(ioe); + } + final Promise<Void> promise = new Promise<Void>(); + bk.asyncDeleteLedger(lid, new AsyncCallback.DeleteCallback() { + @Override + public void deleteComplete(int rc, Object ctx) { + if (BKException.Code.OK == rc) { + promise.updateIfEmpty(new Return<Void>(null)); + } else if (BKException.Code.NoSuchLedgerExistsException == rc) { + if (ignoreNonExistentLedger) { + promise.updateIfEmpty(new Return<Void>(null)); + } else { + promise.updateIfEmpty(new Throw<Void>(BKException.create(rc))); + } + } else { + promise.updateIfEmpty(new Throw<Void>(BKException.create(rc))); + } + } + }, null); + return promise; + } + + public void close() { + BookKeeper bkcToClose; + ZooKeeperClient zkcToClose; + synchronized (this) { + if (closed) { + return; + } + closed = true; + bkcToClose = bkc; + zkcToClose = zkc; + } + + LOG.info("BookKeeper Client closed {}", name); + if (null != bkcToClose) { + try { + bkcToClose.close(); + } catch (InterruptedException e) { + LOG.warn("Interrupted on closing bookkeeper client {} : ", name, e); + Thread.currentThread().interrupt(); + } catch (BKException e) { + LOG.warn("Error on closing bookkeeper client {} : ", name, e); + } + } + if (null != zkcToClose) { + if (ownZK) { + zkcToClose.close(); + } + } + } + + public synchronized void checkClosedOrInError() throws AlreadyClosedException { + if (closed) { + LOG.error("BookKeeper Client {} is already closed", name); + throw new AlreadyClosedException("BookKeeper Client " + name + " is already closed"); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java new file mode 100644 index 0000000..a356f9f --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; +import org.jboss.netty.util.HashedWheelTimer; +import org.apache.bookkeeper.feature.FeatureProvider; + +import org.apache.bookkeeper.feature.Feature; + +/** + * Builder to build bookkeeper client. + */ +public class BookKeeperClientBuilder { + + /** + * Create a bookkeeper client builder to build bookkeeper clients. + * + * @return bookkeeper client builder. + */ + public static BookKeeperClientBuilder newBuilder() { + return new BookKeeperClientBuilder(); + } + + // client name + private String name = null; + // dl config + private DistributedLogConfiguration dlConfig = null; + // bookkeeper settings + // zookeeper client + private ZooKeeperClient zkc = null; + // or zookeeper servers + private String zkServers = null; + // ledgers path + private String ledgersPath = null; + // statsLogger + private StatsLogger statsLogger = NullStatsLogger.INSTANCE; + // client channel factory + private ClientSocketChannelFactory channelFactory = null; + // request timer + private HashedWheelTimer requestTimer = null; + // feature provider + private Optional<FeatureProvider> featureProvider = Optional.absent(); + + // Cached BookKeeper Client + private BookKeeperClient cachedClient = null; + + /** + * Private bookkeeper builder. + */ + private BookKeeperClientBuilder() {} + + /** + * Set client name. + * + * @param name + * client name. + * @return builder + */ + public synchronized BookKeeperClientBuilder name(String name) { + this.name = name; + return this; + } + + /** + * <i>dlConfig</i> used to configure bookkeeper client. + * + * @param dlConfig + * distributedlog config. + * @return builder. + */ + public synchronized BookKeeperClientBuilder dlConfig(DistributedLogConfiguration dlConfig) { + this.dlConfig = dlConfig; + return this; + } + + /** + * Set the zkc used to build bookkeeper client. If a zookeeper client is provided in this + * method, bookkeeper client will use it rather than creating a brand new one. + * + * @param zkc + * zookeeper client. + * @return builder + * @see #zkServers(String) + */ + public synchronized BookKeeperClientBuilder zkc(ZooKeeperClient zkc) { + this.zkc = zkc; + return this; + } + + /** + * Set the zookeeper servers that bookkeeper client would connect to. If no zookeeper client + * is provided by {@link #zkc(ZooKeeperClient)}, bookkeeper client will use the given string + * to create a brand new zookeeper client. + * + * @param zkServers + * zookeeper servers that bookkeeper client would connect to. + * @return builder + * @see #zkc(ZooKeeperClient) + */ + public synchronized BookKeeperClientBuilder zkServers(String zkServers) { + this.zkServers = zkServers; + return this; + } + + /** + * Set the ledgers path that bookkeeper client is going to access. + * + * @param ledgersPath + * ledgers path + * @return builder + * @see org.apache.bookkeeper.conf.ClientConfiguration#getZkLedgersRootPath() + */ + public synchronized BookKeeperClientBuilder ledgersPath(String ledgersPath) { + this.ledgersPath = ledgersPath; + return this; + } + + /** + * Build BookKeeper client using existing <i>bkc</i> client. + * + * @param bkc + * bookkeeper client. + * @return builder + */ + public synchronized BookKeeperClientBuilder bkc(BookKeeperClient bkc) { + this.cachedClient = bkc; + return this; + } + + /** + * Build BookKeeper client using existing <i>channelFactory</i>. + * + * @param channelFactory + * Channel Factory used to build bookkeeper client. + * @return bookkeeper client builder. + */ + public synchronized BookKeeperClientBuilder channelFactory(ClientSocketChannelFactory channelFactory) { + this.channelFactory = channelFactory; + return this; + } + + /** + * Build BookKeeper client using existing <i>request timer</i>. + * + * @param requestTimer + * HashedWheelTimer used to build bookkeeper client. + * @return bookkeeper client builder. + */ + public synchronized BookKeeperClientBuilder requestTimer(HashedWheelTimer requestTimer) { + this.requestTimer = requestTimer; + return this; + } + + /** + * Build BookKeeper Client using given stats logger <i>statsLogger</i>. + * + * @param statsLogger + * stats logger to report stats + * @return builder. + */ + public synchronized BookKeeperClientBuilder statsLogger(StatsLogger statsLogger) { + this.statsLogger = statsLogger; + return this; + } + + public synchronized BookKeeperClientBuilder featureProvider(Optional<FeatureProvider> featureProvider) { + this.featureProvider = featureProvider; + return this; + } + + private void validateParameters() { + Preconditions.checkNotNull(name, "Missing client name."); + Preconditions.checkNotNull(dlConfig, "Missing DistributedLog Configuration."); + Preconditions.checkArgument(null == zkc || null == zkServers, "Missing zookeeper setting."); + Preconditions.checkNotNull(ledgersPath, "Missing Ledgers Root Path."); + } + + public synchronized BookKeeperClient build() { + if (null == cachedClient) { + cachedClient = buildClient(); + } + return cachedClient; + } + + private BookKeeperClient buildClient() { + validateParameters(); + return new BookKeeperClient(dlConfig, name, zkServers, zkc, ledgersPath, channelFactory, requestTimer, statsLogger, featureProvider); + } +}