http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java deleted file mode 100644 index 3f65aff..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java +++ /dev/null @@ -1,608 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.service; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.twitter.common.zookeeper.ServerSet; -import com.twitter.distributedlog.client.ClientConfig; -import com.twitter.distributedlog.client.DistributedLogClientImpl; -import com.twitter.distributedlog.client.monitor.MonitorServiceClient; -import com.twitter.distributedlog.client.proxy.ClusterClient; -import com.twitter.distributedlog.client.resolver.DefaultRegionResolver; -import com.twitter.distributedlog.client.resolver.RegionResolver; -import com.twitter.distributedlog.client.routing.RegionsRoutingService; -import com.twitter.distributedlog.client.routing.RoutingService; -import com.twitter.distributedlog.client.routing.RoutingUtils; -import com.twitter.distributedlog.thrift.service.DistributedLogService; -import com.twitter.finagle.Name; -import com.twitter.finagle.Resolver$; -import com.twitter.finagle.Service; -import com.twitter.finagle.ThriftMux; -import com.twitter.finagle.builder.ClientBuilder; -import com.twitter.finagle.stats.NullStatsReceiver; -import com.twitter.finagle.stats.StatsReceiver; -import com.twitter.finagle.thrift.ClientId; -import com.twitter.finagle.thrift.ThriftClientFramedCodec; -import com.twitter.finagle.thrift.ThriftClientRequest; -import com.twitter.util.Duration; -import java.net.SocketAddress; -import java.net.URI; -import java.util.Random; -import org.apache.commons.lang.StringUtils; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Option; - -/** - * Builder to build {@link DistributedLogClient}. 
- */ -public final class DistributedLogClientBuilder { - - private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientBuilder.class); - - private static final Random random = new Random(System.currentTimeMillis()); - - private String name = null; - private ClientId clientId = null; - private RoutingService.Builder routingServiceBuilder = null; - private ClientBuilder clientBuilder = null; - private String serverRoutingServiceFinagleName = null; - private StatsReceiver statsReceiver = new NullStatsReceiver(); - private StatsReceiver streamStatsReceiver = new NullStatsReceiver(); - private ClientConfig clientConfig = new ClientConfig(); - private boolean enableRegionStats = false; - private final RegionResolver regionResolver = new DefaultRegionResolver(); - - /** - * Create a client builder. - * - * @return client builder - */ - public static DistributedLogClientBuilder newBuilder() { - return new DistributedLogClientBuilder(); - } - - /** - * Create a new client builder from an existing {@code builder}. - * - * @param builder the existing builder. - * @return a new client builder. - */ - public static DistributedLogClientBuilder newBuilder(DistributedLogClientBuilder builder) { - DistributedLogClientBuilder newBuilder = new DistributedLogClientBuilder(); - newBuilder.name = builder.name; - newBuilder.clientId = builder.clientId; - newBuilder.clientBuilder = builder.clientBuilder; - newBuilder.routingServiceBuilder = builder.routingServiceBuilder; - newBuilder.statsReceiver = builder.statsReceiver; - newBuilder.streamStatsReceiver = builder.streamStatsReceiver; - newBuilder.enableRegionStats = builder.enableRegionStats; - newBuilder.serverRoutingServiceFinagleName = builder.serverRoutingServiceFinagleName; - newBuilder.clientConfig = ClientConfig.newConfig(builder.clientConfig); - return newBuilder; - } - - // private constructor - private DistributedLogClientBuilder() {} - - /** - * Client Name. 
- * - * @param name - * client name - * @return client builder. - */ - public DistributedLogClientBuilder name(String name) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.name = name; - return newBuilder; - } - - /** - * Client ID. - * - * @param clientId - * client id - * @return client builder. - */ - public DistributedLogClientBuilder clientId(ClientId clientId) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientId = clientId; - return newBuilder; - } - - /** - * Serverset to access proxy services. - * - * @param serverSet - * server set. - * @return client builder. - */ - public DistributedLogClientBuilder serverSet(ServerSet serverSet) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(serverSet); - newBuilder.enableRegionStats = false; - return newBuilder; - } - - /** - * Server Sets to access proxy services. - * - * <p>The <i>local</i> server set will be tried first then <i>remotes</i>. - * - * @param local local server set. - * @param remotes remote server sets. - * @return client builder. - */ - public DistributedLogClientBuilder serverSets(ServerSet local, ServerSet...remotes) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - RoutingService.Builder[] builders = new RoutingService.Builder[remotes.length + 1]; - builders[0] = RoutingUtils.buildRoutingService(local); - for (int i = 1; i < builders.length; i++) { - builders[i] = RoutingUtils.buildRoutingService(remotes[i - 1]); - } - newBuilder.routingServiceBuilder = RegionsRoutingService.newBuilder() - .resolver(regionResolver) - .routingServiceBuilders(builders); - newBuilder.enableRegionStats = remotes.length > 0; - return newBuilder; - } - - /** - * Name to access proxy services. - * - * @param finagleNameStr - * finagle name string. - * @return client builder. 
- */ - public DistributedLogClientBuilder finagleNameStr(String finagleNameStr) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr); - newBuilder.enableRegionStats = false; - return newBuilder; - } - - /** - * Finagle name strs to access proxy services. - * - * <p>The <i>local</i> finalge name str will be tried first, then <i>remotes</i>. - * - * @param local local server set. - * @param remotes remote server sets. - * @return client builder. - */ - public DistributedLogClientBuilder finagleNameStrs(String local, String...remotes) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - RoutingService.Builder[] builders = new RoutingService.Builder[remotes.length + 1]; - builders[0] = RoutingUtils.buildRoutingService(local); - for (int i = 1; i < builders.length; i++) { - builders[i] = RoutingUtils.buildRoutingService(remotes[i - 1]); - } - newBuilder.routingServiceBuilder = RegionsRoutingService.newBuilder() - .routingServiceBuilders(builders) - .resolver(regionResolver); - newBuilder.enableRegionStats = remotes.length > 0; - return newBuilder; - } - - /** - * URI to access proxy services. - * - * <p>Assuming the write proxies are announced under `.write_proxy` of the provided namespace uri. - * The builder will convert the dl uri (e.g. 
distributedlog://{zkserver}/path/to/namespace) to - * zookeeper serverset based finagle name str (`zk!{zkserver}!/path/to/namespace/.write_proxy`) - * - * @param uri namespace uri to access the serverset of write proxies - * @return distributedlog builder - */ - public DistributedLogClientBuilder uri(URI uri) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - String zkServers = uri.getAuthority().replace(";", ","); - String[] zkServerList = StringUtils.split(zkServers, ','); - String finagleNameStr = String.format( - "zk!%s!%s/.write_proxy", - zkServerList[random.nextInt(zkServerList.length)], // zk server - uri.getPath()); - newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr); - newBuilder.enableRegionStats = false; - return newBuilder; - } - - /** - * Address of write proxy to connect. - * - * @param address - * write proxy address. - * @return client builder. - */ - public DistributedLogClientBuilder host(SocketAddress address) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(address); - newBuilder.enableRegionStats = false; - return newBuilder; - } - - private DistributedLogClientBuilder routingServiceBuilder(RoutingService.Builder builder) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.routingServiceBuilder = builder; - newBuilder.enableRegionStats = false; - return newBuilder; - } - - /** - * Routing Service to access proxy services. - * - * @param routingService - * routing service - * @return client builder. - */ - @VisibleForTesting - public DistributedLogClientBuilder routingService(RoutingService routingService) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(routingService); - newBuilder.enableRegionStats = false; - return newBuilder; - } - - /** - * Stats receiver to expose client stats. 
- * - * @param statsReceiver - * stats receiver. - * @return client builder. - */ - public DistributedLogClientBuilder statsReceiver(StatsReceiver statsReceiver) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.statsReceiver = statsReceiver; - return newBuilder; - } - - /** - * Stream Stats Receiver to expose per stream stats. - * - * @param streamStatsReceiver - * stream stats receiver - * @return client builder. - */ - public DistributedLogClientBuilder streamStatsReceiver(StatsReceiver streamStatsReceiver) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.streamStatsReceiver = streamStatsReceiver; - return newBuilder; - } - - /** - * Set underlying finagle client builder. - * - * @param builder - * finagle client builder. - * @return client builder. - */ - public DistributedLogClientBuilder clientBuilder(ClientBuilder builder) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientBuilder = builder; - return newBuilder; - } - - /** - * Backoff time when redirecting to an already retried host. - * - * @param ms - * backoff time. - * @return client builder. - */ - public DistributedLogClientBuilder redirectBackoffStartMs(int ms) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setRedirectBackoffStartMs(ms); - return newBuilder; - } - - /** - * Max backoff time when redirecting to an already retried host. - * - * @param ms - * backoff time. - * @return client builder. - */ - public DistributedLogClientBuilder redirectBackoffMaxMs(int ms) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setRedirectBackoffMaxMs(ms); - return newBuilder; - } - - /** - * Max redirects that is allowed per request. - * - * <p>If <i>redirects</i> are exhausted, fail the request immediately. - * - * @param redirects - * max redirects allowed before failing a request. - * @return client builder. 
- */ - public DistributedLogClientBuilder maxRedirects(int redirects) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setMaxRedirects(redirects); - return newBuilder; - } - - /** - * Timeout per request in millis. - * - * @param timeoutMs - * timeout per request in millis. - * @return client builder. - */ - public DistributedLogClientBuilder requestTimeoutMs(int timeoutMs) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setRequestTimeoutMs(timeoutMs); - return newBuilder; - } - - /** - * Set thriftmux enabled. - * - * @param enabled - * is thriftmux enabled - * @return client builder. - */ - public DistributedLogClientBuilder thriftmux(boolean enabled) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setThriftMux(enabled); - return newBuilder; - } - - /** - * Set failfast stream exception handling enabled. - * - * @param enabled - * is failfast exception handling enabled - * @return client builder. - */ - public DistributedLogClientBuilder streamFailfast(boolean enabled) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setStreamFailfast(enabled); - return newBuilder; - } - - /** - * Set the regex to match stream names that the client cares about. - * - * @param nameRegex - * stream name regex - * @return client builder - */ - public DistributedLogClientBuilder streamNameRegex(String nameRegex) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setStreamNameRegex(nameRegex); - return newBuilder; - } - - /** - * Whether to use the new handshake endpoint to exchange ownership cache. - * - * <p>Enable this when the servers are updated to support handshaking with client info. - * - * @param enabled - * new handshake endpoint is enabled. - * @return client builder. 
- */ - public DistributedLogClientBuilder handshakeWithClientInfo(boolean enabled) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setHandshakeWithClientInfo(enabled); - return newBuilder; - } - - /** - * Set the periodic handshake interval in milliseconds. - * - * <p>Every <code>intervalMs</code>, the DL client will handshake with existing proxies again. - * If the interval is less than ownership sync interval, the handshake won't sync ownerships. Otherwise, it will. - * - * @see #periodicOwnershipSyncIntervalMs(long) - * @param intervalMs - * handshake interval - * @return client builder. - */ - public DistributedLogClientBuilder periodicHandshakeIntervalMs(long intervalMs) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setPeriodicHandshakeIntervalMs(intervalMs); - return newBuilder; - } - - /** - * Set the periodic ownership sync interval in milliseconds. - * - * <p>If periodic handshake is enabled, the handshake will sync ownership if the elapsed time is larger than - * sync interval. - * - * @see #periodicHandshakeIntervalMs(long) - * @param intervalMs - * interval that handshake should sync ownerships. - * @return client builder - */ - public DistributedLogClientBuilder periodicOwnershipSyncIntervalMs(long intervalMs) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setPeriodicOwnershipSyncIntervalMs(intervalMs); - return newBuilder; - } - - /** - * Enable/Disable periodic dumping ownership cache. - * - * @param enabled - * flag to enable/disable periodic dumping ownership cache - * @return client builder. - */ - public DistributedLogClientBuilder periodicDumpOwnershipCache(boolean enabled) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setPeriodicDumpOwnershipCacheEnabled(enabled); - return newBuilder; - } - - /** - * Set periodic dumping ownership cache interval. 
- * - * @param intervalMs - * interval on dumping ownership cache, in millis. - * @return client builder - */ - public DistributedLogClientBuilder periodicDumpOwnershipCacheIntervalMs(long intervalMs) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setPeriodicDumpOwnershipCacheIntervalMs(intervalMs); - return newBuilder; - } - - /** - * Enable handshake tracing. - * - * @param enabled - * flag to enable/disable handshake tracing - * @return client builder - */ - public DistributedLogClientBuilder handshakeTracing(boolean enabled) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setHandshakeTracingEnabled(enabled); - return newBuilder; - } - - /** - * Enable checksum on requests to the proxy. - * - * @param enabled - * flag to enable/disable checksum - * @return client builder - */ - public DistributedLogClientBuilder checksum(boolean enabled) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig.setChecksumEnabled(enabled); - return newBuilder; - } - - /** - * Configure the finagle name string for the server-side routing service. - * - * @param nameStr name string of the server-side routing service - * @return client builder - */ - public DistributedLogClientBuilder serverRoutingServiceFinagleNameStr(String nameStr) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.serverRoutingServiceFinagleName = nameStr; - return newBuilder; - } - - DistributedLogClientBuilder clientConfig(ClientConfig clientConfig) { - DistributedLogClientBuilder newBuilder = newBuilder(this); - newBuilder.clientConfig = ClientConfig.newConfig(clientConfig); - return newBuilder; - } - - /** - * Build distributedlog client. - * - * @return distributedlog client. - */ - public DistributedLogClient build() { - return buildClient(); - } - - /** - * Build monitor service client. - * - * @return monitor service client. 
- */ - public MonitorServiceClient buildMonitorClient() { - - return buildClient(); - } - - @SuppressWarnings("unchecked") - ClusterClient buildServerRoutingServiceClient(String serverRoutingServiceFinagleName) { - ClientBuilder builder = this.clientBuilder; - if (null == builder) { - builder = ClientBuilder.get() - .tcpConnectTimeout(Duration.fromMilliseconds(200)) - .connectTimeout(Duration.fromMilliseconds(200)) - .requestTimeout(Duration.fromSeconds(1)) - .retries(20); - if (!clientConfig.getThriftMux()) { - builder = builder.hostConnectionLimit(1); - } - } - if (clientConfig.getThriftMux()) { - builder = builder.stack(ThriftMux.client().withClientId(clientId)); - } else { - builder = builder.codec(ThriftClientFramedCodec.apply(Option.apply(clientId))); - } - - Name name; - try { - name = Resolver$.MODULE$.eval(serverRoutingServiceFinagleName); - } catch (Exception exc) { - logger.error("Exception in Resolver.eval for name {}", serverRoutingServiceFinagleName, exc); - throw new RuntimeException(exc); - } - - // builder the client - Service<ThriftClientRequest, byte[]> client = - ClientBuilder.safeBuildFactory( - builder.dest(name).reportTo(statsReceiver.scope("routing")) - ).toService(); - DistributedLogService.ServiceIface service = - new DistributedLogService.ServiceToClient(client, new TBinaryProtocol.Factory()); - return new ClusterClient(client, service); - } - - DistributedLogClientImpl buildClient() { - checkNotNull(name, "No name provided."); - checkNotNull(clientId, "No client id provided."); - checkNotNull(routingServiceBuilder, "No routing service builder provided."); - checkNotNull(statsReceiver, "No stats receiver provided."); - if (null == streamStatsReceiver) { - streamStatsReceiver = new NullStatsReceiver(); - } - - Optional<ClusterClient> serverRoutingServiceClient = Optional.absent(); - if (null != serverRoutingServiceFinagleName) { - serverRoutingServiceClient = Optional.of( - buildServerRoutingServiceClient(serverRoutingServiceFinagleName)); 
- } - - RoutingService routingService = routingServiceBuilder - .statsReceiver(statsReceiver.scope("routing")) - .build(); - DistributedLogClientImpl clientImpl = - new DistributedLogClientImpl( - name, - clientId, - routingService, - clientBuilder, - clientConfig, - serverRoutingServiceClient, - statsReceiver, - streamStatsReceiver, - regionResolver, - enableRegionStats); - routingService.startService(); - clientImpl.handshake(); - return clientImpl; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/service/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/package-info.java deleted file mode 100644 index e6e56c0..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * DistributedLog Service Client. 
- */ -package com.twitter.distributedlog.service; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/ClientConfig.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/ClientConfig.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/ClientConfig.java new file mode 100644 index 0000000..57e2b5a --- /dev/null +++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/ClientConfig.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.concurrent.TimeUnit; + +/** + * Client Config. 
/**
 * Client Config.
 *
 * <p>Mutable holder for the settings of a distributedlog client. Every setter returns
 * {@code this} so that calls can be chained. Use {@link #newConfig(ClientConfig)} to obtain
 * an independent copy.
 */
public class ClientConfig {
    // Backoff (in millis) applied when a request is redirected to an already tried host.
    int redirectBackoffStartMs = 25;
    int redirectBackoffMaxMs = 100;
    // Defaults of -1 for the redirect limit and the request timeout.
    int maxRedirects = -1;
    int requestTimeoutMs = -1;
    boolean thriftmux = false;
    boolean streamFailfast = false;
    // Matches every stream name by default.
    String streamNameRegex = ".*";
    boolean handshakeWithClientInfo = true;
    long periodicHandshakeIntervalMs = TimeUnit.MINUTES.toMillis(5);
    long periodicOwnershipSyncIntervalMs = TimeUnit.MINUTES.toMillis(5);
    boolean periodicDumpOwnershipCacheEnabled = false;
    long periodicDumpOwnershipCacheIntervalMs = TimeUnit.MINUTES.toMillis(10);
    boolean enableHandshakeTracing = false;
    boolean enableChecksum = true;

    /**
     * Set the max number of redirects allowed per request.
     *
     * @param maxRedirects max redirects
     * @return this config
     */
    public ClientConfig setMaxRedirects(int maxRedirects) {
        this.maxRedirects = maxRedirects;
        return this;
    }

    /** @return the max number of redirects allowed per request. */
    public int getMaxRedirects() {
        return this.maxRedirects;
    }

    /**
     * Set the timeout per request.
     *
     * @param timeoutInMillis timeout in millis
     * @return this config
     */
    public ClientConfig setRequestTimeoutMs(int timeoutInMillis) {
        this.requestTimeoutMs = timeoutInMillis;
        return this;
    }

    /** @return the timeout per request, in millis. */
    public int getRequestTimeoutMs() {
        return this.requestTimeoutMs;
    }

    /**
     * Set the initial redirect backoff.
     *
     * @param ms backoff in millis
     * @return this config
     */
    public ClientConfig setRedirectBackoffStartMs(int ms) {
        this.redirectBackoffStartMs = ms;
        return this;
    }

    /** @return the initial redirect backoff, in millis. */
    public int getRedirectBackoffStartMs() {
        return this.redirectBackoffStartMs;
    }

    /**
     * Set the upper bound of the redirect backoff.
     *
     * @param ms backoff in millis
     * @return this config
     */
    public ClientConfig setRedirectBackoffMaxMs(int ms) {
        this.redirectBackoffMaxMs = ms;
        return this;
    }

    /** @return the upper bound of the redirect backoff, in millis. */
    public int getRedirectBackoffMaxMs() {
        return this.redirectBackoffMaxMs;
    }

    /**
     * Enable/disable thriftmux.
     *
     * @param enabled whether thriftmux is enabled
     * @return this config
     */
    public ClientConfig setThriftMux(boolean enabled) {
        this.thriftmux = enabled;
        return this;
    }

    /** @return whether thriftmux is enabled. */
    public boolean getThriftMux() {
        return this.thriftmux;
    }

    /**
     * Enable/disable failfast stream exception handling.
     *
     * @param enabled whether failfast is enabled
     * @return this config
     */
    public ClientConfig setStreamFailfast(boolean enabled) {
        this.streamFailfast = enabled;
        return this;
    }

    /** @return whether failfast stream exception handling is enabled. */
    public boolean getStreamFailfast() {
        return this.streamFailfast;
    }

    /**
     * Set the regex of the stream names the client cares about.
     *
     * @param nameRegex stream name regex, must not be null
     * @return this config
     * @throws NullPointerException if {@code nameRegex} is null
     */
    public ClientConfig setStreamNameRegex(String nameRegex) {
        // JDK null check (fully qualified, so no extra import is required).
        java.util.Objects.requireNonNull(nameRegex);
        this.streamNameRegex = nameRegex;
        return this;
    }

    /** @return the stream name regex. */
    public String getStreamNameRegex() {
        return this.streamNameRegex;
    }

    /**
     * Enable/disable the handshake endpoint that carries client info.
     *
     * @param enabled whether to handshake with client info
     * @return this config
     */
    public ClientConfig setHandshakeWithClientInfo(boolean enabled) {
        this.handshakeWithClientInfo = enabled;
        return this;
    }

    /** @return whether the client handshakes with client info. */
    public boolean getHandshakeWithClientInfo() {
        return this.handshakeWithClientInfo;
    }

    /**
     * Set the periodic handshake interval.
     *
     * @param intervalMs interval in millis
     * @return this config
     */
    public ClientConfig setPeriodicHandshakeIntervalMs(long intervalMs) {
        this.periodicHandshakeIntervalMs = intervalMs;
        return this;
    }

    /** @return the periodic handshake interval, in millis. */
    public long getPeriodicHandshakeIntervalMs() {
        return this.periodicHandshakeIntervalMs;
    }

    /**
     * Set the periodic ownership sync interval.
     *
     * @param intervalMs interval in millis
     * @return this config
     */
    public ClientConfig setPeriodicOwnershipSyncIntervalMs(long intervalMs) {
        this.periodicOwnershipSyncIntervalMs = intervalMs;
        return this;
    }

    /** @return the periodic ownership sync interval, in millis. */
    public long getPeriodicOwnershipSyncIntervalMs() {
        return this.periodicOwnershipSyncIntervalMs;
    }

    /**
     * Enable/disable periodic dumping of the ownership cache.
     *
     * @param enabled whether periodic dumping is enabled
     * @return this config
     */
    public ClientConfig setPeriodicDumpOwnershipCacheEnabled(boolean enabled) {
        this.periodicDumpOwnershipCacheEnabled = enabled;
        return this;
    }

    /** @return whether periodic dumping of the ownership cache is enabled. */
    public boolean isPeriodicDumpOwnershipCacheEnabled() {
        return this.periodicDumpOwnershipCacheEnabled;
    }

    /**
     * Set the interval of dumping the ownership cache.
     *
     * @param intervalMs interval in millis
     * @return this config
     */
    public ClientConfig setPeriodicDumpOwnershipCacheIntervalMs(long intervalMs) {
        this.periodicDumpOwnershipCacheIntervalMs = intervalMs;
        return this;
    }

    /** @return the interval of dumping the ownership cache, in millis. */
    public long getPeriodicDumpOwnershipCacheIntervalMs() {
        return this.periodicDumpOwnershipCacheIntervalMs;
    }

    /**
     * Enable/disable handshake tracing.
     *
     * @param enabled whether handshake tracing is enabled
     * @return this config
     */
    public ClientConfig setHandshakeTracingEnabled(boolean enabled) {
        this.enableHandshakeTracing = enabled;
        return this;
    }

    /** @return whether handshake tracing is enabled. */
    public boolean isHandshakeTracingEnabled() {
        return this.enableHandshakeTracing;
    }

    /**
     * Enable/disable checksum on requests.
     *
     * @param enabled whether checksum is enabled
     * @return this config
     */
    public ClientConfig setChecksumEnabled(boolean enabled) {
        this.enableChecksum = enabled;
        return this;
    }

    /** @return whether checksum is enabled. */
    public boolean isChecksumEnabled() {
        return this.enableChecksum;
    }

    /**
     * Create an independent copy of {@code config}.
     *
     * <p>Every setting is carried over, including the periodic ownership sync interval.
     *
     * @param config the config to copy
     * @return a new config holding the same values
     */
    public static ClientConfig newConfig(ClientConfig config) {
        ClientConfig newConfig = new ClientConfig();
        newConfig.setMaxRedirects(config.getMaxRedirects())
                 .setRequestTimeoutMs(config.getRequestTimeoutMs())
                 .setRedirectBackoffStartMs(config.getRedirectBackoffStartMs())
                 .setRedirectBackoffMaxMs(config.getRedirectBackoffMaxMs())
                 .setThriftMux(config.getThriftMux())
                 .setStreamFailfast(config.getStreamFailfast())
                 .setStreamNameRegex(config.getStreamNameRegex())
                 .setHandshakeWithClientInfo(config.getHandshakeWithClientInfo())
                 .setPeriodicHandshakeIntervalMs(config.getPeriodicHandshakeIntervalMs())
                 // previously missing: the copy fell back to the 5 minute default
                 .setPeriodicOwnershipSyncIntervalMs(config.getPeriodicOwnershipSyncIntervalMs())
                 .setPeriodicDumpOwnershipCacheEnabled(config.isPeriodicDumpOwnershipCacheEnabled())
                 .setPeriodicDumpOwnershipCacheIntervalMs(config.getPeriodicDumpOwnershipCacheIntervalMs())
                 .setHandshakeTracingEnabled(config.isHandshakeTracingEnabled())
                 .setChecksumEnabled(config.isChecksumEnabled());
        return newConfig;
    }
}
// Remainder of the original line (header of the next file in this patch), kept verbatim:
// http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java new file mode 100644 index 0000000..1300187 --- /dev/null +++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java @@ -0,0 +1,1200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.LogRecordSetBuffer; +import org.apache.distributedlog.client.monitor.MonitorServiceClient; +import org.apache.distributedlog.client.ownership.OwnershipCache; +import org.apache.distributedlog.client.proxy.ClusterClient; +import org.apache.distributedlog.client.proxy.HostProvider; +import org.apache.distributedlog.client.proxy.ProxyClient; +import org.apache.distributedlog.client.proxy.ProxyClientManager; +import org.apache.distributedlog.client.proxy.ProxyListener; +import org.apache.distributedlog.client.resolver.RegionResolver; +import org.apache.distributedlog.client.routing.RoutingService; +import org.apache.distributedlog.client.routing.RoutingService.RoutingContext; +import org.apache.distributedlog.client.stats.ClientStats; +import org.apache.distributedlog.client.stats.OpStats; +import org.apache.distributedlog.exceptions.DLClientClosedException; +import org.apache.distributedlog.exceptions.DLException; +import org.apache.distributedlog.exceptions.ServiceUnavailableException; +import org.apache.distributedlog.exceptions.StreamUnavailableException; +import org.apache.distributedlog.service.DLSocketAddress; +import 
org.apache.distributedlog.service.DistributedLogClient; +import org.apache.distributedlog.thrift.service.BulkWriteResponse; +import org.apache.distributedlog.thrift.service.HeartbeatOptions; +import org.apache.distributedlog.thrift.service.ResponseHeader; +import org.apache.distributedlog.thrift.service.ServerInfo; +import org.apache.distributedlog.thrift.service.ServerStatus; +import org.apache.distributedlog.thrift.service.StatusCode; +import org.apache.distributedlog.thrift.service.WriteContext; +import org.apache.distributedlog.thrift.service.WriteResponse; +import org.apache.distributedlog.util.ProtocolUtils; +import com.twitter.finagle.CancelledRequestException; +import com.twitter.finagle.ConnectionFailedException; +import com.twitter.finagle.Failure; +import com.twitter.finagle.NoBrokersAvailableException; +import com.twitter.finagle.RequestTimeoutException; +import com.twitter.finagle.ServiceException; +import com.twitter.finagle.ServiceTimeoutException; +import com.twitter.finagle.WriteException; +import com.twitter.finagle.builder.ClientBuilder; +import com.twitter.finagle.stats.StatsReceiver; +import com.twitter.finagle.thrift.ClientId; +import com.twitter.util.Duration; +import com.twitter.util.Function; +import com.twitter.util.Function0; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Promise; +import com.twitter.util.Return; +import com.twitter.util.Throw; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import 
org.apache.thrift.TApplicationException;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Seq;
import scala.runtime.AbstractFunction1;


/**
 * Implementation of distributedlog client.
 *
 * <p>Every public operation (write, truncate, release, ...) is wrapped in a {@link StreamOp}.
 * An op is first sent to the owner recorded in the {@link OwnershipCache}; when no usable owner
 * is cached, the owner is looked up through the cluster client (if present) or picked by the
 * {@link RoutingService}. Response status codes and transport exceptions then decide whether the
 * op completes, fails, or is redirected to another host.
 */
public class DistributedLogClientImpl implements DistributedLogClient, MonitorServiceClient,
        RoutingService.RoutingListener, ProxyListener, HostProvider {

    private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientImpl.class);

    private final String clientName;
    private final ClientId clientId;
    private final ClientConfig clientConfig;
    private final RoutingService routingService;
    private final ProxyClient.Builder clientBuilder;
    private final boolean streamFailfast;
    private final Pattern streamNameRegexPattern;

    // Timer
    private final HashedWheelTimer dlTimer;

    // region resolver
    private final RegionResolver regionResolver;

    // Ownership maintenance
    private final OwnershipCache ownershipCache;
    // Channel/Client management
    private final ProxyClientManager clientManager;
    // Cluster Client (for routing service)
    private final Optional<ClusterClient> clusterClient;

    // Close Status (guarded by closeLock)
    private boolean closed = false;
    private final ReentrantReadWriteLock closeLock =
            new ReentrantReadWriteLock();

    /**
     * Base class of all stream operations.
     *
     * <p>It tracks the number of attempts, the hosts already tried and the elapsed time of a single
     * logical request, and doubles as the {@link TimerTask} used to delay a resend to a host that
     * was already tried.
     */
    abstract class StreamOp implements TimerTask {
        final String stream;

        final AtomicInteger tries = new AtomicInteger(0);
        final RoutingContext routingContext = RoutingContext.of(regionResolver);
        final WriteContext ctx = new WriteContext();
        final Stopwatch stopwatch;
        final OpStats opStats;
        // address to use when the backoff timer fires; accessed under the op's monitor
        SocketAddress nextAddressToSend;

        StreamOp(final String stream, final OpStats opStats) {
            this.stream = stream;
            this.stopwatch = Stopwatch.createStarted();
            this.opStats = opStats;
        }

        /**
         * Check whether the op has exceeded the configured request timeout.
         *
         * @return true if a request timeout is configured and has elapsed.
         */
        boolean shouldTimeout() {
            long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
            return shouldTimeout(elapsedMs);
        }

        /**
         * Check whether <i>elapsedMs</i> exceeds the configured request timeout.
         * A non-positive configured timeout disables the check.
         *
         * @param elapsedMs
         *          elapsed time of the op, in milliseconds
         * @return true if a request timeout is configured and has elapsed.
         */
        boolean shouldTimeout(long elapsedMs) {
            return clientConfig.getRequestTimeoutMs() > 0
                    && elapsedMs >= clientConfig.getRequestTimeoutMs();
        }

        /**
         * Send the op to <i>address</i>.
         *
         * <p>The op is failed with a {@link RequestTimeoutException} if the max number of redirects
         * or the request timeout is exhausted. If the address was already tried, the send is delayed
         * by a backoff proportional to the number of tries (capped by the configured max backoff).
         *
         * @param address
         *          address of the proxy to send the op to
         */
        void send(SocketAddress address) {
            long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
            if (clientConfig.getMaxRedirects() > 0
                    && tries.get() >= clientConfig.getMaxRedirects()) {
                fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs),
                        "Exhausted max redirects in " + elapsedMs + " ms"));
                return;
            } else if (shouldTimeout(elapsedMs)) {
                fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs),
                        "Exhausted max request timeout " + clientConfig.getRequestTimeoutMs()
                                + " in " + elapsedMs + " ms"));
                return;
            }
            synchronized (this) {
                String addrStr = address.toString();
                if (ctx.isSetTriedHosts() && ctx.getTriedHosts().contains(addrStr)) {
                    // the host was tried before: back off before hitting it again
                    nextAddressToSend = address;
                    dlTimer.newTimeout(this,
                            Math.min(clientConfig.getRedirectBackoffMaxMs(),
                                    tries.get() * clientConfig.getRedirectBackoffStartMs()),
                            TimeUnit.MILLISECONDS);
                } else {
                    doSend(address);
                }
            }
        }

        /**
         * Issue the actual thrift request on the given proxy client.
         *
         * @param sc
         *          proxy client to send the request with
         * @return future of the response header of the request.
         */
        abstract Future<ResponseHeader> sendRequest(ProxyClient sc);

        /**
         * Record <i>address</i> as tried, attach the checksum (if enabled and available),
         * count the attempt and write the request to the proxy.
         *
         * @param address
         *          address of the proxy to send the op to
         */
        void doSend(SocketAddress address) {
            ctx.addToTriedHosts(address.toString());
            if (clientConfig.isChecksumEnabled()) {
                Long crc32 = computeChecksum();
                if (null != crc32) {
                    ctx.setCrc32(crc32);
                }
            }
            tries.incrementAndGet();
            sendWriteRequest(address, this);
        }

        /**
         * Hook invoked on a successful response before the op is completed.
         * By default the responding proxy is recorded as the owner of the stream.
         */
        void beforeComplete(ProxyClient sc, ResponseHeader responseHeader) {
            ownershipCache.updateOwner(stream, sc.getAddress());
        }

        /**
         * Stop the stopwatch and record the successful request in the op stats.
         */
        void complete(SocketAddress address) {
            stopwatch.stop();
            opStats.completeRequest(address,
                    stopwatch.elapsed(TimeUnit.MICROSECONDS), tries.get());
        }

        /**
         * Stop the stopwatch and record the failed request in the op stats.
         * Subclasses override this to also fail the result promise(s).
         *
         * @param address
         *          address the op failed on; may be null when no host was involved
         * @param t
         *          cause of the failure
         */
        void fail(SocketAddress address, Throwable t) {
            stopwatch.stop();
            opStats.failRequest(address,
                    stopwatch.elapsed(TimeUnit.MICROSECONDS), tries.get());
        }

        /**
         * Compute the checksum of the op.
         *
         * @return the checksum, or null if the op does not support checksums.
         */
        Long computeChecksum() {
            return null;
        }

        /**
         * Timer callback of a delayed resend: resend to {@link #nextAddressToSend}, or fail the op
         * with a {@link CancelledRequestException} if the timeout was cancelled or no address is set.
         */
        @Override
        public synchronized void run(Timeout timeout) throws Exception {
            if (!timeout.isCancelled() && null != nextAddressToSend) {
                doSend(nextAddressToSend);
            } else {
                fail(null, new CancelledRequestException());
            }
        }
    }

    /**
     * Op writing a batch of records; it exposes one result future per record.
     */
    class BulkWriteOp extends StreamOp {

        final List<ByteBuffer> data;
        final ArrayList<Promise<DLSN>> results;

        BulkWriteOp(final String name, final List<ByteBuffer> data) {
            super(name, clientStats.getOpStats("bulk_write"));
            this.data = data;

            // This could take a while (relatively speaking) for very large inputs. We probably don't want
            // to go so large for other reasons though.
            this.results = new ArrayList<Promise<DLSN>>(data.size());
            for (int i = 0; i < data.size(); i++) {
                checkNotNull(data.get(i));
                this.results.add(new Promise<DLSN>());
            }
        }

        @Override
        Future<ResponseHeader> sendRequest(final ProxyClient sc) {
            return sc.getService().writeBulkWithContext(stream, data, ctx)
                    .addEventListener(new FutureEventListener<BulkWriteResponse>() {
                @Override
                public void onSuccess(BulkWriteResponse response) {
                    // For non-success case, the ResponseHeader handler (the caller) will handle it.
                    // Note success in this case means no finagle errors have occurred
                    // (such as finagle connection issues). In general code != SUCCESS means there's some error
                    // reported by dlog service. The caller will handle such errors.
                    if (response.getHeader().getCode() == StatusCode.SUCCESS) {
                        beforeComplete(sc, response.getHeader());
                        BulkWriteOp.this.complete(sc.getAddress(), response);
                        if (response.getWriteResponses().size() == 0 && data.size() > 0) {
                            logger.error("non-empty bulk write got back empty response without failure for stream {}",
                                    stream);
                        }
                    }
                }
                @Override
                public void onFailure(Throwable cause) {
                    // Handled by the ResponseHeader listener (attached by the caller).
                }
            }).map(new AbstractFunction1<BulkWriteResponse, ResponseHeader>() {
                @Override
                public ResponseHeader apply(BulkWriteResponse response) {
                    // We need to return the ResponseHeader to the caller's listener to process DLOG errors.
                    return response.getHeader();
                }
            });
        }

        /**
         * Complete the op and satisfy the per-record promises from the per-record responses.
         *
         * <p>Promises that have no matching response (the proxy returned fewer responses than
         * records were sent) are failed, so that their callers do not wait forever.
         *
         * @param address
         *          address of the proxy that served the request
         * @param bulkWriteResponse
         *          response of the bulk write
         */
        void complete(SocketAddress address, BulkWriteResponse bulkWriteResponse) {
            super.complete(address);
            Iterator<WriteResponse> writeResponseIterator = bulkWriteResponse.getWriteResponses().iterator();
            Iterator<Promise<DLSN>> resultIterator = results.iterator();

            // Fill in errors from thrift responses.
            while (resultIterator.hasNext() && writeResponseIterator.hasNext()) {
                Promise<DLSN> result = resultIterator.next();
                WriteResponse writeResponse = writeResponseIterator.next();
                if (StatusCode.SUCCESS == writeResponse.getHeader().getCode()) {
                    result.setValue(DLSN.deserialize(writeResponse.getDlsn()));
                } else {
                    result.setException(DLException.of(writeResponse.getHeader()));
                }
            }

            // Should never happen, but just in case so there's some record.
            if (bulkWriteResponse.getWriteResponses().size() != data.size()) {
                logger.error("wrong number of results, response = {} records = {}",
                        bulkWriteResponse.getWriteResponses().size(), data.size());
            }

            // The op is terminal at this point, so nobody else will ever satisfy the promises
            // left without a response: fail them instead of leaving them pending.
            while (resultIterator.hasNext()) {
                resultIterator.next().updateIfEmpty(new Throw<DLSN>(new IllegalStateException(
                        "No write response returned for a record of bulk write to stream " + stream)));
            }
        }

        @Override
        void fail(SocketAddress address, Throwable t) {

            // StreamOp.fail is called to fail the overall request. In case of BulkWriteOp we take the request level
            // exception to apply to the first write. In fact for request level exceptions no request has ever been
            // attempted, but logically we associate the error with the first write.
            super.fail(address, t);
            Iterator<Promise<DLSN>> resultIterator = results.iterator();

            // Fail the first write with the batch level failure.
            if (resultIterator.hasNext()) {
                Promise<DLSN> result = resultIterator.next();
                result.setException(t);
            }

            // Fail the remaining writes as cancelled requests.
            while (resultIterator.hasNext()) {
                Promise<DLSN> result = resultIterator.next();
                result.setException(new CancelledRequestException());
            }
        }

        /**
         * Return the per-record result futures, in the order of the records.
         */
        @SuppressWarnings("unchecked")
        List<Future<DLSN>> result() {
            // every Promise<DLSN> is a Future<DLSN>; the raw cast only widens the element type
            return (List) results;
        }
    }

    /**
     * Base class of the ops whose thrift call returns a single {@link WriteResponse}.
     */
    abstract class AbstractWriteOp extends StreamOp {

        final Promise<WriteResponse> result = new Promise<WriteResponse>();
        // lazily computed checksum, cached across resends
        Long crc32 = null;

        AbstractWriteOp(final String name, final OpStats opStats) {
            super(name, opStats);
        }

        /**
         * Complete the op and satisfy the result promise with <i>response</i>.
         */
        void complete(SocketAddress address, WriteResponse response) {
            super.complete(address);
            result.setValue(response);
        }

        @Override
        void fail(SocketAddress address, Throwable t) {
            super.fail(address, t);
            result.setException(t);
        }

        @Override
        Long computeChecksum() {
            if (null == crc32) {
                crc32 = ProtocolUtils.streamOpCRC32(stream);
            }
            return crc32;
        }

        @Override
        Future<ResponseHeader> sendRequest(final ProxyClient sc) {
            return this.sendWriteRequest(sc).addEventListener(new FutureEventListener<WriteResponse>() {
                @Override
                public void onSuccess(WriteResponse response) {
                    // non-success codes are processed by the ResponseHeader listener
                    if (response.getHeader().getCode() == StatusCode.SUCCESS) {
                        beforeComplete(sc, response.getHeader());
                        AbstractWriteOp.this.complete(sc.getAddress(), response);
                    }
                }
                @Override
                public void onFailure(Throwable cause) {
                    // handled by the ResponseHeader listener
                }
            }).map(new AbstractFunction1<WriteResponse, ResponseHeader>() {
                @Override
                public ResponseHeader apply(WriteResponse response) {
                    return response.getHeader();
                }
            });
        }

        /**
         * Issue the op specific thrift call on the given proxy client.
         */
        abstract Future<WriteResponse> sendWriteRequest(ProxyClient sc);
    }

    /**
     * Op writing a single record.
     */
    class WriteOp extends AbstractWriteOp {
        final ByteBuffer data;

        WriteOp(final String name, final ByteBuffer data) {
            super(name, clientStats.getOpStats("write"));
            this.data = data;
        }

        @Override
        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
            return sc.getService().writeWithContext(stream, data, ctx);
        }

        @Override
        Long computeChecksum() {
            if (null == crc32) {
                byte[] dataBytes = new byte[data.remaining()];
                // read through a duplicate so the position of the caller's buffer is untouched
                data.duplicate().get(dataBytes);
                crc32 = ProtocolUtils.writeOpCRC32(stream, dataBytes);
            }
            return crc32;
        }

        /**
         * Return the future of the DLSN deserialized from the write response.
         */
        Future<DLSN> result() {
            return result.map(new AbstractFunction1<WriteResponse, DLSN>() {
                @Override
                public DLSN apply(WriteResponse response) {
                    return DLSN.deserialize(response.getDlsn());
                }
            });
        }
    }

    /**
     * Op truncating a stream at a given DLSN.
     */
    class TruncateOp extends AbstractWriteOp {
        final DLSN dlsn;

        TruncateOp(String name, DLSN dlsn) {
            super(name, clientStats.getOpStats("truncate"));
            this.dlsn = dlsn;
        }

        @Override
        Long computeChecksum() {
            if (null == crc32) {
                crc32 = ProtocolUtils.truncateOpCRC32(stream, dlsn);
            }
            return crc32;
        }

        @Override
        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
            return sc.getService().truncate(stream, dlsn.serialize(), ctx);
        }

        /**
         * Return a future that yields true once the op completed successfully.
         */
        Future<Boolean> result() {
            return result.map(new AbstractFunction1<WriteResponse, Boolean>() {
                @Override
                public Boolean apply(WriteResponse response) {
                    return true;
                }
            });
        }
    }

    /**
     * Write op whose payload is the buffer of a record set; the write context is flagged accordingly.
     */
    class WriteRecordSetOp extends WriteOp {

        WriteRecordSetOp(String name, LogRecordSetBuffer recordSet) {
            super(name, recordSet.getBuffer());
            ctx.setIsRecordSet(true);
        }

    }


    /**
     * Op releasing the ownership of a stream.
     */
    class ReleaseOp extends AbstractWriteOp {

        ReleaseOp(String name) {
            super(name, clientStats.getOpStats("release"));
        }

        @Override
        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
            return sc.getService().release(stream, ctx);
        }

        @Override
        void beforeComplete(ProxyClient sc, ResponseHeader header) {
            // the proxy gave up the stream: drop the cached ownership instead of refreshing it
            ownershipCache.removeOwnerFromStream(stream, sc.getAddress(), "Stream Released");
        }

        Future<Void> result() {
            return result.map(new AbstractFunction1<WriteResponse, Void>() {
                @Override
                public Void apply(WriteResponse response) {
                    return null;
                }
            });
        }
    }

    /**
     * Op deleting a stream.
     */
    class DeleteOp extends AbstractWriteOp {

        DeleteOp(String name) {
            super(name, clientStats.getOpStats("delete"));
        }

        @Override
        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
            return sc.getService().delete(stream, ctx);
        }

        @Override
        void beforeComplete(ProxyClient sc, ResponseHeader header) {
            // the stream is gone: drop the cached ownership instead of refreshing it
            ownershipCache.removeOwnerFromStream(stream, sc.getAddress(), "Stream Deleted");
        }

        Future<Void> result() {
            return result.map(new AbstractFunction1<WriteResponse, Void>() {
                @Override
                public Void apply(WriteResponse v1) {
                    return null;
                }
            });
        }
    }

    /**
     * Op creating a stream.
     */
    class CreateOp extends AbstractWriteOp {

        CreateOp(String name) {
            super(name, clientStats.getOpStats("create"));
        }

        @Override
        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
            return sc.getService().create(stream, ctx);
        }

        @Override
        void beforeComplete(ProxyClient sc, ResponseHeader header) {
            ownershipCache.updateOwner(stream, sc.getAddress());
        }

        Future<Void> result() {
            // map already yields a Future<Void>, same as the other ops
            return result.map(new AbstractFunction1<WriteResponse, Void>() {
                @Override
                public Void apply(WriteResponse v1) {
                    return null;
                }
            });
        }
    }

    /**
     * Op sending a heartbeat to a stream, optionally asking for the heartbeat to be sent to readers.
     */
    class HeartbeatOp extends AbstractWriteOp {
        HeartbeatOptions options;

        HeartbeatOp(String name, boolean sendReaderHeartBeat) {
            super(name, clientStats.getOpStats("heartbeat"));
            options = new HeartbeatOptions();
            options.setSendHeartBeatToReader(sendReaderHeartBeat);
        }

        @Override
        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
            return sc.getService().heartbeatWithOptions(stream, ctx, options);
        }

        Future<Void> result() {
            return result.map(new AbstractFunction1<WriteResponse, Void>() {
                @Override
                public Void apply(WriteResponse response) {
                    return null;
                }
            });
        }
    }

    // Stats
    private final ClientStats clientStats;

    /**
     * Build the client: create the timer, the ownership cache, the stats and the proxy client
     * manager, register this instance as routing and proxy listener, and expose the cache gauges.
     *
     * @param name
     *          client name, used in the timer thread names and for the proxy clients
     * @param clientId
     *          client id
     * @param routingService
     *          routing service used to pick hosts
     * @param clientBuilder
     *          finagle client builder used to build the proxy clients
     * @param clientConfig
     *          client configuration
     * @param clusterClient
     *          optional cluster client used to look up stream owners
     * @param statsReceiver
     *          receiver of the client stats
     * @param streamStatsReceiver
     *          stats receiver handed to the ownership cache
     * @param regionResolver
     *          region resolver
     * @param enableRegionStats
     *          flag passed to the client stats
     */
    public DistributedLogClientImpl(String name,
                                    ClientId clientId,
                                    RoutingService routingService,
                                    ClientBuilder clientBuilder,
                                    ClientConfig clientConfig,
                                    Optional<ClusterClient> clusterClient,
                                    StatsReceiver statsReceiver,
                                    StatsReceiver streamStatsReceiver,
                                    RegionResolver regionResolver,
                                    boolean enableRegionStats) {
        this.clientName = name;
        this.clientId = clientId;
        this.routingService = routingService;
        this.clientConfig = clientConfig;
        this.streamFailfast = clientConfig.getStreamFailfast();
        this.streamNameRegexPattern = Pattern.compile(clientConfig.getStreamNameRegex());
        this.regionResolver = regionResolver;
        // Build the timer
        this.dlTimer = new HashedWheelTimer(
                new ThreadFactoryBuilder().setNameFormat("DLClient-" + name + "-timer-%d").build(),
                this.clientConfig.getRedirectBackoffStartMs(),
                TimeUnit.MILLISECONDS);
        // register routing listener
        this.routingService.registerListener(this);
        // build the ownership cache
        this.ownershipCache = new OwnershipCache(this.clientConfig, this.dlTimer, statsReceiver, streamStatsReceiver);
        // Client Stats
        this.clientStats = new ClientStats(statsReceiver, enableRegionStats, regionResolver);
        // Client Manager
        this.clientBuilder = ProxyClient.newBuilder(clientName, clientId, clientBuilder, clientConfig, clientStats);
        this.clientManager = new ProxyClientManager(
                this.clientConfig,  // client config
                this.clientBuilder, // client builder
                this.dlTimer,       // timer
                this,               // host provider
                clientStats);       // client stats
        this.clusterClient = clusterClient;
        this.clientManager.registerProxyListener(this);

        // Cache Stats
        StatsReceiver cacheStatReceiver = statsReceiver.scope("cache");
        Seq<String> numCachedStreamsGaugeName =
                scala.collection.JavaConversions.asScalaBuffer(Arrays.asList("num_streams")).toList();
        cacheStatReceiver.provideGauge(numCachedStreamsGaugeName, new Function0<Object>() {
            @Override
            public Object apply() {
                return (float) ownershipCache.getNumCachedStreams();
            }
        });
        Seq<String> numCachedHostsGaugeName =
                scala.collection.JavaConversions.asScalaBuffer(Arrays.asList("num_hosts")).toList();
        cacheStatReceiver.provideGauge(numCachedHostsGaugeName, new Function0<Object>() {
            @Override
            public Object apply() {
                return (float) clientManager.getNumProxies();
            }
        });

        logger.info("Build distributedlog client : name = {}, client_id = {}, routing_service = {},"
                + " stats_receiver = {}, thriftmux = {}",
                new Object[] {
                        name,
                        clientId,
                        routingService.getClass(),
                        statsReceiver.getClass(),
                        clientConfig.getThriftMux()
                });
    }

    /**
     * Return the hosts to handshake with: the hosts owning cached streams, plus all hosts of the
     * routing service when no cluster client is used.
     */
    @Override
    public Set<SocketAddress> getHosts() {
        Set<SocketAddress> hosts = Sets.newHashSet();
        // if using server side routing, we only handshake with the hosts in ownership cache.
        if (!clusterClient.isPresent()) {
            hosts.addAll(this.routingService.getHosts());
        }
        hosts.addAll(this.ownershipCache.getStreamOwnershipDistribution().keySet());
        return hosts;
    }

    /**
     * Process the server info returned by a handshake: a server reported as DOWN is handled as
     * unavailable, otherwise the returned ownerships whose stream name matches the configured
     * stream name regex are loaded into the ownership cache.
     */
    @Override
    public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
        if (null != serverInfo
                && serverInfo.isSetServerStatus()
                && ServerStatus.DOWN == serverInfo.getServerStatus()) {
            logger.info("{} is detected as DOWN during handshaking", address);
            // server is shutting down
            handleServiceUnavailable(address, client, Optional.<StreamOp>absent());
            return;
        }

        if (null != serverInfo && serverInfo.isSetOwnerships()) {
            Map<String, String> ownerships = serverInfo.getOwnerships();
            logger.debug("Handshaked with {} : {} ownerships returned.", address, ownerships.size());
            for (Map.Entry<String, String> entry : ownerships.entrySet()) {
                Matcher matcher = streamNameRegexPattern.matcher(entry.getKey());
                if (!matcher.matches()) {
                    continue;
                }
                updateOwnership(entry.getKey(), entry.getValue());
            }
        } else {
            logger.debug("Handshaked with {} : no ownerships returned", address);
        }
    }

    /**
     * Process a failed handshake like a failed request that has no stream op attached.
     */
    @Override
    public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
        cause = showRootCause(Optional.<StreamOp>absent(), cause);
        handleRequestException(address, client, Optional.<StreamOp>absent(), cause);
    }

    /**
     * Trigger a handshake through the client manager and log the resulting cache sizes.
     */
    @VisibleForTesting
    public void handshake() {
        clientManager.handshake();
        logger.info("Handshaked with {} hosts, cached {} streams",
                clientManager.getNumProxies(), ownershipCache.getNumCachedStreams());
    }

    @Override
    public void onServerLeft(SocketAddress address) {
        onServerLeft(address, null);
    }

    /**
     * Forget all the streams owned by <i>address</i> and remove its proxy client.
     *
     * @param address
     *          address of the server that left
     * @param sc
     *          proxy client to remove; if null, the client is removed by address only
     */
    private void onServerLeft(SocketAddress address, ProxyClient sc) {
        ownershipCache.removeAllStreamsFromOwner(address);
        if (null == sc) {
            clientManager.removeClient(address);
        } else {
            clientManager.removeClient(address, sc);
        }
    }

    @Override
    public void onServerJoin(SocketAddress address) {
        // we only pre-create connection for client-side routing
        // if it is server side routing, we only know the exact proxy address
        // when #getOwner.
        if (!clusterClient.isPresent()) {
            clientManager.createClient(address);
        }
    }

    /**
     * Close the client. Only the first call has an effect: it marks the client as closed, closes
     * the client manager, unregisters from and stops the routing service, and stops the timer.
     */
    public void close() {
        closeLock.writeLock().lock();
        try {
            if (closed) {
                return;
            }
            closed = true;
        } finally {
            closeLock.writeLock().unlock();
        }
        clientManager.close();
        routingService.unregisterListener(this);
        routingService.stopService();
        dlTimer.stop();
    }

    @Override
    public Future<Void> check(String stream) {
        // heartbeat that is not sent to readers
        final HeartbeatOp op = new HeartbeatOp(stream, false);
        sendRequest(op);
        return op.result();
    }

    @Override
    public Future<Void> heartbeat(String stream) {
        // heartbeat that is also sent to readers
        final HeartbeatOp op = new HeartbeatOp(stream, true);
        sendRequest(op);
        return op.result();
    }

    @Override
    public Map<SocketAddress, Set<String>> getStreamOwnershipDistribution() {
        return ownershipCache.getStreamOwnershipDistribution();
    }

    /**
     * Send the accept-new-stream flag to every proxy currently known by the client manager.
     *
     * @return future satisfied once all the proxies have answered.
     */
    @Override
    public Future<Void> setAcceptNewStream(boolean enabled) {
        Map<SocketAddress, ProxyClient> snapshot = clientManager.getAllClients();
        List<Future<Void>> futures = new ArrayList<Future<Void>>(snapshot.size());
        for (Map.Entry<SocketAddress, ProxyClient> entry : snapshot.entrySet()) {
            futures.add(entry.getValue().getService().setAcceptNewStream(enabled));
        }
        return Future.collect(futures).map(new Function<List<Void>, Void>() {
            @Override
            public Void apply(List<Void> list) {
                return null;
            }
        });
    }

    @Override
    public Future<DLSN> write(String stream, ByteBuffer data) {
        final WriteOp op = new WriteOp(stream, data);
        sendRequest(op);
        return op.result();
    }

    @Override
    public Future<DLSN> writeRecordSet(String stream, final LogRecordSetBuffer recordSet) {
        final WriteRecordSetOp op = new WriteRecordSetOp(stream, recordSet);
        sendRequest(op);
        return op.result();
    }

    @Override
    public List<Future<DLSN>> writeBulk(String stream, List<ByteBuffer> data) {
        if (data.size() > 0) {
            final BulkWriteOp op = new BulkWriteOp(stream, data);
            sendRequest(op);
            return op.result();
        } else {
            // nothing to write: no request is sent
            return Collections.emptyList();
        }
    }

    @Override
    public Future<Boolean> truncate(String stream, DLSN dlsn) {
        final TruncateOp op = new TruncateOp(stream, dlsn);
        sendRequest(op);
        return op.result();
    }

    @Override
    public Future<Void> delete(String stream) {
        final DeleteOp op = new DeleteOp(stream);
        sendRequest(op);
        return op.result();
    }

    @Override
    public Future<Void> release(String stream) {
        final ReleaseOp op = new ReleaseOp(stream);
        sendRequest(op);
        return op.result();
    }

    @Override
    public Future<Void> create(String stream) {
        final CreateOp op = new CreateOp(stream);
        sendRequest(op);
        return op.result();
    }

    /**
     * Entry point of all stream operations: fail the op if the client is closed,
     * otherwise route and send it.
     *
     * @param op
     *          stream operation.
     */
    private void sendRequest(final StreamOp op) {
        // the read lock keeps close() from flipping the flag while the op is being dispatched
        closeLock.readLock().lock();
        try {
            if (closed) {
                op.fail(null, new DLClientClosedException("Client " + clientName + " is closed."));
            } else {
                doSend(op, null);
            }
        } finally {
            closeLock.readLock().unlock();
        }
    }

    /**
     * Send the stream operation by routing service, excluding previous address if it is not null.
     *
     * @param op
     *          stream operation.
     * @param previousAddr
     *          previous tried address.
     */
    private void doSend(final StreamOp op, final SocketAddress previousAddr) {
        if (null != previousAddr) {
            op.routingContext.addTriedHost(previousAddr, StatusCode.WRITE_EXCEPTION);
        }
        // Get host first
        final SocketAddress address = ownershipCache.getOwner(op.stream);
        if (null == address || op.routingContext.isTriedHost(address)) {
            // no cached owner, or the cached owner was already tried: look up another host
            getOwner(op).addEventListener(new FutureEventListener<SocketAddress>() {
                @Override
                public void onFailure(Throwable cause) {
                    op.fail(null, cause);
                }

                @Override
                public void onSuccess(SocketAddress ownerAddr) {
                    op.send(ownerAddr);
                }
            });
        } else {
            op.send(address);
        }
    }

    /**
     * Retry the owner lookup on the cluster client, unless the op has timed out,
     * in which case the op is failed with <i>cause</i>.
     */
    private void retryGetOwnerFromResourcePlacementServer(final StreamOp op,
                                                          final Promise<SocketAddress> getOwnerPromise,
                                                          final Throwable cause) {
        if (op.shouldTimeout()) {
            op.fail(null, cause);
            return;
        }
        getOwnerFromResourcePlacementServer(op, getOwnerPromise);
    }

    /**
     * Ask the cluster client for the owner of the op's stream and satisfy <i>getOwnerPromise</i>
     * with it. A response without a usable location triggers a retry; a request failure fails
     * the promise.
     */
    private void getOwnerFromResourcePlacementServer(final StreamOp op,
                                                     final Promise<SocketAddress> getOwnerPromise) {
        clusterClient.get().getService().getOwner(op.stream, op.ctx)
            .addEventListener(new FutureEventListener<WriteResponse>() {
                @Override
                public void onFailure(Throwable cause) {
                    getOwnerPromise.updateIfEmpty(new Throw<SocketAddress>(cause));
                }

                @Override
                public void onSuccess(WriteResponse value) {
                    if (StatusCode.FOUND == value.getHeader().getCode()
                          && null != value.getHeader().getLocation()) {
                        try {
                            InetSocketAddress addr = DLSocketAddress.deserialize(
                                value.getHeader().getLocation()
                            ).getSocketAddress();
                            getOwnerPromise.updateIfEmpty(new Return<SocketAddress>(addr));
                        } catch (IOException e) {
                            // retry from the routing server again
                            logger.error("ERROR in getOwner", e);
                            retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise, e);
                            return;
                        }
                    } else {
                        // retry from the routing server again
                        retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise,
                                new StreamUnavailableException("Stream " + op.stream + "'s owner is unknown"));
                    }
                }
            });
    }

    /**
     * Look up the host to send the op to: through the cluster client if present,
     * otherwise through the routing service.
     */
    private Future<SocketAddress> getOwner(final StreamOp op) {
        if (clusterClient.isPresent()) {
            final Promise<SocketAddress> getOwnerPromise = new Promise<SocketAddress>();
            getOwnerFromResourcePlacementServer(op, getOwnerPromise);
            return getOwnerPromise;
        }
        // pickup host by hashing
        try {
            return Future.value(routingService.getHost(op.stream, op.routingContext));
        } catch (NoBrokersAvailableException nbae) {
            return Future.exception(nbae);
        }
    }

    /**
     * Write the op to the proxy at <i>addr</i> and dispatch on the outcome: the status code of the
     * response header decides between completing (done by the op itself), failing and redirecting;
     * transport failures are handed to {@link #handleRequestException}.
     */
    private void sendWriteRequest(final SocketAddress addr, final StreamOp op) {
        // Get corresponding finagle client
        final ProxyClient sc = clientManager.getClient(addr);
        final long startTimeNanos = System.nanoTime();
        // write the request to that host.
        op.sendRequest(sc).addEventListener(new FutureEventListener<ResponseHeader>() {
            @Override
            public void onSuccess(ResponseHeader header) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Received response; header: {}", header);
                }
                clientStats.completeProxyRequest(addr, header.getCode(), startTimeNanos);
                // update routing context
                op.routingContext.addTriedHost(addr, header.getCode());
                switch (header.getCode()) {
                    case SUCCESS:
                        // success handling is done per stream op
                        break;
                    case FOUND:
                        handleRedirectResponse(header, op, addr);
                        break;
                    // for overcapacity, dont report failure since this normally happens quite a bit
                    case OVER_CAPACITY:
                        logger.debug("Failed to write request to {} : {}", op.stream, header);
                        op.fail(addr, DLException.of(header));
                        break;
                    // for responses that indicate the requests definitely failed,
                    // we should fail them immediately (e.g. TOO_LARGE_RECORD, METADATA_EXCEPTION)
                    case NOT_IMPLEMENTED:
                    case METADATA_EXCEPTION:
                    case LOG_EMPTY:
                    case LOG_NOT_FOUND:
                    case TRUNCATED_TRANSACTION:
                    case END_OF_STREAM:
                    case TRANSACTION_OUT_OF_ORDER:
                    case INVALID_STREAM_NAME:
                    case REQUEST_DENIED:
                    case TOO_LARGE_RECORD:
                    case CHECKSUM_FAILED:
                    // status code NOT_READY is returned if failfast is enabled in the server. don't redirect
                    // since the proxy may still own the stream.
                    case STREAM_NOT_READY:
                        op.fail(addr, DLException.of(header));
                        break;
                    case SERVICE_UNAVAILABLE:
                        handleServiceUnavailable(addr, sc, Optional.of(op));
                        break;
                    case REGION_UNAVAILABLE:
                        // region is unavailable, redirect the request to hosts in other region
                        redirect(op, null);
                        break;
                    // Proxy was overloaded and refused to try to acquire the stream. Don't remove ownership, since
                    // we didn't have it in the first place.
                    case TOO_MANY_STREAMS:
                        handleRedirectableError(addr, op, header);
                        break;
                    case STREAM_UNAVAILABLE:
                    case ZOOKEEPER_ERROR:
                    case LOCKING_EXCEPTION:
                    case UNEXPECTED:
                    case INTERRUPTED:
                    case BK_TRANSMIT_ERROR:
                    case FLUSH_TIMEOUT:
                    default:
                        // when we are receiving these exceptions from proxy, it means proxy or the stream is closed
                        // redirect the request.
                        ownershipCache.removeOwnerFromStream(op.stream, addr, header.getCode().name());
                        handleRedirectableError(addr, op, header);
                        break;
                }
            }

            @Override
            public void onFailure(Throwable cause) {
                Optional<StreamOp> opOptional = Optional.of(op);
                cause = showRootCause(opOptional, cause);
                clientStats.failProxyRequest(addr, cause, startTimeNanos);
                handleRequestException(addr, sc, opOptional, cause);
            }
        });
    }

    // Response Handlers

    /**
     * Unwrap a wrapped finagle {@link Failure} into its underlying cause.
     *
     * @param op
     *          stream operation the failure belongs to, if any (only used for logging)
     * @param cause
     *          failure to unwrap
     * @return the unwrapped cause, or <i>cause</i> itself if it is not a wrapped failure
     *         or could not be unwrapped.
     */
    Throwable showRootCause(Optional<StreamOp> op, Throwable cause) {
        if (cause instanceof Failure) {
            Failure failure = (Failure) cause;
            if (failure.isFlagged(Failure.Wrapped())) {
                try {
                    // if it is a wrapped failure, unwrap it first
                    cause = failure.show();
                } catch (IllegalArgumentException iae) {
                    if (op.isPresent()) {
                        logger.warn("Failed to unwrap finagle failure of stream {} : ", op.get().stream, iae);
                    } else {
                        logger.warn("Failed to unwrap finagle failure : ", iae);
                    }
                }
            }
        }
        return cause;
    }

    /**
     * Handle an error status that may be retried on another host: fail the op when stream
     * failfast is enabled, redirect it otherwise.
     */
    private void handleRedirectableError(SocketAddress addr,
                                         StreamOp op,
                                         ResponseHeader header) {
        if (streamFailfast) {
            op.fail(addr, DLException.of(header));
        } else {
            redirect(op, null);
        }
    }

    /**
     * Handle a proxy reporting itself as unavailable: remove it from the routing service and the
     * caches and, if an op is attached, redirect the op to another host.
     */
    void handleServiceUnavailable(SocketAddress addr,
                                  ProxyClient sc,
                                  Optional<StreamOp> op) {
        // service is unavailable, remove it out of routing service
        routingService.removeHost(addr, new ServiceUnavailableException(addr + " is unavailable now."));
        onServerLeft(addr);
        if (op.isPresent()) {
            ownershipCache.removeOwnerFromStream(op.get().stream, addr, addr + " is unavailable now.");
            // redirect the request to other host.
            redirect(op.get(), null);
        }
    }

    /**
     * Handle an exception raised while talking to the proxy at <i>addr</i>.
     *
     * <p>Depending on the type of <i>cause</i> the host is removed from the routing service and/or
     * the client manager, the cached ownership of the op's stream is dropped, and the op (if any)
     * is resent or failed.
     *
     * @param addr
     *          address of the proxy the exception was raised for
     * @param sc
     *          proxy client used for the request
     * @param op
     *          stream operation the exception belongs to; absent for handshake failures
     * @param cause
     *          the exception
     */
    void handleRequestException(SocketAddress addr,
                                ProxyClient sc,
                                Optional<StreamOp> op,
                                Throwable cause) {
        boolean resendOp = false;
        boolean removeOwnerFromStream = false;
        SocketAddress previousAddr = addr;
        String reason = cause.getMessage();
        if (cause instanceof ConnectionFailedException || cause instanceof java.net.ConnectException) {
            routingService.removeHost(addr, cause);
            onServerLeft(addr, sc);
            removeOwnerFromStream = true;
            // redirect the request to other host.
            resendOp = true;
        } else if (cause instanceof ChannelException) {
            // java.net.ConnectException typically means connection is refused remotely
            // no process listening on remote address/port.
            if (cause.getCause() instanceof java.net.ConnectException) {
                routingService.removeHost(addr, cause.getCause());
                onServerLeft(addr);
                reason = cause.getCause().getMessage();
            } else {
                routingService.removeHost(addr, cause);
                reason = cause.getMessage();
            }
            removeOwnerFromStream = true;
            // redirect the request to other host.
            resendOp = true;
        } else if (cause instanceof ServiceTimeoutException) {
            // redirect the request to itself again, which will backoff for a while
            resendOp = true;
            previousAddr = null;
        } else if (cause instanceof WriteException) {
            // redirect the request to other host.
            resendOp = true;
        } else if (cause instanceof ServiceException) {
            // redirect the request to other host.
            clientManager.removeClient(addr, sc);
            resendOp = true;
        } else if (cause instanceof TApplicationException) {
            handleTApplicationException(cause, op, addr, sc);
        } else if (cause instanceof Failure) {
            handleFinagleFailure((Failure) cause, op, addr);
        } else {
            // Default handler
            handleException(cause, op, addr);
        }

        if (op.isPresent()) {
            if (removeOwnerFromStream) {
                ownershipCache.removeOwnerFromStream(op.get().stream, addr, reason);
            }
            if (resendOp) {
                doSend(op.get(), previousAddr);
            }
        }
    }

    /**
     * Redirect the request to new proxy <i>newAddr</i>. If <i>newAddr</i> is null,
     * it would pick up a host from routing service.
     *
     * @param op
     *          stream operation
     * @param newAddr
     *          new proxy address
     */
    void redirect(StreamOp op, SocketAddress newAddr) {
        ownershipCache.getOwnershipStatsLogger().onRedirect(op.stream);
        if (null != newAddr) {
            logger.debug("Redirect request {} to new owner {}.", op, newAddr);
            op.send(newAddr);
        } else {
            doSend(op, null);
        }
    }

    /**
     * Handle a finagle {@link Failure}: resend the op to another host if the failure is flagged
     * as restartable, otherwise fail it.
     */
    void handleFinagleFailure(Failure failure,
                              Optional<StreamOp> op,
                              SocketAddress addr) {
        if (failure.isFlagged(Failure.Restartable())) {
            if (op.isPresent()) {
                // redirect the request to other host
                doSend(op.get(), addr);
            }
        } else {
            // fail the request if it is other types of failures
            handleException(failure, op, addr);
        }
    }

    /**
     * Default exception handler: log the exception and fail the op (if any) with it.
     */
    void handleException(Throwable cause,
                         Optional<StreamOp> op,
                         SocketAddress addr) {
        // RequestTimeoutException: fail it and let client decide whether to retry or not.

        // FailedFastException:
        // We don't actually know when FailedFastException will be thrown
        // so properly we just throw it back to application to let application
        // handle it.

        // Other Exceptions: as we don't know how to handle them properly so throw them to client
        if (op.isPresent()) {
            logger.error("Failed to write request to {} @ {} : {}",
                    new Object[]{op.get().stream, addr, cause.toString()});
            op.get().fail(addr, cause);
        }
    }

    /**
     * Handle a thrift application exception: an unknown-method error removes the proxy and resends
     * the op to another host, any other type goes to the default exception handler.
     */
    void handleTApplicationException(Throwable cause,
                                     Optional<StreamOp> op,
                                     SocketAddress addr,
                                     ProxyClient sc) {
        TApplicationException ex = (TApplicationException) cause;
        if (ex.getType() == TApplicationException.UNKNOWN_METHOD) {
            // if we encountered unknown method exception on thrift server, it means this proxy
            // has problem. we should remove it from routing service, clean up ownerships
            routingService.removeHost(addr, cause);
            onServerLeft(addr, sc);
            if (op.isPresent()) {
                ownershipCache.removeOwnerFromStream(op.get().stream, addr, cause.getMessage());
                doSend(op.get(), addr);
            }
        } else {
            handleException(cause, op, addr);
        }
    }

    /**
     * Handle a redirect (FOUND) response: redirect the op to the owner given in the response
     * header, or let the routing pick a host when the location is missing, invalid or points
     * back to the server that sent the redirect.
     *
     * @param header
     *          response header carrying the redirect
     * @param op
     *          stream operation to redirect
     * @param curAddr
     *          address of the server that sent the redirect
     */
    void handleRedirectResponse(ResponseHeader header, StreamOp op, SocketAddress curAddr) {
        SocketAddress ownerAddr = null;
        if (header.isSetLocation()) {
            String owner = header.getLocation();
            try {
                ownerAddr = DLSocketAddress.deserialize(owner).getSocketAddress();
                // if we are receiving a direct request to same host, we won't try the same host.
                // as the proxy will shut itself down if it redirects client to itself.
                if (curAddr.equals(ownerAddr)) {
                    logger.warn("Request to stream {} is redirected to same server {}!", op.stream, curAddr);
                    ownerAddr = null;
                } else {
                    // update ownership when redirects.
                    ownershipCache.updateOwner(op.stream, ownerAddr);
                }
            } catch (IOException e) {
                // keep a record of the bad location, then fall back to picking a host
                logger.warn("Invalid redirect location {} found for stream {} : ",
                        new Object[] { owner, op.stream, e });
                ownerAddr = null;
            }
        }
        redirect(op, ownerAddr);
    }

    /**
     * Record the owner of <i>stream</i> from its serialized <i>location</i>;
     * an invalid location is logged and ignored.
     */
    void updateOwnership(String stream, String location) {
        try {
            SocketAddress ownerAddr = DLSocketAddress.deserialize(location).getSocketAddress();
            // update ownership
            ownershipCache.updateOwner(stream, ownerAddr);
        } catch (IOException e) {
            logger.warn("Invalid ownership {} found for stream {} : ",
                new Object[] { location, stream, e });
        }
    }

}