http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java deleted file mode 100644 index 1077cd0..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java +++ /dev/null @@ -1,1200 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.client; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Stopwatch; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.LogRecordSetBuffer; -import com.twitter.distributedlog.client.monitor.MonitorServiceClient; -import com.twitter.distributedlog.client.ownership.OwnershipCache; -import com.twitter.distributedlog.client.proxy.ClusterClient; -import com.twitter.distributedlog.client.proxy.HostProvider; -import com.twitter.distributedlog.client.proxy.ProxyClient; -import com.twitter.distributedlog.client.proxy.ProxyClientManager; -import com.twitter.distributedlog.client.proxy.ProxyListener; -import com.twitter.distributedlog.client.resolver.RegionResolver; -import com.twitter.distributedlog.client.routing.RoutingService; -import com.twitter.distributedlog.client.routing.RoutingService.RoutingContext; -import com.twitter.distributedlog.client.stats.ClientStats; -import com.twitter.distributedlog.client.stats.OpStats; -import com.twitter.distributedlog.exceptions.DLClientClosedException; -import com.twitter.distributedlog.exceptions.DLException; -import com.twitter.distributedlog.exceptions.ServiceUnavailableException; -import com.twitter.distributedlog.exceptions.StreamUnavailableException; -import com.twitter.distributedlog.service.DLSocketAddress; -import com.twitter.distributedlog.service.DistributedLogClient; -import com.twitter.distributedlog.thrift.service.BulkWriteResponse; -import com.twitter.distributedlog.thrift.service.HeartbeatOptions; -import com.twitter.distributedlog.thrift.service.ResponseHeader; -import com.twitter.distributedlog.thrift.service.ServerInfo; -import com.twitter.distributedlog.thrift.service.ServerStatus; -import 
com.twitter.distributedlog.thrift.service.StatusCode; -import com.twitter.distributedlog.thrift.service.WriteContext; -import com.twitter.distributedlog.thrift.service.WriteResponse; -import com.twitter.distributedlog.util.ProtocolUtils; -import com.twitter.finagle.CancelledRequestException; -import com.twitter.finagle.ConnectionFailedException; -import com.twitter.finagle.Failure; -import com.twitter.finagle.NoBrokersAvailableException; -import com.twitter.finagle.RequestTimeoutException; -import com.twitter.finagle.ServiceException; -import com.twitter.finagle.ServiceTimeoutException; -import com.twitter.finagle.WriteException; -import com.twitter.finagle.builder.ClientBuilder; -import com.twitter.finagle.stats.StatsReceiver; -import com.twitter.finagle.thrift.ClientId; -import com.twitter.util.Duration; -import com.twitter.util.Function; -import com.twitter.util.Function0; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import com.twitter.util.Return; -import com.twitter.util.Throw; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import org.apache.thrift.TApplicationException; -import org.jboss.netty.channel.ChannelException; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.TimerTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.collection.Seq; -import scala.runtime.AbstractFunction1; - - -/** - * Implementation of distributedlog client. 
- */ -public class DistributedLogClientImpl implements DistributedLogClient, MonitorServiceClient, - RoutingService.RoutingListener, ProxyListener, HostProvider { - - private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientImpl.class); - - private final String clientName; - private final ClientId clientId; - private final ClientConfig clientConfig; - private final RoutingService routingService; - private final ProxyClient.Builder clientBuilder; - private final boolean streamFailfast; - private final Pattern streamNameRegexPattern; - - // Timer - private final HashedWheelTimer dlTimer; - - // region resolver - private final RegionResolver regionResolver; - - // Ownership maintenance - private final OwnershipCache ownershipCache; - // Channel/Client management - private final ProxyClientManager clientManager; - // Cluster Client (for routing service) - private final Optional<ClusterClient> clusterClient; - - // Close Status - private boolean closed = false; - private final ReentrantReadWriteLock closeLock = - new ReentrantReadWriteLock(); - - abstract class StreamOp implements TimerTask { - final String stream; - - final AtomicInteger tries = new AtomicInteger(0); - final RoutingContext routingContext = RoutingContext.of(regionResolver); - final WriteContext ctx = new WriteContext(); - final Stopwatch stopwatch; - final OpStats opStats; - SocketAddress nextAddressToSend; - - StreamOp(final String stream, final OpStats opStats) { - this.stream = stream; - this.stopwatch = Stopwatch.createStarted(); - this.opStats = opStats; - } - - boolean shouldTimeout() { - long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS); - return shouldTimeout(elapsedMs); - } - - boolean shouldTimeout(long elapsedMs) { - return clientConfig.getRequestTimeoutMs() > 0 - && elapsedMs >= clientConfig.getRequestTimeoutMs(); - } - - void send(SocketAddress address) { - long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS); - if (clientConfig.getMaxRedirects() > 0 - 
&& tries.get() >= clientConfig.getMaxRedirects()) { - fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs), - "Exhausted max redirects in " + elapsedMs + " ms")); - return; - } else if (shouldTimeout(elapsedMs)) { - fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs), - "Exhausted max request timeout " + clientConfig.getRequestTimeoutMs() - + " in " + elapsedMs + " ms")); - return; - } - synchronized (this) { - String addrStr = address.toString(); - if (ctx.isSetTriedHosts() && ctx.getTriedHosts().contains(addrStr)) { - nextAddressToSend = address; - dlTimer.newTimeout(this, - Math.min(clientConfig.getRedirectBackoffMaxMs(), - tries.get() * clientConfig.getRedirectBackoffStartMs()), - TimeUnit.MILLISECONDS); - } else { - doSend(address); - } - } - } - - abstract Future<ResponseHeader> sendRequest(ProxyClient sc); - - void doSend(SocketAddress address) { - ctx.addToTriedHosts(address.toString()); - if (clientConfig.isChecksumEnabled()) { - Long crc32 = computeChecksum(); - if (null != crc32) { - ctx.setCrc32(crc32); - } - } - tries.incrementAndGet(); - sendWriteRequest(address, this); - } - - void beforeComplete(ProxyClient sc, ResponseHeader responseHeader) { - ownershipCache.updateOwner(stream, sc.getAddress()); - } - - void complete(SocketAddress address) { - stopwatch.stop(); - opStats.completeRequest(address, - stopwatch.elapsed(TimeUnit.MICROSECONDS), tries.get()); - } - - void fail(SocketAddress address, Throwable t) { - stopwatch.stop(); - opStats.failRequest(address, - stopwatch.elapsed(TimeUnit.MICROSECONDS), tries.get()); - } - - Long computeChecksum() { - return null; - } - - @Override - public synchronized void run(Timeout timeout) throws Exception { - if (!timeout.isCancelled() && null != nextAddressToSend) { - doSend(nextAddressToSend); - } else { - fail(null, new CancelledRequestException()); - } - } - } - - class BulkWriteOp extends StreamOp { - - final List<ByteBuffer> data; - final 
ArrayList<Promise<DLSN>> results; - - BulkWriteOp(final String name, final List<ByteBuffer> data) { - super(name, clientStats.getOpStats("bulk_write")); - this.data = data; - - // This could take a while (relatively speaking) for very large inputs. We probably don't want - // to go so large for other reasons though. - this.results = new ArrayList<Promise<DLSN>>(data.size()); - for (int i = 0; i < data.size(); i++) { - checkNotNull(data.get(i)); - this.results.add(new Promise<DLSN>()); - } - } - - @Override - Future<ResponseHeader> sendRequest(final ProxyClient sc) { - return sc.getService().writeBulkWithContext(stream, data, ctx) - .addEventListener(new FutureEventListener<BulkWriteResponse>() { - @Override - public void onSuccess(BulkWriteResponse response) { - // For non-success case, the ResponseHeader handler (the caller) will handle it. - // Note success in this case means no finagle errors have occurred - // (such as finagle connection issues). In general code != SUCCESS means there's some error - // reported by dlog service. The caller will handle such errors. - if (response.getHeader().getCode() == StatusCode.SUCCESS) { - beforeComplete(sc, response.getHeader()); - BulkWriteOp.this.complete(sc.getAddress(), response); - if (response.getWriteResponses().size() == 0 && data.size() > 0) { - logger.error("non-empty bulk write got back empty response without failure for stream {}", - stream); - } - } - } - @Override - public void onFailure(Throwable cause) { - // Handled by the ResponseHeader listener (attached by the caller). - } - }).map(new AbstractFunction1<BulkWriteResponse, ResponseHeader>() { - @Override - public ResponseHeader apply(BulkWriteResponse response) { - // We need to return the ResponseHeader to the caller's listener to process DLOG errors. 
- return response.getHeader(); - } - }); - } - - void complete(SocketAddress address, BulkWriteResponse bulkWriteResponse) { - super.complete(address); - Iterator<WriteResponse> writeResponseIterator = bulkWriteResponse.getWriteResponses().iterator(); - Iterator<Promise<DLSN>> resultIterator = results.iterator(); - - // Fill in errors from thrift responses. - while (resultIterator.hasNext() && writeResponseIterator.hasNext()) { - Promise<DLSN> result = resultIterator.next(); - WriteResponse writeResponse = writeResponseIterator.next(); - if (StatusCode.SUCCESS == writeResponse.getHeader().getCode()) { - result.setValue(DLSN.deserialize(writeResponse.getDlsn())); - } else { - result.setException(DLException.of(writeResponse.getHeader())); - } - } - - // Should never happen, but just in case so there's some record. - if (bulkWriteResponse.getWriteResponses().size() != data.size()) { - logger.error("wrong number of results, response = {} records = {}", - bulkWriteResponse.getWriteResponses().size(), data.size()); - } - } - - @Override - void fail(SocketAddress address, Throwable t) { - - // StreamOp.fail is called to fail the overall request. In case of BulkWriteOp we take the request level - // exception to apply to the first write. In fact for request level exceptions no request has ever been - // attempted, but logically we associate the error with the first write. - super.fail(address, t); - Iterator<Promise<DLSN>> resultIterator = results.iterator(); - - // Fail the first write with the batch level failure. - if (resultIterator.hasNext()) { - Promise<DLSN> result = resultIterator.next(); - result.setException(t); - } - - // Fail the remaining writes as cancelled requests. 
- while (resultIterator.hasNext()) { - Promise<DLSN> result = resultIterator.next(); - result.setException(new CancelledRequestException()); - } - } - - @SuppressWarnings("unchecked") - List<Future<DLSN>> result() { - return (List) results; - } - } - - abstract class AbstractWriteOp extends StreamOp { - - final Promise<WriteResponse> result = new Promise<WriteResponse>(); - Long crc32 = null; - - AbstractWriteOp(final String name, final OpStats opStats) { - super(name, opStats); - } - - void complete(SocketAddress address, WriteResponse response) { - super.complete(address); - result.setValue(response); - } - - @Override - void fail(SocketAddress address, Throwable t) { - super.fail(address, t); - result.setException(t); - } - - @Override - Long computeChecksum() { - if (null == crc32) { - crc32 = ProtocolUtils.streamOpCRC32(stream); - } - return crc32; - } - - @Override - Future<ResponseHeader> sendRequest(final ProxyClient sc) { - return this.sendWriteRequest(sc).addEventListener(new FutureEventListener<WriteResponse>() { - @Override - public void onSuccess(WriteResponse response) { - if (response.getHeader().getCode() == StatusCode.SUCCESS) { - beforeComplete(sc, response.getHeader()); - AbstractWriteOp.this.complete(sc.getAddress(), response); - } - } - @Override - public void onFailure(Throwable cause) { - // handled by the ResponseHeader listener - } - }).map(new AbstractFunction1<WriteResponse, ResponseHeader>() { - @Override - public ResponseHeader apply(WriteResponse response) { - return response.getHeader(); - } - }); - } - - abstract Future<WriteResponse> sendWriteRequest(ProxyClient sc); - } - - class WriteOp extends AbstractWriteOp { - final ByteBuffer data; - - WriteOp(final String name, final ByteBuffer data) { - super(name, clientStats.getOpStats("write")); - this.data = data; - } - - @Override - Future<WriteResponse> sendWriteRequest(ProxyClient sc) { - return sc.getService().writeWithContext(stream, data, ctx); - } - - @Override - Long 
computeChecksum() { - if (null == crc32) { - byte[] dataBytes = new byte[data.remaining()]; - data.duplicate().get(dataBytes); - crc32 = ProtocolUtils.writeOpCRC32(stream, dataBytes); - } - return crc32; - } - - Future<DLSN> result() { - return result.map(new AbstractFunction1<WriteResponse, DLSN>() { - @Override - public DLSN apply(WriteResponse response) { - return DLSN.deserialize(response.getDlsn()); - } - }); - } - } - - class TruncateOp extends AbstractWriteOp { - final DLSN dlsn; - - TruncateOp(String name, DLSN dlsn) { - super(name, clientStats.getOpStats("truncate")); - this.dlsn = dlsn; - } - - @Override - Long computeChecksum() { - if (null == crc32) { - crc32 = ProtocolUtils.truncateOpCRC32(stream, dlsn); - } - return crc32; - } - - @Override - Future<WriteResponse> sendWriteRequest(ProxyClient sc) { - return sc.getService().truncate(stream, dlsn.serialize(), ctx); - } - - Future<Boolean> result() { - return result.map(new AbstractFunction1<WriteResponse, Boolean>() { - @Override - public Boolean apply(WriteResponse response) { - return true; - } - }); - } - } - - class WriteRecordSetOp extends WriteOp { - - WriteRecordSetOp(String name, LogRecordSetBuffer recordSet) { - super(name, recordSet.getBuffer()); - ctx.setIsRecordSet(true); - } - - } - - - class ReleaseOp extends AbstractWriteOp { - - ReleaseOp(String name) { - super(name, clientStats.getOpStats("release")); - } - - @Override - Future<WriteResponse> sendWriteRequest(ProxyClient sc) { - return sc.getService().release(stream, ctx); - } - - @Override - void beforeComplete(ProxyClient sc, ResponseHeader header) { - ownershipCache.removeOwnerFromStream(stream, sc.getAddress(), "Stream Deleted"); - } - - Future<Void> result() { - return result.map(new AbstractFunction1<WriteResponse, Void>() { - @Override - public Void apply(WriteResponse response) { - return null; - } - }); - } - } - - class DeleteOp extends AbstractWriteOp { - - DeleteOp(String name) { - super(name, 
clientStats.getOpStats("delete")); - } - - @Override - Future<WriteResponse> sendWriteRequest(ProxyClient sc) { - return sc.getService().delete(stream, ctx); - } - - @Override - void beforeComplete(ProxyClient sc, ResponseHeader header) { - ownershipCache.removeOwnerFromStream(stream, sc.getAddress(), "Stream Deleted"); - } - - Future<Void> result() { - return result.map(new AbstractFunction1<WriteResponse, Void>() { - @Override - public Void apply(WriteResponse v1) { - return null; - } - }); - } - } - - class CreateOp extends AbstractWriteOp { - - CreateOp(String name) { - super(name, clientStats.getOpStats("create")); - } - - @Override - Future<WriteResponse> sendWriteRequest(ProxyClient sc) { - return sc.getService().create(stream, ctx); - } - - @Override - void beforeComplete(ProxyClient sc, ResponseHeader header) { - ownershipCache.updateOwner(stream, sc.getAddress()); - } - - Future<Void> result() { - return result.map(new AbstractFunction1<WriteResponse, Void>() { - @Override - public Void apply(WriteResponse v1) { - return null; - } - }).voided(); - } - } - - class HeartbeatOp extends AbstractWriteOp { - HeartbeatOptions options; - - HeartbeatOp(String name, boolean sendReaderHeartBeat) { - super(name, clientStats.getOpStats("heartbeat")); - options = new HeartbeatOptions(); - options.setSendHeartBeatToReader(sendReaderHeartBeat); - } - - @Override - Future<WriteResponse> sendWriteRequest(ProxyClient sc) { - return sc.getService().heartbeatWithOptions(stream, ctx, options); - } - - Future<Void> result() { - return result.map(new AbstractFunction1<WriteResponse, Void>() { - @Override - public Void apply(WriteResponse response) { - return null; - } - }); - } - } - - // Stats - private final ClientStats clientStats; - - public DistributedLogClientImpl(String name, - ClientId clientId, - RoutingService routingService, - ClientBuilder clientBuilder, - ClientConfig clientConfig, - Optional<ClusterClient> clusterClient, - StatsReceiver statsReceiver, - 
StatsReceiver streamStatsReceiver, - RegionResolver regionResolver, - boolean enableRegionStats) { - this.clientName = name; - this.clientId = clientId; - this.routingService = routingService; - this.clientConfig = clientConfig; - this.streamFailfast = clientConfig.getStreamFailfast(); - this.streamNameRegexPattern = Pattern.compile(clientConfig.getStreamNameRegex()); - this.regionResolver = regionResolver; - // Build the timer - this.dlTimer = new HashedWheelTimer( - new ThreadFactoryBuilder().setNameFormat("DLClient-" + name + "-timer-%d").build(), - this.clientConfig.getRedirectBackoffStartMs(), - TimeUnit.MILLISECONDS); - // register routing listener - this.routingService.registerListener(this); - // build the ownership cache - this.ownershipCache = new OwnershipCache(this.clientConfig, this.dlTimer, statsReceiver, streamStatsReceiver); - // Client Stats - this.clientStats = new ClientStats(statsReceiver, enableRegionStats, regionResolver); - // Client Manager - this.clientBuilder = ProxyClient.newBuilder(clientName, clientId, clientBuilder, clientConfig, clientStats); - this.clientManager = new ProxyClientManager( - this.clientConfig, // client config - this.clientBuilder, // client builder - this.dlTimer, // timer - this, // host provider - clientStats); // client stats - this.clusterClient = clusterClient; - this.clientManager.registerProxyListener(this); - - // Cache Stats - StatsReceiver cacheStatReceiver = statsReceiver.scope("cache"); - Seq<String> numCachedStreamsGaugeName = - scala.collection.JavaConversions.asScalaBuffer(Arrays.asList("num_streams")).toList(); - cacheStatReceiver.provideGauge(numCachedStreamsGaugeName, new Function0<Object>() { - @Override - public Object apply() { - return (float) ownershipCache.getNumCachedStreams(); - } - }); - Seq<String> numCachedHostsGaugeName = - scala.collection.JavaConversions.asScalaBuffer(Arrays.asList("num_hosts")).toList(); - cacheStatReceiver.provideGauge(numCachedHostsGaugeName, new Function0<Object>() 
{ - @Override - public Object apply() { - return (float) clientManager.getNumProxies(); - } - }); - - logger.info("Build distributedlog client : name = {}, client_id = {}, routing_service = {}," - + " stats_receiver = {}, thriftmux = {}", - new Object[] { - name, - clientId, - routingService.getClass(), - statsReceiver.getClass(), - clientConfig.getThriftMux() - }); - } - - @Override - public Set<SocketAddress> getHosts() { - Set<SocketAddress> hosts = Sets.newHashSet(); - // if using server side routing, we only handshake with the hosts in ownership cache. - if (!clusterClient.isPresent()) { - hosts.addAll(this.routingService.getHosts()); - } - hosts.addAll(this.ownershipCache.getStreamOwnershipDistribution().keySet()); - return hosts; - } - - @Override - public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) { - if (null != serverInfo - && serverInfo.isSetServerStatus() - && ServerStatus.DOWN == serverInfo.getServerStatus()) { - logger.info("{} is detected as DOWN during handshaking", address); - // server is shutting down - handleServiceUnavailable(address, client, Optional.<StreamOp>absent()); - return; - } - - if (null != serverInfo && serverInfo.isSetOwnerships()) { - Map<String, String> ownerships = serverInfo.getOwnerships(); - logger.debug("Handshaked with {} : {} ownerships returned.", address, ownerships.size()); - for (Map.Entry<String, String> entry : ownerships.entrySet()) { - Matcher matcher = streamNameRegexPattern.matcher(entry.getKey()); - if (!matcher.matches()) { - continue; - } - updateOwnership(entry.getKey(), entry.getValue()); - } - } else { - logger.debug("Handshaked with {} : no ownerships returned", address); - } - } - - @Override - public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) { - cause = showRootCause(Optional.<StreamOp>absent(), cause); - handleRequestException(address, client, Optional.<StreamOp>absent(), cause); - } - - @VisibleForTesting - public 
void handshake() { - clientManager.handshake(); - logger.info("Handshaked with {} hosts, cached {} streams", - clientManager.getNumProxies(), ownershipCache.getNumCachedStreams()); - } - - @Override - public void onServerLeft(SocketAddress address) { - onServerLeft(address, null); - } - - private void onServerLeft(SocketAddress address, ProxyClient sc) { - ownershipCache.removeAllStreamsFromOwner(address); - if (null == sc) { - clientManager.removeClient(address); - } else { - clientManager.removeClient(address, sc); - } - } - - @Override - public void onServerJoin(SocketAddress address) { - // we only pre-create connection for client-side routing - // if it is server side routing, we only know the exact proxy address - // when #getOwner. - if (!clusterClient.isPresent()) { - clientManager.createClient(address); - } - } - - public void close() { - closeLock.writeLock().lock(); - try { - if (closed) { - return; - } - closed = true; - } finally { - closeLock.writeLock().unlock(); - } - clientManager.close(); - routingService.unregisterListener(this); - routingService.stopService(); - dlTimer.stop(); - } - - @Override - public Future<Void> check(String stream) { - final HeartbeatOp op = new HeartbeatOp(stream, false); - sendRequest(op); - return op.result(); - } - - @Override - public Future<Void> heartbeat(String stream) { - final HeartbeatOp op = new HeartbeatOp(stream, true); - sendRequest(op); - return op.result(); - } - - @Override - public Map<SocketAddress, Set<String>> getStreamOwnershipDistribution() { - return ownershipCache.getStreamOwnershipDistribution(); - } - - @Override - public Future<Void> setAcceptNewStream(boolean enabled) { - Map<SocketAddress, ProxyClient> snapshot = clientManager.getAllClients(); - List<Future<Void>> futures = new ArrayList<Future<Void>>(snapshot.size()); - for (Map.Entry<SocketAddress, ProxyClient> entry : snapshot.entrySet()) { - futures.add(entry.getValue().getService().setAcceptNewStream(enabled)); - } - return 
Future.collect(futures).map(new Function<List<Void>, Void>() { - @Override - public Void apply(List<Void> list) { - return null; - } - }); - } - - @Override - public Future<DLSN> write(String stream, ByteBuffer data) { - final WriteOp op = new WriteOp(stream, data); - sendRequest(op); - return op.result(); - } - - @Override - public Future<DLSN> writeRecordSet(String stream, final LogRecordSetBuffer recordSet) { - final WriteRecordSetOp op = new WriteRecordSetOp(stream, recordSet); - sendRequest(op); - return op.result(); - } - - @Override - public List<Future<DLSN>> writeBulk(String stream, List<ByteBuffer> data) { - if (data.size() > 0) { - final BulkWriteOp op = new BulkWriteOp(stream, data); - sendRequest(op); - return op.result(); - } else { - return Collections.emptyList(); - } - } - - @Override - public Future<Boolean> truncate(String stream, DLSN dlsn) { - final TruncateOp op = new TruncateOp(stream, dlsn); - sendRequest(op); - return op.result(); - } - - @Override - public Future<Void> delete(String stream) { - final DeleteOp op = new DeleteOp(stream); - sendRequest(op); - return op.result(); - } - - @Override - public Future<Void> release(String stream) { - final ReleaseOp op = new ReleaseOp(stream); - sendRequest(op); - return op.result(); - } - - @Override - public Future<Void> create(String stream) { - final CreateOp op = new CreateOp(stream); - sendRequest(op); - return op.result(); - } - - private void sendRequest(final StreamOp op) { - closeLock.readLock().lock(); - try { - if (closed) { - op.fail(null, new DLClientClosedException("Client " + clientName + " is closed.")); - } else { - doSend(op, null); - } - } finally { - closeLock.readLock().unlock(); - } - } - - /** - * Send the stream operation by routing service, excluding previous address if it is not null. - * - * @param op - * stream operation. - * @param previousAddr - * previous tried address. 
- */ - private void doSend(final StreamOp op, final SocketAddress previousAddr) { - if (null != previousAddr) { - op.routingContext.addTriedHost(previousAddr, StatusCode.WRITE_EXCEPTION); - } - // Get host first - final SocketAddress address = ownershipCache.getOwner(op.stream); - if (null == address || op.routingContext.isTriedHost(address)) { - getOwner(op).addEventListener(new FutureEventListener<SocketAddress>() { - @Override - public void onFailure(Throwable cause) { - op.fail(null, cause); - } - - @Override - public void onSuccess(SocketAddress ownerAddr) { - op.send(ownerAddr); - } - }); - } else { - op.send(address); - } - } - - private void retryGetOwnerFromResourcePlacementServer(final StreamOp op, - final Promise<SocketAddress> getOwnerPromise, - final Throwable cause) { - if (op.shouldTimeout()) { - op.fail(null, cause); - return; - } - getOwnerFromResourcePlacementServer(op, getOwnerPromise); - } - - private void getOwnerFromResourcePlacementServer(final StreamOp op, - final Promise<SocketAddress> getOwnerPromise) { - clusterClient.get().getService().getOwner(op.stream, op.ctx) - .addEventListener(new FutureEventListener<WriteResponse>() { - @Override - public void onFailure(Throwable cause) { - getOwnerPromise.updateIfEmpty(new Throw<SocketAddress>(cause)); - } - - @Override - public void onSuccess(WriteResponse value) { - if (StatusCode.FOUND == value.getHeader().getCode() - && null != value.getHeader().getLocation()) { - try { - InetSocketAddress addr = DLSocketAddress.deserialize( - value.getHeader().getLocation() - ).getSocketAddress(); - getOwnerPromise.updateIfEmpty(new Return<SocketAddress>(addr)); - } catch (IOException e) { - // retry from the routing server again - logger.error("ERROR in getOwner", e); - retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise, e); - return; - } - } else { - // retry from the routing server again - retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise, - new StreamUnavailableException("Stream " + 
op.stream + "'s owner is unknown")); - } - } - }); - } - - private Future<SocketAddress> getOwner(final StreamOp op) { - if (clusterClient.isPresent()) { - final Promise<SocketAddress> getOwnerPromise = new Promise<SocketAddress>(); - getOwnerFromResourcePlacementServer(op, getOwnerPromise); - return getOwnerPromise; - } - // pickup host by hashing - try { - return Future.value(routingService.getHost(op.stream, op.routingContext)); - } catch (NoBrokersAvailableException nbae) { - return Future.exception(nbae); - } - } - - private void sendWriteRequest(final SocketAddress addr, final StreamOp op) { - // Get corresponding finagle client - final ProxyClient sc = clientManager.getClient(addr); - final long startTimeNanos = System.nanoTime(); - // write the request to that host. - op.sendRequest(sc).addEventListener(new FutureEventListener<ResponseHeader>() { - @Override - public void onSuccess(ResponseHeader header) { - if (logger.isDebugEnabled()) { - logger.debug("Received response; header: {}", header); - } - clientStats.completeProxyRequest(addr, header.getCode(), startTimeNanos); - // update routing context - op.routingContext.addTriedHost(addr, header.getCode()); - switch (header.getCode()) { - case SUCCESS: - // success handling is done per stream op - break; - case FOUND: - handleRedirectResponse(header, op, addr); - break; - // for overcapacity, dont report failure since this normally happens quite a bit - case OVER_CAPACITY: - logger.debug("Failed to write request to {} : {}", op.stream, header); - op.fail(addr, DLException.of(header)); - break; - // for responses that indicate the requests definitely failed, - // we should fail them immediately (e.g. 
TOO_LARGE_RECORD, METADATA_EXCEPTION) - case NOT_IMPLEMENTED: - case METADATA_EXCEPTION: - case LOG_EMPTY: - case LOG_NOT_FOUND: - case TRUNCATED_TRANSACTION: - case END_OF_STREAM: - case TRANSACTION_OUT_OF_ORDER: - case INVALID_STREAM_NAME: - case REQUEST_DENIED: - case TOO_LARGE_RECORD: - case CHECKSUM_FAILED: - // status code NOT_READY is returned if failfast is enabled in the server. don't redirect - // since the proxy may still own the stream. - case STREAM_NOT_READY: - op.fail(addr, DLException.of(header)); - break; - case SERVICE_UNAVAILABLE: - handleServiceUnavailable(addr, sc, Optional.of(op)); - break; - case REGION_UNAVAILABLE: - // region is unavailable, redirect the request to hosts in other region - redirect(op, null); - break; - // Proxy was overloaded and refused to try to acquire the stream. Don't remove ownership, since - // we didn't have it in the first place. - case TOO_MANY_STREAMS: - handleRedirectableError(addr, op, header); - break; - case STREAM_UNAVAILABLE: - case ZOOKEEPER_ERROR: - case LOCKING_EXCEPTION: - case UNEXPECTED: - case INTERRUPTED: - case BK_TRANSMIT_ERROR: - case FLUSH_TIMEOUT: - default: - // when we are receiving these exceptions from proxy, it means proxy or the stream is closed - // redirect the request. 
- ownershipCache.removeOwnerFromStream(op.stream, addr, header.getCode().name()); - handleRedirectableError(addr, op, header); - break; - } - } - - @Override - public void onFailure(Throwable cause) { - Optional<StreamOp> opOptional = Optional.of(op); - cause = showRootCause(opOptional, cause); - clientStats.failProxyRequest(addr, cause, startTimeNanos); - handleRequestException(addr, sc, opOptional, cause); - } - }); - } - - // Response Handlers - - Throwable showRootCause(Optional<StreamOp> op, Throwable cause) { - if (cause instanceof Failure) { - Failure failure = (Failure) cause; - if (failure.isFlagged(Failure.Wrapped())) { - try { - // if it is a wrapped failure, unwrap it first - cause = failure.show(); - } catch (IllegalArgumentException iae) { - if (op.isPresent()) { - logger.warn("Failed to unwrap finagle failure of stream {} : ", op.get().stream, iae); - } else { - logger.warn("Failed to unwrap finagle failure : ", iae); - } - } - } - } - return cause; - } - - private void handleRedirectableError(SocketAddress addr, - StreamOp op, - ResponseHeader header) { - if (streamFailfast) { - op.fail(addr, DLException.of(header)); - } else { - redirect(op, null); - } - } - - void handleServiceUnavailable(SocketAddress addr, - ProxyClient sc, - Optional<StreamOp> op) { - // service is unavailable, remove it out of routing service - routingService.removeHost(addr, new ServiceUnavailableException(addr + " is unavailable now.")); - onServerLeft(addr); - if (op.isPresent()) { - ownershipCache.removeOwnerFromStream(op.get().stream, addr, addr + " is unavailable now."); - // redirect the request to other host. 
- redirect(op.get(), null); - } - } - - void handleRequestException(SocketAddress addr, - ProxyClient sc, - Optional<StreamOp> op, - Throwable cause) { - boolean resendOp = false; - boolean removeOwnerFromStream = false; - SocketAddress previousAddr = addr; - String reason = cause.getMessage(); - if (cause instanceof ConnectionFailedException || cause instanceof java.net.ConnectException) { - routingService.removeHost(addr, cause); - onServerLeft(addr, sc); - removeOwnerFromStream = true; - // redirect the request to other host. - resendOp = true; - } else if (cause instanceof ChannelException) { - // java.net.ConnectException typically means connection is refused remotely - // no process listening on remote address/port. - if (cause.getCause() instanceof java.net.ConnectException) { - routingService.removeHost(addr, cause.getCause()); - onServerLeft(addr); - reason = cause.getCause().getMessage(); - } else { - routingService.removeHost(addr, cause); - reason = cause.getMessage(); - } - removeOwnerFromStream = true; - // redirect the request to other host. - resendOp = true; - } else if (cause instanceof ServiceTimeoutException) { - // redirect the request to itself again, which will backoff for a while - resendOp = true; - previousAddr = null; - } else if (cause instanceof WriteException) { - // redirect the request to other host. - resendOp = true; - } else if (cause instanceof ServiceException) { - // redirect the request to other host. 
- clientManager.removeClient(addr, sc); - resendOp = true; - } else if (cause instanceof TApplicationException) { - handleTApplicationException(cause, op, addr, sc); - } else if (cause instanceof Failure) { - handleFinagleFailure((Failure) cause, op, addr); - } else { - // Default handler - handleException(cause, op, addr); - } - - if (op.isPresent()) { - if (removeOwnerFromStream) { - ownershipCache.removeOwnerFromStream(op.get().stream, addr, reason); - } - if (resendOp) { - doSend(op.get(), previousAddr); - } - } - } - - /** - * Redirect the request to new proxy <i>newAddr</i>. If <i>newAddr</i> is null, - * it would pick up a host from routing service. - * - * @param op - * stream operation - * @param newAddr - * new proxy address - */ - void redirect(StreamOp op, SocketAddress newAddr) { - ownershipCache.getOwnershipStatsLogger().onRedirect(op.stream); - if (null != newAddr) { - logger.debug("Redirect request {} to new owner {}.", op, newAddr); - op.send(newAddr); - } else { - doSend(op, null); - } - } - - void handleFinagleFailure(Failure failure, - Optional<StreamOp> op, - SocketAddress addr) { - if (failure.isFlagged(Failure.Restartable())) { - if (op.isPresent()) { - // redirect the request to other host - doSend(op.get(), addr); - } - } else { - // fail the request if it is other types of failures - handleException(failure, op, addr); - } - } - - void handleException(Throwable cause, - Optional<StreamOp> op, - SocketAddress addr) { - // RequestTimeoutException: fail it and let client decide whether to retry or not. - - // FailedFastException: - // We don't actually know when FailedFastException will be thrown - // so properly we just throw it back to application to let application - // handle it. 
- - // Other Exceptions: as we don't know how to handle them properly so throw them to client - if (op.isPresent()) { - logger.error("Failed to write request to {} @ {} : {}", - new Object[]{op.get().stream, addr, cause.toString()}); - op.get().fail(addr, cause); - } - } - - void handleTApplicationException(Throwable cause, - Optional<StreamOp> op, - SocketAddress addr, - ProxyClient sc) { - TApplicationException ex = (TApplicationException) cause; - if (ex.getType() == TApplicationException.UNKNOWN_METHOD) { - // if we encountered unknown method exception on thrift server, it means this proxy - // has problem. we should remove it from routing service, clean up ownerships - routingService.removeHost(addr, cause); - onServerLeft(addr, sc); - if (op.isPresent()) { - ownershipCache.removeOwnerFromStream(op.get().stream, addr, cause.getMessage()); - doSend(op.get(), addr); - } - } else { - handleException(cause, op, addr); - } - } - - void handleRedirectResponse(ResponseHeader header, StreamOp op, SocketAddress curAddr) { - SocketAddress ownerAddr = null; - if (header.isSetLocation()) { - String owner = header.getLocation(); - try { - ownerAddr = DLSocketAddress.deserialize(owner).getSocketAddress(); - // if we are receiving a direct request to same host, we won't try the same host. - // as the proxy will shut itself down if it redirects client to itself. - if (curAddr.equals(ownerAddr)) { - logger.warn("Request to stream {} is redirected to same server {}!", op.stream, curAddr); - ownerAddr = null; - } else { - // update ownership when redirects. 
- ownershipCache.updateOwner(op.stream, ownerAddr); - } - } catch (IOException e) { - ownerAddr = null; - } - } - redirect(op, ownerAddr); - } - - void updateOwnership(String stream, String location) { - try { - SocketAddress ownerAddr = DLSocketAddress.deserialize(location).getSocketAddress(); - // update ownership - ownershipCache.updateOwner(stream, ownerAddr); - } catch (IOException e) { - logger.warn("Invalid ownership {} found for stream {} : ", - new Object[] { location, stream, e }); - } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java deleted file mode 100644 index 8ccbbfc..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java +++ /dev/null @@ -1,486 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.client; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE; -import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; - -import com.google.common.base.Stopwatch; -import com.google.common.base.Ticker; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.LogRecordSet; -import com.twitter.distributedlog.LogRecordSetBuffer; -import com.twitter.distributedlog.client.speculative.DefaultSpeculativeRequestExecutionPolicy; -import com.twitter.distributedlog.client.speculative.SpeculativeRequestExecutionPolicy; -import com.twitter.distributedlog.client.speculative.SpeculativeRequestExecutor; -import com.twitter.distributedlog.exceptions.LogRecordTooLongException; -import com.twitter.distributedlog.exceptions.WriteException; -import com.twitter.distributedlog.io.CompressionCodec; -import com.twitter.distributedlog.service.DistributedLogClient; -import com.twitter.finagle.IndividualRequestTimeoutException; -import com.twitter.util.Duration; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Write to multiple streams. - */ -public class DistributedLogMultiStreamWriter implements Runnable { - - /** - * Create a new builder to create a multi stream writer. - * - * @return a new builder to create a multi stream writer. 
- */ - public static Builder newBuilder() { - return new Builder(); - } - - /** - * Builder for the multi stream writer. - */ - public static class Builder { - - private DistributedLogClient client = null; - private List<String> streams = null; - private int bufferSize = 16 * 1024; // 16k - private long flushIntervalMicros = 2000; // 2ms - private CompressionCodec.Type codec = CompressionCodec.Type.NONE; - private ScheduledExecutorService executorService = null; - private long requestTimeoutMs = 500; // 500ms - private int firstSpeculativeTimeoutMs = 50; // 50ms - private int maxSpeculativeTimeoutMs = 200; // 200ms - private float speculativeBackoffMultiplier = 2; - private Ticker ticker = Ticker.systemTicker(); - - private Builder() {} - - /** - * Set the distributedlog client used for multi stream writer. - * - * @param client - * distributedlog client - * @return builder - */ - public Builder client(DistributedLogClient client) { - this.client = client; - return this; - } - - /** - * Set the list of streams to write to. - * - * @param streams - * list of streams to write - * @return builder - */ - public Builder streams(List<String> streams) { - this.streams = streams; - return this; - } - - /** - * Set the output buffer size. - * - * <p>If output buffer size is 0, the writes will be transmitted to - * wire immediately. - * - * @param bufferSize - * output buffer size - * @return builder - */ - public Builder bufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - /** - * Set the flush interval in milliseconds. - * - * @param flushIntervalMs - * flush interval in milliseconds. - * @return builder - */ - public Builder flushIntervalMs(int flushIntervalMs) { - this.flushIntervalMicros = TimeUnit.MILLISECONDS.toMicros(flushIntervalMs); - return this; - } - - /** - * Set the flush interval in microseconds. - * - * @param flushIntervalMicros - * flush interval in microseconds. 
- * @return builder - */ - public Builder flushIntervalMicros(int flushIntervalMicros) { - this.flushIntervalMicros = flushIntervalMicros; - return this; - } - - /** - * Set compression codec. - * - * @param codec compression codec. - * @return builder - */ - public Builder compressionCodec(CompressionCodec.Type codec) { - this.codec = codec; - return this; - } - - /** - * Set the scheduler to flush output buffers. - * - * @param executorService - * executor service to flush output buffers. - * @return builder - */ - public Builder scheduler(ScheduledExecutorService executorService) { - this.executorService = executorService; - return this; - } - - /** - * Set request timeout in milliseconds. - * - * @param requestTimeoutMs - * request timeout in milliseconds. - * @return builder - */ - public Builder requestTimeoutMs(long requestTimeoutMs) { - this.requestTimeoutMs = requestTimeoutMs; - return this; - } - - /** - * Set the first speculative timeout in milliseconds. - * - * <p>The multi-streams writer does speculative writes on streams. - * The write issues first write request to a stream, if the write request - * doesn't respond within speculative timeout. it issues next write request - * to a different stream. It does such speculative retries until receive - * a success or request timeout ({@link #requestTimeoutMs(long)}). - * - * <p>This setting is to configure the first speculative timeout, in milliseconds. - * - * @param timeoutMs - * timeout in milliseconds - * @return builder - */ - public Builder firstSpeculativeTimeoutMs(int timeoutMs) { - this.firstSpeculativeTimeoutMs = timeoutMs; - return this; - } - - /** - * Set the max speculative timeout in milliseconds. - * - * <p>The multi-streams writer does speculative writes on streams. - * The write issues first write request to a stream, if the write request - * doesn't respond within speculative timeout. it issues next write request - * to a different stream. 
It does such speculative retries until receive - * a success or request timeout ({@link #requestTimeoutMs(long)}). - * - * <p>This setting is to configure the max speculative timeout, in milliseconds. - * - * @param timeoutMs - * timeout in milliseconds - * @return builder - */ - public Builder maxSpeculativeTimeoutMs(int timeoutMs) { - this.maxSpeculativeTimeoutMs = timeoutMs; - return this; - } - - /** - * Set the speculative timeout backoff multiplier. - * - * <p>The multi-streams writer does speculative writes on streams. - * The write issues first write request to a stream, if the write request - * doesn't respond within speculative timeout. it issues next write request - * to a different stream. It does such speculative retries until receive - * a success or request timeout ({@link #requestTimeoutMs(long)}). - * - * <p>This setting is to configure the speculative timeout backoff multiplier. - * - * @param multiplier - * backoff multiplier - * @return builder - */ - public Builder speculativeBackoffMultiplier(float multiplier) { - this.speculativeBackoffMultiplier = multiplier; - return this; - } - - /** - * Ticker for timing. - * - * @param ticker - * ticker - * @return builder - * @see Ticker - */ - public Builder clockTicker(Ticker ticker) { - this.ticker = ticker; - return this; - } - - /** - * Build the multi stream writer. - * - * @return the multi stream writer. 
- */ - public DistributedLogMultiStreamWriter build() { - checkArgument((null != streams && !streams.isEmpty()), - "No streams provided"); - checkNotNull(client, - "No distributedlog client provided"); - checkNotNull(codec, - "No compression codec provided"); - checkArgument(firstSpeculativeTimeoutMs > 0 - && firstSpeculativeTimeoutMs <= maxSpeculativeTimeoutMs - && speculativeBackoffMultiplier > 0 - && maxSpeculativeTimeoutMs < requestTimeoutMs, - "Invalid speculative timeout settings"); - return new DistributedLogMultiStreamWriter( - streams, - client, - Math.min(bufferSize, MAX_LOGRECORDSET_SIZE), - flushIntervalMicros, - requestTimeoutMs, - firstSpeculativeTimeoutMs, - maxSpeculativeTimeoutMs, - speculativeBackoffMultiplier, - codec, - ticker, - executorService); - } - } - - /** - * Pending Write Request. - */ - class PendingWriteRequest implements FutureEventListener<DLSN>, - SpeculativeRequestExecutor { - - private final LogRecordSetBuffer recordSet; - private AtomicBoolean complete = new AtomicBoolean(false); - private final Stopwatch stopwatch = Stopwatch.createStarted(clockTicker); - private int nextStream; - private int numTriedStreams = 0; - - PendingWriteRequest(LogRecordSetBuffer recordSet) { - this.recordSet = recordSet; - this.nextStream = Math.abs(nextStreamId.incrementAndGet()) % numStreams; - } - - synchronized String sendNextWrite() { - long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS); - if (elapsedMs > requestTimeoutMs || numTriedStreams >= numStreams) { - fail(new IndividualRequestTimeoutException(Duration.fromMilliseconds(elapsedMs))); - return null; - } - try { - return sendWriteToStream(nextStream); - } finally { - nextStream = (nextStream + 1) % numStreams; - ++numTriedStreams; - } - } - - synchronized String sendWriteToStream(int streamId) { - String stream = getStream(streamId); - client.writeRecordSet(stream, recordSet) - .addEventListener(this); - return stream; - } - - @Override - public void onSuccess(DLSN dlsn) { - if 
(!complete.compareAndSet(false, true)) { - return; - } - recordSet.completeTransmit( - dlsn.getLogSegmentSequenceNo(), - dlsn.getEntryId(), - dlsn.getSlotId()); - } - - @Override - public void onFailure(Throwable cause) { - sendNextWrite(); - } - - private void fail(Throwable cause) { - if (!complete.compareAndSet(false, true)) { - return; - } - recordSet.abortTransmit(cause); - } - - @Override - public Future<Boolean> issueSpeculativeRequest() { - return Future.value(!complete.get() && null != sendNextWrite()); - } - } - - private final int numStreams; - private final List<String> streams; - private final DistributedLogClient client; - private final int bufferSize; - private final long requestTimeoutMs; - private final SpeculativeRequestExecutionPolicy speculativePolicy; - private final Ticker clockTicker; - private final CompressionCodec.Type codec; - private final ScheduledExecutorService scheduler; - private final boolean ownScheduler; - private final AtomicInteger nextStreamId; - private LogRecordSet.Writer recordSetWriter; - - private DistributedLogMultiStreamWriter(List<String> streams, - DistributedLogClient client, - int bufferSize, - long flushIntervalMicros, - long requestTimeoutMs, - int firstSpecultiveTimeoutMs, - int maxSpeculativeTimeoutMs, - float speculativeBackoffMultiplier, - CompressionCodec.Type codec, - Ticker clockTicker, - ScheduledExecutorService scheduler) { - this.streams = Lists.newArrayList(streams); - this.numStreams = this.streams.size(); - this.client = client; - this.bufferSize = bufferSize; - this.requestTimeoutMs = requestTimeoutMs; - this.codec = codec; - this.clockTicker = clockTicker; - if (null == scheduler) { - this.scheduler = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("MultiStreamWriterFlushThread-%d") - .build()); - this.ownScheduler = true; - } else { - this.scheduler = scheduler; - this.ownScheduler = false; - } - this.speculativePolicy = new 
DefaultSpeculativeRequestExecutionPolicy( - firstSpecultiveTimeoutMs, - maxSpeculativeTimeoutMs, - speculativeBackoffMultiplier); - // shuffle the streams - Collections.shuffle(this.streams); - this.nextStreamId = new AtomicInteger(0); - this.recordSetWriter = newRecordSetWriter(); - - if (flushIntervalMicros > 0) { - this.scheduler.scheduleAtFixedRate( - this, - flushIntervalMicros, - flushIntervalMicros, - TimeUnit.MICROSECONDS); - } - } - - String getStream(int streamId) { - return streams.get(streamId); - } - - synchronized LogRecordSet.Writer getLogRecordSetWriter() { - return recordSetWriter; - } - - private LogRecordSet.Writer newRecordSetWriter() { - return LogRecordSet.newWriter( - bufferSize, - codec); - } - - public synchronized Future<DLSN> write(ByteBuffer buffer) { - int logRecordSize = buffer.remaining(); - if (logRecordSize > MAX_LOGRECORD_SIZE) { - return Future.exception(new LogRecordTooLongException( - "Log record of size " + logRecordSize + " written when only " - + MAX_LOGRECORD_SIZE + " is allowed")); - } - // if exceed max number of bytes - if ((recordSetWriter.getNumBytes() + logRecordSize) > MAX_LOGRECORDSET_SIZE) { - flush(); - } - Promise<DLSN> writePromise = new Promise<DLSN>(); - try { - recordSetWriter.writeRecord(buffer, writePromise); - } catch (LogRecordTooLongException e) { - return Future.exception(e); - } catch (WriteException e) { - recordSetWriter.abortTransmit(e); - recordSetWriter = newRecordSetWriter(); - return Future.exception(e); - } - if (recordSetWriter.getNumBytes() >= bufferSize) { - flush(); - } - return writePromise; - } - - @Override - public void run() { - flush(); - } - - private void flush() { - LogRecordSet.Writer recordSetToFlush; - synchronized (this) { - if (recordSetWriter.getNumRecords() == 0) { - return; - } - recordSetToFlush = recordSetWriter; - recordSetWriter = newRecordSetWriter(); - } - transmit(recordSetToFlush); - } - - private void transmit(LogRecordSet.Writer recordSetToFlush) { - 
PendingWriteRequest writeRequest = - new PendingWriteRequest(recordSetToFlush); - this.speculativePolicy.initiateSpeculativeRequest(scheduler, writeRequest); - } - - public void close() { - if (ownScheduler) { - this.scheduler.shutdown(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/MonitorServiceClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/MonitorServiceClient.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/MonitorServiceClient.java deleted file mode 100644 index e541578..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/MonitorServiceClient.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client.monitor; - -import com.twitter.util.Future; -import java.net.SocketAddress; -import java.util.Map; -import java.util.Set; - -/** - * Interface for distributedlog monitor service. 
/**
 * Interface for distributedlog monitor service.
 */
public interface MonitorServiceClient {

    /**
     * Check a given stream.
     *
     * @param stream
     *          stream.
     * @return check result.
     */
    Future<Void> check(String stream);

    /**
     * Send heartbeat to the stream and its readers.
     *
     * @param stream
     *          stream.
     * @return future for the heartbeat request.
     *         NOTE(review): completion semantics are defined by the implementation — confirm.
     */
    Future<Void> heartbeat(String stream);

    /**
     * Get current ownership distribution from current monitor service view.
     *
     * @return current ownership distribution, keyed by address with the set of stream names
     *         for each address
     */
    Map<SocketAddress, Set<String>> getStreamOwnershipDistribution();

    /**
     * Enable/Disable accepting new stream on a given proxy.
     *
     * @param enabled
     *          flag to enable/disable accepting new streams on a given proxy
     * @return future for the request; it carries no value
     */
    Future<Void> setAcceptNewStream(boolean enabled);

    /**
     * Close the client.
     */
    void close();
}
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * DistributedLog Monitor Client. - */ -package com.twitter.distributedlog.client.monitor; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/OwnershipCache.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/OwnershipCache.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/OwnershipCache.java deleted file mode 100644 index 387d727..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/OwnershipCache.java +++ /dev/null @@ -1,235 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Create the client side ownership cache.
 *
 * <p>Builds the ownership stats logger from the two stats receivers and then arms the
 * periodic cache dump (which is a no-op unless enabled in <i>clientConfig</i>).
 *
 * @param clientConfig
 *          client configuration
 * @param timer
 *          timer used to schedule the periodic cache dump
 * @param statsReceiver
 *          stats receiver passed to the ownership stats logger
 * @param streamStatsReceiver
 *          per-stream stats receiver passed to the ownership stats logger
 */
public OwnershipCache(ClientConfig clientConfig,
                      HashedWheelTimer timer,
                      StatsReceiver statsReceiver,
                      StatsReceiver streamStatsReceiver) {
    this.clientConfig = clientConfig;
    this.timer = timer;
    this.ownershipStatsLogger = new OwnershipStatsLogger(statsReceiver, streamStatsReceiver);
    scheduleDumpOwnershipCache();
}

/**
 * Arm a one-shot timeout that dumps the cache, if periodic dumping is enabled and the
 * configured interval is positive.
 */
private void scheduleDumpOwnershipCache() {
    if (!clientConfig.isPeriodicDumpOwnershipCacheEnabled()) {
        return;
    }
    if (clientConfig.getPeriodicDumpOwnershipCacheIntervalMs() <= 0) {
        return;
    }
    timer.newTimeout(
            this,
            clientConfig.getPeriodicDumpOwnershipCacheIntervalMs(),
            TimeUnit.MILLISECONDS);
}
@Override - public void run(Timeout timeout) throws Exception { - if (timeout.isCancelled()) { - return; - } - logger.info("Ownership cache : {} streams cached, {} hosts cached", - stream2Addresses.size(), address2Streams.size()); - logger.info("Cached streams : {}", stream2Addresses); - scheduleDumpOwnershipCache(); - } - - public OwnershipStatsLogger getOwnershipStatsLogger() { - return ownershipStatsLogger; - } - - /** - * Update ownership of <i>stream</i> to <i>addr</i>. - * - * @param stream - * Stream Name. - * @param addr - * Owner Address. - * @return true if owner is updated - */ - public boolean updateOwner(String stream, SocketAddress addr) { - // update ownership - SocketAddress oldAddr = stream2Addresses.putIfAbsent(stream, addr); - if (null != oldAddr && oldAddr.equals(addr)) { - return true; - } - if (null != oldAddr) { - if (stream2Addresses.replace(stream, oldAddr, addr)) { - // Store the relevant mappings for this topic and host combination - logger.info("Storing ownership for stream : {}, old host : {}, new host : {}.", - new Object[] { stream, oldAddr, addr }); - StringBuilder sb = new StringBuilder(); - sb.append("Ownership changed '") - .append(oldAddr).append("' -> '").append(addr).append("'"); - removeOwnerFromStream(stream, oldAddr, sb.toString()); - - // update stats - ownershipStatsLogger.onRemove(stream); - ownershipStatsLogger.onAdd(stream); - } else { - logger.warn("Ownership of stream : {} has been changed from {} to {} when storing host : {}.", - new Object[] { stream, oldAddr, stream2Addresses.get(stream), addr }); - return false; - } - } else { - logger.info("Storing ownership for stream : {}, host : {}.", stream, addr); - // update stats - ownershipStatsLogger.onAdd(stream); - } - - Set<String> streamsForHost = address2Streams.get(addr); - if (null == streamsForHost) { - Set<String> newStreamsForHost = new HashSet<String>(); - streamsForHost = address2Streams.putIfAbsent(addr, newStreamsForHost); - if (null == streamsForHost) { - 
streamsForHost = newStreamsForHost; - } - } - synchronized (streamsForHost) { - // check whether the ownership changed, since it might happend after replace succeed - if (addr.equals(stream2Addresses.get(stream))) { - streamsForHost.add(stream); - } - } - return true; - } - - /** - * Get the cached owner for stream <code>stream</code>. - * - * @param stream - * stream to lookup ownership - * @return owner's address - */ - public SocketAddress getOwner(String stream) { - SocketAddress address = stream2Addresses.get(stream); - if (null == address) { - ownershipStatsLogger.onMiss(stream); - } else { - ownershipStatsLogger.onHit(stream); - } - return address; - } - - /** - * Remove the owner <code>addr</code> from <code>stream</code> for a given <code>reason</code>. - * - * @param stream stream name - * @param addr owner address - * @param reason reason to remove ownership - */ - public void removeOwnerFromStream(String stream, SocketAddress addr, String reason) { - if (stream2Addresses.remove(stream, addr)) { - logger.info("Removed stream to host mapping for (stream: {} -> host: {}) : reason = '{}'.", - new Object[] { stream, addr, reason }); - } - Set<String> streamsForHost = address2Streams.get(addr); - if (null != streamsForHost) { - synchronized (streamsForHost) { - if (streamsForHost.remove(stream)) { - logger.info("Removed stream ({}) from host {} : reason = '{}'.", - new Object[] { stream, addr, reason }); - if (streamsForHost.isEmpty()) { - address2Streams.remove(addr, streamsForHost); - } - ownershipStatsLogger.onRemove(stream); - } - } - } - } - - /** - * Remove all streams from host <code>addr</code>. 
- * - * @param addr - * host to remove ownerships - */ - public void removeAllStreamsFromOwner(SocketAddress addr) { - logger.info("Remove streams mapping for host {}", addr); - Set<String> streamsForHost = address2Streams.get(addr); - if (null != streamsForHost) { - synchronized (streamsForHost) { - for (String s : streamsForHost) { - if (stream2Addresses.remove(s, addr)) { - logger.info("Removing mapping for stream : {} from host : {}", s, addr); - ownershipStatsLogger.onRemove(s); - } - } - address2Streams.remove(addr, streamsForHost); - } - } - } - - /** - * Get the number cached streams. - * - * @return number cached streams. - */ - public int getNumCachedStreams() { - return stream2Addresses.size(); - } - - /** - * Get the stream ownership distribution across proxies. - * - * @return stream ownership distribution - */ - public Map<SocketAddress, Set<String>> getStreamOwnershipDistribution() { - return ImmutableMap.copyOf(address2Streams); - } - - /** - * Get the stream ownership mapping. - * - * @return stream ownership mapping. - */ - public Map<String, SocketAddress> getStreamOwnerMapping() { - return stream2Addresses; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/package-info.java deleted file mode 100644 index 721702e..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Utils for managing ownership at client side. - */ -package com.twitter.distributedlog.client.ownership; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/package-info.java deleted file mode 100644 index aa167fb..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * DistributedLog Client. - */ -package com.twitter.distributedlog.client; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ClusterClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ClusterClient.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ClusterClient.java deleted file mode 100644 index f8bdae7..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ClusterClient.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.client.proxy; - -import com.twitter.distributedlog.thrift.service.DistributedLogService; -import com.twitter.finagle.Service; -import com.twitter.finagle.thrift.ThriftClientRequest; -import com.twitter.util.Future; -import scala.runtime.BoxedUnit; - -/** - * Cluster client. - */ -public class ClusterClient { - - private final Service<ThriftClientRequest, byte[]> client; - private final DistributedLogService.ServiceIface service; - - public ClusterClient(Service<ThriftClientRequest, byte[]> client, - DistributedLogService.ServiceIface service) { - this.client = client; - this.service = service; - } - - public Service<ThriftClientRequest, byte[]> getClient() { - return client; - } - - public DistributedLogService.ServiceIface getService() { - return service; - } - - public Future<BoxedUnit> close() { - return client.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/HostProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/HostProvider.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/HostProvider.java deleted file mode 100644 index 4878c1c..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/HostProvider.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
package com.twitter.distributedlog.client.proxy;

import java.net.SocketAddress;
import java.util.Set;

/**
 * Source of the hosts to handshake with.
 */
public interface HostProvider {

    /**
     * Return the hosts to handshake with.
     *
     * @return set of host addresses for handshaking.
     */
    Set<SocketAddress> getHosts();

}