http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java deleted file mode 100644 index 6ef1d8e..0000000 --- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.client.proxy; - -import org.apache.distributedlog.client.ClientConfig; -import org.apache.distributedlog.client.stats.ClientStats; -import org.apache.distributedlog.thrift.service.DistributedLogService; -import com.twitter.finagle.Service; -import com.twitter.finagle.ThriftMux; -import com.twitter.finagle.builder.ClientBuilder; -import com.twitter.finagle.thrift.ClientId; -import com.twitter.finagle.thrift.ThriftClientFramedCodec; -import com.twitter.finagle.thrift.ThriftClientRequest; -import com.twitter.util.Duration; -import com.twitter.util.Future; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import org.apache.thrift.protocol.TBinaryProtocol; -import scala.Option; -import scala.runtime.BoxedUnit; - -/** - * Client talks to a single proxy. - */ -public class ProxyClient { - - /** - * Builder to build a proxy client talking to given host <code>address</code>. - */ - public interface Builder { - /** - * Build a proxy client to <code>address</code>. - * - * @param address - * proxy address - * @return proxy client - */ - ProxyClient build(SocketAddress address); - } - - public static Builder newBuilder(String clientName, - ClientId clientId, - ClientBuilder clientBuilder, - ClientConfig clientConfig, - ClientStats clientStats) { - return new DefaultBuilder(clientName, clientId, clientBuilder, clientConfig, clientStats); - } - - /** - * Default Builder for {@link ProxyClient}. 
- */ - public static class DefaultBuilder implements Builder { - - private final String clientName; - private final ClientId clientId; - private final ClientBuilder clientBuilder; - private final ClientStats clientStats; - - private DefaultBuilder(String clientName, - ClientId clientId, - ClientBuilder clientBuilder, - ClientConfig clientConfig, - ClientStats clientStats) { - this.clientName = clientName; - this.clientId = clientId; - this.clientStats = clientStats; - // client builder - ClientBuilder builder = setDefaultSettings( - null == clientBuilder ? getDefaultClientBuilder(clientConfig) : clientBuilder); - this.clientBuilder = configureThriftMux(builder, clientId, clientConfig); - } - - @SuppressWarnings("unchecked") - private ClientBuilder configureThriftMux(ClientBuilder builder, - ClientId clientId, - ClientConfig clientConfig) { - if (clientConfig.getThriftMux()) { - return builder.stack(ThriftMux.client().withClientId(clientId)); - } else { - return builder.codec(ThriftClientFramedCodec.apply(Option.apply(clientId))); - } - } - - private ClientBuilder getDefaultClientBuilder(ClientConfig clientConfig) { - ClientBuilder builder = ClientBuilder.get() - .tcpConnectTimeout(Duration.fromMilliseconds(200)) - .connectTimeout(Duration.fromMilliseconds(200)) - .requestTimeout(Duration.fromSeconds(1)); - if (!clientConfig.getThriftMux()) { - builder = builder.hostConnectionLimit(1); - } - return builder; - } - - @SuppressWarnings("unchecked") - private ClientBuilder setDefaultSettings(ClientBuilder builder) { - return builder.name(clientName) - .failFast(false) - .noFailureAccrual() - // disable retries on finagle client builder, as there is only one host per finagle client - // we should throw exception immediately on first failure, so DL client could quickly detect - // failures and retry other proxies. 
- .retries(1) - .keepAlive(true); - } - - @Override - @SuppressWarnings("unchecked") - public ProxyClient build(SocketAddress address) { - Service<ThriftClientRequest, byte[]> client = - ClientBuilder.safeBuildFactory( - clientBuilder - .hosts((InetSocketAddress) address) - .reportTo(clientStats.getFinagleStatsReceiver(address)) - ).toService(); - DistributedLogService.ServiceIface service = - new DistributedLogService.ServiceToClient(client, new TBinaryProtocol.Factory()); - return new ProxyClient(address, client, service); - } - - } - - private final SocketAddress address; - private final Service<ThriftClientRequest, byte[]> client; - private final DistributedLogService.ServiceIface service; - - protected ProxyClient(SocketAddress address, - Service<ThriftClientRequest, byte[]> client, - DistributedLogService.ServiceIface service) { - this.address = address; - this.client = client; - this.service = service; - } - - public SocketAddress getAddress() { - return address; - } - - public Service<ThriftClientRequest, byte[]> getClient() { - return client; - } - - public DistributedLogService.ServiceIface getService() { - return service; - } - - public Future<BoxedUnit> close() { - return client.close(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java deleted file mode 100644 index 17b70be..0000000 --- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java +++ /dev/null @@ -1,362 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.client.proxy; - -import com.google.common.base.Stopwatch; -import com.google.common.collect.ImmutableMap; -import org.apache.distributedlog.client.ClientConfig; -import org.apache.distributedlog.client.stats.ClientStats; -import org.apache.distributedlog.client.stats.OpStats; -import org.apache.distributedlog.thrift.service.ClientInfo; -import org.apache.distributedlog.thrift.service.ServerInfo; -import com.twitter.util.FutureEventListener; -import java.net.SocketAddress; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.TimerTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Manager manages clients (channels) to proxies. 
- */ -public class ProxyClientManager implements TimerTask { - - private static final Logger logger = LoggerFactory.getLogger(ProxyClientManager.class); - - private final ClientConfig clientConfig; - private final ProxyClient.Builder clientBuilder; - private final HashedWheelTimer timer; - private final HostProvider hostProvider; - private volatile Timeout periodicHandshakeTask; - private final ConcurrentHashMap<SocketAddress, ProxyClient> address2Services = - new ConcurrentHashMap<SocketAddress, ProxyClient>(); - private final CopyOnWriteArraySet<ProxyListener> proxyListeners = - new CopyOnWriteArraySet<ProxyListener>(); - private volatile boolean closed = false; - private volatile boolean periodicHandshakeEnabled = true; - private final Stopwatch lastOwnershipSyncStopwatch; - - private final OpStats handshakeStats; - - public ProxyClientManager(ClientConfig clientConfig, - ProxyClient.Builder clientBuilder, - HashedWheelTimer timer, - HostProvider hostProvider, - ClientStats clientStats) { - this.clientConfig = clientConfig; - this.clientBuilder = clientBuilder; - this.timer = timer; - this.hostProvider = hostProvider; - this.handshakeStats = clientStats.getOpStats("handshake"); - scheduleHandshake(); - this.lastOwnershipSyncStopwatch = Stopwatch.createStarted(); - } - - private void scheduleHandshake() { - if (clientConfig.getPeriodicHandshakeIntervalMs() > 0) { - periodicHandshakeTask = timer.newTimeout(this, - clientConfig.getPeriodicHandshakeIntervalMs(), TimeUnit.MILLISECONDS); - } - } - - void setPeriodicHandshakeEnabled(boolean enabled) { - this.periodicHandshakeEnabled = enabled; - } - - @Override - public void run(Timeout timeout) throws Exception { - if (timeout.isCancelled() || closed) { - return; - } - if (periodicHandshakeEnabled) { - final boolean syncOwnerships = lastOwnershipSyncStopwatch.elapsed(TimeUnit.MILLISECONDS) - >= clientConfig.getPeriodicOwnershipSyncIntervalMs(); - - final Set<SocketAddress> hostsSnapshot = hostProvider.getHosts(); - 
final AtomicInteger numHosts = new AtomicInteger(hostsSnapshot.size()); - final AtomicInteger numStreams = new AtomicInteger(0); - final AtomicInteger numSuccesses = new AtomicInteger(0); - final AtomicInteger numFailures = new AtomicInteger(0); - final ConcurrentMap<SocketAddress, Integer> streamDistributions = - new ConcurrentHashMap<SocketAddress, Integer>(); - final Stopwatch stopwatch = Stopwatch.createStarted(); - for (SocketAddress host : hostsSnapshot) { - final SocketAddress address = host; - final ProxyClient client = getClient(address); - handshake(address, client, new FutureEventListener<ServerInfo>() { - @Override - public void onSuccess(ServerInfo serverInfo) { - numStreams.addAndGet(serverInfo.getOwnershipsSize()); - numSuccesses.incrementAndGet(); - notifyHandshakeSuccess(address, client, serverInfo, false, stopwatch); - if (clientConfig.isHandshakeTracingEnabled()) { - streamDistributions.putIfAbsent(address, serverInfo.getOwnershipsSize()); - } - complete(); - } - - @Override - public void onFailure(Throwable cause) { - numFailures.incrementAndGet(); - notifyHandshakeFailure(address, client, cause, stopwatch); - complete(); - } - - private void complete() { - if (0 == numHosts.decrementAndGet()) { - if (syncOwnerships) { - logger.info("Periodic handshaked with {} hosts : {} streams returned," - + " {} hosts succeeded, {} hosts failed", - new Object[] { - hostsSnapshot.size(), - numStreams.get(), - numSuccesses.get(), - numFailures.get()}); - if (clientConfig.isHandshakeTracingEnabled()) { - logger.info("Periodic handshaked stream distribution : {}", streamDistributions); - } - } - } - } - }, false, syncOwnerships); - } - - if (syncOwnerships) { - lastOwnershipSyncStopwatch.reset().start(); - } - } - scheduleHandshake(); - } - - /** - * Register a proxy <code>listener</code> on proxy related changes. 
- * - * @param listener - * proxy listener - */ - public void registerProxyListener(ProxyListener listener) { - proxyListeners.add(listener); - } - - private void notifyHandshakeSuccess(SocketAddress address, - ProxyClient client, - ServerInfo serverInfo, - boolean logging, - Stopwatch stopwatch) { - if (logging) { - if (null != serverInfo && serverInfo.isSetOwnerships()) { - logger.info("Handshaked with {} : {} ownerships returned.", - address, serverInfo.getOwnerships().size()); - } else { - logger.info("Handshaked with {} : no ownerships returned", address); - } - } - handshakeStats.completeRequest(address, stopwatch.elapsed(TimeUnit.MICROSECONDS), 1); - for (ProxyListener listener : proxyListeners) { - listener.onHandshakeSuccess(address, client, serverInfo); - } - } - - private void notifyHandshakeFailure(SocketAddress address, - ProxyClient client, - Throwable cause, - Stopwatch stopwatch) { - handshakeStats.failRequest(address, stopwatch.elapsed(TimeUnit.MICROSECONDS), 1); - for (ProxyListener listener : proxyListeners) { - listener.onHandshakeFailure(address, client, cause); - } - } - - /** - * Retrieve a client to proxy <code>address</code>. - * - * @param address - * proxy address - * @return proxy client - */ - public ProxyClient getClient(final SocketAddress address) { - ProxyClient sc = address2Services.get(address); - if (null != sc) { - return sc; - } - return createClient(address); - } - - /** - * Remove the client to proxy <code>address</code>. - * - * @param address - * proxy address - */ - public void removeClient(SocketAddress address) { - ProxyClient sc = address2Services.remove(address); - if (null != sc) { - logger.info("Removed host {}.", address); - sc.close(); - } - } - - /** - * Remove the client <code>sc</code> to proxy <code>address</code>. 
- * - * @param address - * proxy address - * @param sc - * proxy client - */ - public void removeClient(SocketAddress address, ProxyClient sc) { - if (address2Services.remove(address, sc)) { - logger.info("Remove client {} to host {}.", sc, address); - sc.close(); - } - } - - /** - * Create a client to proxy <code>address</code>. - * - * @param address - * proxy address - * @return proxy client - */ - public ProxyClient createClient(final SocketAddress address) { - final ProxyClient sc = clientBuilder.build(address); - ProxyClient oldSC = address2Services.putIfAbsent(address, sc); - if (null != oldSC) { - sc.close(); - return oldSC; - } else { - final Stopwatch stopwatch = Stopwatch.createStarted(); - FutureEventListener<ServerInfo> listener = new FutureEventListener<ServerInfo>() { - @Override - public void onSuccess(ServerInfo serverInfo) { - notifyHandshakeSuccess(address, sc, serverInfo, true, stopwatch); - } - @Override - public void onFailure(Throwable cause) { - notifyHandshakeFailure(address, sc, cause, stopwatch); - } - }; - // send a ping messaging after creating connections. - handshake(address, sc, listener, true, true); - return sc; - } - } - - /** - * Handshake with a given proxy. 
- * - * @param address - * proxy address - * @param sc - * proxy client - * @param listener - * listener on handshake result - */ - private void handshake(SocketAddress address, - ProxyClient sc, - FutureEventListener<ServerInfo> listener, - boolean logging, - boolean getOwnerships) { - if (clientConfig.getHandshakeWithClientInfo()) { - ClientInfo clientInfo = new ClientInfo(); - clientInfo.setGetOwnerships(getOwnerships); - clientInfo.setStreamNameRegex(clientConfig.getStreamNameRegex()); - if (logging) { - logger.info("Handshaking with {} : {}", address, clientInfo); - } - sc.getService().handshakeWithClientInfo(clientInfo) - .addEventListener(listener); - } else { - if (logging) { - logger.info("Handshaking with {}", address); - } - sc.getService().handshake().addEventListener(listener); - } - } - - /** - * Handshake with all proxies. - * - * <p>NOTE: this is a synchronous call. - */ - public void handshake() { - Set<SocketAddress> hostsSnapshot = hostProvider.getHosts(); - logger.info("Handshaking with {} hosts.", hostsSnapshot.size()); - final CountDownLatch latch = new CountDownLatch(hostsSnapshot.size()); - final Stopwatch stopwatch = Stopwatch.createStarted(); - for (SocketAddress host: hostsSnapshot) { - final SocketAddress address = host; - final ProxyClient client = getClient(address); - handshake(address, client, new FutureEventListener<ServerInfo>() { - @Override - public void onSuccess(ServerInfo serverInfo) { - notifyHandshakeSuccess(address, client, serverInfo, true, stopwatch); - latch.countDown(); - } - @Override - public void onFailure(Throwable cause) { - notifyHandshakeFailure(address, client, cause, stopwatch); - latch.countDown(); - } - }, true, true); - } - try { - latch.await(1, TimeUnit.MINUTES); - } catch (InterruptedException e) { - logger.warn("Interrupted on handshaking with servers : ", e); - } - } - - /** - * Return number of proxies managed by client manager. - * - * @return number of proxies managed by client manager. 
- */ - public int getNumProxies() { - return address2Services.size(); - } - - /** - * Return all clients. - * - * @return all clients. - */ - public Map<SocketAddress, ProxyClient> getAllClients() { - return ImmutableMap.copyOf(address2Services); - } - - public void close() { - closed = true; - Timeout task = periodicHandshakeTask; - if (null != task) { - task.cancel(); - } - for (ProxyClient sc : address2Services.values()) { - sc.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java deleted file mode 100644 index 0a6b076..0000000 --- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.client.proxy; - -import org.apache.distributedlog.thrift.service.ServerInfo; -import java.net.SocketAddress; - -/** - * Listener on server changes. - */ -public interface ProxyListener { - /** - * When a proxy's server info changed, it would be notified. - * - * @param address - * proxy address - * @param client - * proxy client that executes handshaking - * @param serverInfo - * proxy's server info - */ - void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo); - - /** - * Failed to handshake with a proxy. - * - * @param address - * proxy address - * @param client - * proxy client - * @param cause - * failure reason - */ - void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java deleted file mode 100644 index 4161afb..0000000 --- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Clients that interact with individual proxies. - */ -package org.apache.distributedlog.client.proxy; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java deleted file mode 100644 index 2ac5be3..0000000 --- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.client.resolver; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * Default implementation of {@link RegionResolver}. - */ -public class DefaultRegionResolver implements RegionResolver { - - private static final String DEFAULT_REGION = "default-region"; - - private final Map<SocketAddress, String> regionOverrides = - new HashMap<SocketAddress, String>(); - private final ConcurrentMap<SocketAddress, String> regionMap = - new ConcurrentHashMap<SocketAddress, String>(); - - public DefaultRegionResolver() { - } - - public DefaultRegionResolver(Map<SocketAddress, String> regionOverrides) { - this.regionOverrides.putAll(regionOverrides); - } - - @Override - public String resolveRegion(SocketAddress address) { - String region = regionMap.get(address); - if (null == region) { - region = doResolveRegion(address); - regionMap.put(address, region); - } - return region; - } - - private String doResolveRegion(SocketAddress address) { - String region = regionOverrides.get(address); - if (null != region) { - return region; - } - - String domainName; - if (address instanceof InetSocketAddress) { - InetSocketAddress iAddr = (InetSocketAddress) address; - domainName = iAddr.getHostName(); - } else { - domainName = address.toString(); - } - String[] parts = domainName.split("\\."); - if (parts.length <= 0) { - return DEFAULT_REGION; - } - String hostName = parts[0]; - String[] labels = hostName.split("-"); - if (labels.length != 4) { - return DEFAULT_REGION; - } - return labels[0]; - } - - @Override - public void removeCachedHost(SocketAddress address) { - regionMap.remove(address); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/RegionResolver.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/RegionResolver.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/RegionResolver.java deleted file mode 100644 index 023799c..0000000 --- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/RegionResolver.java +++ /dev/null @@ -1,43 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.client.resolver;

import java.net.SocketAddress;

/**
 * Maps socket addresses to the region they belong to.
 */
public interface RegionResolver {

    /**
     * Determine the region of the given address.
     *
     * @param address
     *          socket address
     * @return region of the address
     */
    String resolveRegion(SocketAddress address);

    /**
     * Drop whatever has been cached for the given address.
     *
     * @param address
     *          socket address.
     */
    void removeCachedHost(SocketAddress address);
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/package-info.java deleted file mode 100644 index 81cda2f..0000000 --- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Resolver to resolve network addresses. 
- */ -package org.apache.distributedlog.client.resolver; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java deleted file mode 100644 index 666fa31..0000000 --- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java +++ /dev/null @@ -1,500 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.client.routing; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.MapDifference; -import com.google.common.collect.Maps; -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.twitter.common.zookeeper.ServerSet; -import org.apache.distributedlog.service.DLSocketAddress; -import com.twitter.finagle.ChannelException; -import com.twitter.finagle.NoBrokersAvailableException; -import com.twitter.finagle.stats.Counter; -import com.twitter.finagle.stats.Gauge; -import com.twitter.finagle.stats.NullStatsReceiver; -import com.twitter.finagle.stats.StatsReceiver; -import com.twitter.util.Function0; -import java.net.SocketAddress; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang3.tuple.Pair; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.TimerTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.collection.Seq; - -/** - * Consistent Hashing Based {@link RoutingService}. 
- */ -public class ConsistentHashRoutingService extends ServerSetRoutingService { - - private static final Logger logger = LoggerFactory.getLogger(ConsistentHashRoutingService.class); - - @Deprecated - public static ConsistentHashRoutingService of(ServerSetWatcher serverSetWatcher, int numReplicas) { - return new ConsistentHashRoutingService(serverSetWatcher, numReplicas, 300, NullStatsReceiver.get()); - } - - /** - * Builder helper class to build a consistent hash bashed {@link RoutingService}. - * - * @return builder to build a consistent hash based {@link RoutingService}. - */ - public static Builder newBuilder() { - return new Builder(); - } - - /** - * Builder for building consistent hash based routing service. - */ - public static class Builder implements RoutingService.Builder { - - private ServerSet serverSet; - private boolean resolveFromName = false; - private int numReplicas; - private int blackoutSeconds = 300; - private StatsReceiver statsReceiver = NullStatsReceiver.get(); - - private Builder() {} - - public Builder serverSet(ServerSet serverSet) { - this.serverSet = serverSet; - return this; - } - - public Builder resolveFromName(boolean enabled) { - this.resolveFromName = enabled; - return this; - } - - public Builder numReplicas(int numReplicas) { - this.numReplicas = numReplicas; - return this; - } - - public Builder blackoutSeconds(int seconds) { - this.blackoutSeconds = seconds; - return this; - } - - public Builder statsReceiver(StatsReceiver statsReceiver) { - this.statsReceiver = statsReceiver; - return this; - } - - @Override - public RoutingService build() { - checkNotNull(serverSet, "No serverset provided."); - checkNotNull(statsReceiver, "No stats receiver provided."); - checkArgument(numReplicas > 0, "Invalid number of replicas : " + numReplicas); - return new ConsistentHashRoutingService(new TwitterServerSetWatcher(serverSet, resolveFromName), - numReplicas, blackoutSeconds, statsReceiver); - } - } - - static class ConsistentHash { - 
private final HashFunction hashFunction; - private final int numOfReplicas; - private final SortedMap<Long, SocketAddress> circle; - - // Stats - protected final Counter hostAddedCounter; - protected final Counter hostRemovedCounter; - - ConsistentHash(HashFunction hashFunction, - int numOfReplicas, - StatsReceiver statsReceiver) { - this.hashFunction = hashFunction; - this.numOfReplicas = numOfReplicas; - this.circle = new TreeMap<Long, SocketAddress>(); - - this.hostAddedCounter = statsReceiver.counter0("adds"); - this.hostRemovedCounter = statsReceiver.counter0("removes"); - } - - private String replicaName(int shardId, int replica, String address) { - if (shardId < 0) { - shardId = UNKNOWN_SHARD_ID; - } - - StringBuilder sb = new StringBuilder(100); - sb.append("shard-"); - sb.append(shardId); - sb.append('-'); - sb.append(replica); - sb.append('-'); - sb.append(address); - - return sb.toString(); - } - - private Long replicaHash(int shardId, int replica, String address) { - return hashFunction.hashUnencodedChars(replicaName(shardId, replica, address)).asLong(); - } - - private Long replicaHash(int shardId, int replica, SocketAddress address) { - return replicaHash(shardId, replica, address.toString()); - } - - public synchronized void add(int shardId, SocketAddress address) { - String addressStr = address.toString(); - for (int i = 0; i < numOfReplicas; i++) { - Long hash = replicaHash(shardId, i, addressStr); - circle.put(hash, address); - } - hostAddedCounter.incr(); - } - - public synchronized void remove(int shardId, SocketAddress address) { - for (int i = 0; i < numOfReplicas; i++) { - long hash = replicaHash(shardId, i, address); - SocketAddress oldAddress = circle.get(hash); - if (null != oldAddress && oldAddress.equals(address)) { - circle.remove(hash); - } - } - hostRemovedCounter.incr(); - } - - public SocketAddress get(String key, RoutingContext rContext) { - long hash = hashFunction.hashUnencodedChars(key).asLong(); - return find(hash, rContext); - 
} - - private synchronized SocketAddress find(long hash, RoutingContext rContext) { - if (circle.isEmpty()) { - return null; - } - - Iterator<Map.Entry<Long, SocketAddress>> iterator = - circle.tailMap(hash).entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<Long, SocketAddress> entry = iterator.next(); - if (!rContext.isTriedHost(entry.getValue())) { - return entry.getValue(); - } - } - // the tail map has been checked - iterator = circle.headMap(hash).entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<Long, SocketAddress> entry = iterator.next(); - if (!rContext.isTriedHost(entry.getValue())) { - return entry.getValue(); - } - } - - return null; - } - - private synchronized Pair<Long, SocketAddress> get(long hash) { - if (circle.isEmpty()) { - return null; - } - - if (!circle.containsKey(hash)) { - SortedMap<Long, SocketAddress> tailMap = circle.tailMap(hash); - hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey(); - } - return Pair.of(hash, circle.get(hash)); - } - - synchronized void dumpHashRing() { - for (Map.Entry<Long, SocketAddress> entry : circle.entrySet()) { - logger.info(entry.getKey() + " : " + entry.getValue()); - } - } - - } - - class BlackoutHost implements TimerTask { - final int shardId; - final SocketAddress address; - - BlackoutHost(int shardId, SocketAddress address) { - this.shardId = shardId; - this.address = address; - numBlackoutHosts.incrementAndGet(); - } - - @Override - public void run(Timeout timeout) throws Exception { - numBlackoutHosts.decrementAndGet(); - if (!timeout.isExpired()) { - return; - } - Set<SocketAddress> removedList = new HashSet<SocketAddress>(); - boolean joined; - // add the shard back - synchronized (shardId2Address) { - SocketAddress curHost = shardId2Address.get(shardId); - if (null != curHost) { - // there is already new shard joint, so drop the host. 
- logger.info("Blackout Shard {} ({}) was already replaced by {} permanently.", - new Object[] { shardId, address, curHost }); - joined = false; - } else { - join(shardId, address, removedList); - joined = true; - } - } - if (joined) { - for (RoutingListener listener : listeners) { - listener.onServerJoin(address); - } - } else { - for (RoutingListener listener : listeners) { - listener.onServerLeft(address); - } - } - } - } - - protected final HashedWheelTimer hashedWheelTimer; - protected final HashFunction hashFunction = Hashing.md5(); - protected final ConsistentHash circle; - protected final Map<Integer, SocketAddress> shardId2Address = - new HashMap<Integer, SocketAddress>(); - protected final Map<SocketAddress, Integer> address2ShardId = - new HashMap<SocketAddress, Integer>(); - - // blackout period - protected final int blackoutSeconds; - - // stats - protected final StatsReceiver statsReceiver; - protected final AtomicInteger numBlackoutHosts; - protected final Gauge numBlackoutHostsGauge; - protected final Gauge numHostsGauge; - - private static final int UNKNOWN_SHARD_ID = -1; - - ConsistentHashRoutingService(ServerSetWatcher serverSetWatcher, - int numReplicas, - int blackoutSeconds, - StatsReceiver statsReceiver) { - super(serverSetWatcher); - this.circle = new ConsistentHash(hashFunction, numReplicas, statsReceiver.scope("ring")); - this.hashedWheelTimer = new HashedWheelTimer(new ThreadFactoryBuilder() - .setNameFormat("ConsistentHashRoutingService-Timer-%d").build()); - this.blackoutSeconds = blackoutSeconds; - // stats - this.statsReceiver = statsReceiver; - this.numBlackoutHosts = new AtomicInteger(0); - this.numBlackoutHostsGauge = this.statsReceiver.addGauge(gaugeName("num_blackout_hosts"), - new Function0<Object>() { - @Override - public Object apply() { - return (float) numBlackoutHosts.get(); - } - }); - this.numHostsGauge = this.statsReceiver.addGauge(gaugeName("num_hosts"), - new Function0<Object>() { - @Override - public Object apply() { 
- return (float) address2ShardId.size(); - } - }); - } - - private static Seq<String> gaugeName(String name) { - return scala.collection.JavaConversions.asScalaBuffer(Arrays.asList(name)).toList(); - } - - @Override - public void startService() { - super.startService(); - this.hashedWheelTimer.start(); - } - - @Override - public void stopService() { - this.hashedWheelTimer.stop(); - super.stopService(); - } - - @Override - public Set<SocketAddress> getHosts() { - synchronized (shardId2Address) { - return ImmutableSet.copyOf(address2ShardId.keySet()); - } - } - - @Override - public SocketAddress getHost(String key, RoutingContext rContext) - throws NoBrokersAvailableException { - SocketAddress host = circle.get(key, rContext); - if (null != host) { - return host; - } - throw new NoBrokersAvailableException("No host found for " + key + ", routing context : " + rContext); - } - - @Override - public void removeHost(SocketAddress host, Throwable reason) { - removeHostInternal(host, Optional.of(reason)); - } - - private void removeHostInternal(SocketAddress host, Optional<Throwable> reason) { - synchronized (shardId2Address) { - Integer shardId = address2ShardId.remove(host); - if (null != shardId) { - SocketAddress curHost = shardId2Address.get(shardId); - if (null != curHost && curHost.equals(host)) { - shardId2Address.remove(shardId); - } - circle.remove(shardId, host); - if (reason.isPresent()) { - if (reason.get() instanceof ChannelException) { - logger.info("Shard {} ({}) left due to ChannelException, black it out for {} seconds" - + " (message = {})", - new Object[] { shardId, host, blackoutSeconds, reason.get().toString() }); - BlackoutHost blackoutHost = new BlackoutHost(shardId, host); - hashedWheelTimer.newTimeout(blackoutHost, blackoutSeconds, TimeUnit.SECONDS); - } else { - logger.info("Shard {} ({}) left due to exception {}", - new Object[] { shardId, host, reason.get().toString() }); - } - } else { - logger.info("Shard {} ({}) left after server set 
change", - shardId, host); - } - } else if (reason.isPresent()) { - logger.info("Node {} left due to exception {}", host, reason.get().toString()); - } else { - logger.info("Node {} left after server set change", host); - } - } - } - - /** - * The caller should synchronize on <i>shardId2Address</i>. - * @param shardId - * Shard id of new host joined. - * @param newHost - * New host joined. - * @param removedList - * Old hosts to remove - */ - private void join(int shardId, SocketAddress newHost, Set<SocketAddress> removedList) { - SocketAddress oldHost = shardId2Address.put(shardId, newHost); - if (null != oldHost) { - // remove the old host only when a new shard is kicked in to replace it. - address2ShardId.remove(oldHost); - circle.remove(shardId, oldHost); - removedList.add(oldHost); - logger.info("Shard {} ({}) left permanently.", shardId, oldHost); - } - address2ShardId.put(newHost, shardId); - circle.add(shardId, newHost); - logger.info("Shard {} ({}) joined to replace ({}).", - new Object[] { shardId, newHost, oldHost }); - } - - @Override - protected synchronized void performServerSetChange(ImmutableSet<DLSocketAddress> serviceInstances) { - Set<SocketAddress> joinedList = new HashSet<SocketAddress>(); - Set<SocketAddress> removedList = new HashSet<SocketAddress>(); - - Map<Integer, SocketAddress> newMap = new HashMap<Integer, SocketAddress>(); - synchronized (shardId2Address) { - for (DLSocketAddress serviceInstance : serviceInstances) { - if (serviceInstance.getShard() >= 0) { - newMap.put(serviceInstance.getShard(), serviceInstance.getSocketAddress()); - } else { - Integer shard = address2ShardId.get(serviceInstance.getSocketAddress()); - if (null == shard) { - // Assign a random negative shardId - int shardId; - do { - shardId = Math.min(-1 , (int) (Math.random() * Integer.MIN_VALUE)); - } while (null != shardId2Address.get(shardId)); - shard = shardId; - } - newMap.put(shard, serviceInstance.getSocketAddress()); - } - } - } - - Map<Integer, 
SocketAddress> left; - synchronized (shardId2Address) { - MapDifference<Integer, SocketAddress> difference = - Maps.difference(shardId2Address, newMap); - left = difference.entriesOnlyOnLeft(); - for (Map.Entry<Integer, SocketAddress> shardEntry : left.entrySet()) { - int shard = shardEntry.getKey(); - if (shard >= 0) { - SocketAddress host = shardId2Address.get(shard); - if (null != host) { - // we don't remove those hosts that just disappered on serverset proactively, - // since it might be just because serverset become flaky - // address2ShardId.remove(host); - // circle.remove(shard, host); - logger.info("Shard {} ({}) left temporarily.", shard, host); - } - } else { - // shard id is negative - they are resolved from finagle name, which instances don't have shard id - // in this case, if they are removed from serverset, we removed them directly - SocketAddress host = shardEntry.getValue(); - if (null != host) { - removeHostInternal(host, Optional.<Throwable>absent()); - removedList.add(host); - } - } - } - // we need to find if any shards are replacing old shards - for (Map.Entry<Integer, SocketAddress> shard : newMap.entrySet()) { - SocketAddress oldHost = shardId2Address.get(shard.getKey()); - SocketAddress newHost = shard.getValue(); - if (!newHost.equals(oldHost)) { - join(shard.getKey(), newHost, removedList); - joinedList.add(newHost); - } - } - } - - for (SocketAddress addr : removedList) { - for (RoutingListener listener : listeners) { - listener.onServerLeft(addr); - } - } - - for (SocketAddress addr : joinedList) { - for (RoutingListener listener : listeners) { - listener.onServerJoin(addr); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/NameServerSet.java ---------------------------------------------------------------------- diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/NameServerSet.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/NameServerSet.java deleted file mode 100644 index e51eb1e..0000000 --- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/NameServerSet.java +++ /dev/null @@ -1,263 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.client.routing; - -import com.google.common.collect.ImmutableSet; -import com.twitter.common.base.Command; -import com.twitter.common.base.Commands; -import com.twitter.common.zookeeper.Group; -import com.twitter.common.zookeeper.ServerSet; -import com.twitter.finagle.Addr; -import com.twitter.finagle.Address; -import com.twitter.finagle.Name; -import com.twitter.finagle.Resolver$; -import com.twitter.thrift.Endpoint; -import com.twitter.thrift.ServiceInstance; -import com.twitter.thrift.Status; -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -/** - * Finagle Name based {@link ServerSet} implementation. - */ -class NameServerSet implements ServerSet { - - private static final Logger logger = LoggerFactory.getLogger(NameServerSet.class); - - private volatile Set<HostChangeMonitor<ServiceInstance>> watchers = - new HashSet<HostChangeMonitor<ServiceInstance>>(); - private volatile ImmutableSet<ServiceInstance> hostSet = ImmutableSet.of(); - private AtomicBoolean resolutionPending = new AtomicBoolean(true); - - public NameServerSet(String nameStr) { - Name name; - try { - name = Resolver$.MODULE$.eval(nameStr); - } catch (Exception exc) { - logger.error("Exception in Resolver.eval for name {}", nameStr, exc); - // Since this is called from various places that dont handle specific exceptions, - // we have no option than to throw a runtime exception to halt the control flow - // This should only happen in case of incorrect configuration. 
Having a log message - // would help identify the problem during tests - throw new RuntimeException(exc); - } - initialize(name); - } - - public NameServerSet(Name name) { - initialize(name); - } - - private void initialize(Name name) { - if (name instanceof TestName) { - ((TestName) name).changes(new AbstractFunction1<Addr, BoxedUnit>() { - @Override - public BoxedUnit apply(Addr varAddr) { - return NameServerSet.this.respondToChanges(varAddr); - } - }); - } else if (name instanceof Name.Bound) { - ((Name.Bound) name).addr().changes().respond(new AbstractFunction1<Addr, BoxedUnit>() { - @Override - public BoxedUnit apply(Addr varAddr) { - return NameServerSet.this.respondToChanges(varAddr); - } - }); - } else { - logger.error("NameServerSet only supports Name.Bound. While the resolved name {} was {}", - name, name.getClass()); - throw new UnsupportedOperationException("NameServerSet only supports Name.Bound"); - } - } - - private ServiceInstance endpointAddressToServiceInstance(Address endpointAddress) { - if (endpointAddress instanceof Address.Inet) { - InetSocketAddress inetSocketAddress = ((Address.Inet) endpointAddress).addr(); - Endpoint endpoint = new Endpoint(inetSocketAddress.getHostString(), inetSocketAddress.getPort()); - HashMap<String, Endpoint> map = new HashMap<String, Endpoint>(); - map.put("thrift", endpoint); - return new ServiceInstance( - endpoint, - map, - Status.ALIVE); - } else { - logger.error("We expect InetSocketAddress while the resolved address {} was {}", - endpointAddress, endpointAddress.getClass()); - throw new UnsupportedOperationException("invalid endpoint address: " + endpointAddress); - } - } - - - private BoxedUnit respondToChanges(Addr addr) { - ImmutableSet<ServiceInstance> oldHostSet = ImmutableSet.copyOf(hostSet); - - ImmutableSet<ServiceInstance> newHostSet = oldHostSet; - - if (addr instanceof Addr.Bound) { - scala.collection.immutable.Set<Address> endpointAddresses = ((Addr.Bound) addr).addrs(); - 
scala.collection.Iterator<Address> endpointAddressesIterator = endpointAddresses.toIterator(); - HashSet<ServiceInstance> serviceInstances = new HashSet<ServiceInstance>(); - while (endpointAddressesIterator.hasNext()) { - serviceInstances.add(endpointAddressToServiceInstance(endpointAddressesIterator.next())); - } - newHostSet = ImmutableSet.copyOf(serviceInstances); - - } else if (addr instanceof Addr.Failed) { - logger.error("Name resolution failed", ((Addr.Failed) addr).cause()); - newHostSet = ImmutableSet.of(); - } else if (addr.toString().equals("Pending")) { - logger.info("Name resolution pending"); - newHostSet = oldHostSet; - } else if (addr.toString().equals("Neg")) { - newHostSet = ImmutableSet.of(); - } else { - logger.error("Invalid Addr type: {}", addr.getClass().getName()); - throw new UnsupportedOperationException("Invalid Addr type:" + addr.getClass().getName()); - } - - // Reference comparison is valid as the sets are immutable - if (oldHostSet != newHostSet) { - logger.info("NameServerSet updated: {} -> {}", hostSetToString(oldHostSet), hostSetToString(newHostSet)); - resolutionPending.set(false); - hostSet = newHostSet; - synchronized (watchers) { - for (HostChangeMonitor<ServiceInstance> watcher: watchers) { - watcher.onChange(newHostSet); - } - } - - } - - return BoxedUnit.UNIT; - } - - - private String hostSetToString(ImmutableSet<ServiceInstance> hostSet) { - StringBuilder result = new StringBuilder(); - result.append("("); - for (ServiceInstance serviceInstance : hostSet) { - Endpoint endpoint = serviceInstance.getServiceEndpoint(); - result.append(String.format(" %s:%d", endpoint.getHost(), endpoint.getPort())); - } - result.append(" )"); - - return result.toString(); - } - - - /** - * Attempts to join a server set for this logical service group. 
- * - * @param endpoint the primary service endpoint - * @param additionalEndpoints and additional endpoints keyed by their logical name - * @param status the current service status - * @return an EndpointStatus object that allows the endpoint to adjust its status - * @throws Group.JoinException if there was a problem joining the server set - * @throws InterruptedException if interrupted while waiting to join the server set - * @deprecated The status field is deprecated. Please use {@link #join(java.net.InetSocketAddress, java.util.Map)} - */ - @Override - public EndpointStatus join(InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints, - Status status) - throws Group.JoinException, InterruptedException { - throw new UnsupportedOperationException("NameServerSet does not support join"); - } - - /** - * Attempts to join a server set for this logical service group. - * - * @param endpoint the primary service endpoint - * @param additionalEndpoints and additional endpoints keyed by their logical name - * @return an EndpointStatus object that allows the endpoint to adjust its status - * @throws Group.JoinException if there was a problem joining the server set - * @throws InterruptedException if interrupted while waiting to join the server set - */ - @Override - public EndpointStatus join(InetSocketAddress endpoint, Map<String, InetSocketAddress> additionalEndpoints) - throws Group.JoinException, InterruptedException { - throw new UnsupportedOperationException("NameServerSet does not support join"); - } - - /** - * Attempts to join a server set for this logical service group. - * - * @param endpoint the primary service endpoint - * @param additionalEndpoints and additional endpoints keyed by their logical name - * @param shardId Unique shard identifier for this member of the service. 
- * @return an EndpointStatus object that allows the endpoint to adjust its status - * @throws Group.JoinException if there was a problem joining the server set - * @throws InterruptedException if interrupted while waiting to join the server set - */ - @Override - public EndpointStatus join(InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints, - int shardId) - throws Group.JoinException, InterruptedException { - throw new UnsupportedOperationException("NameServerSet does not support join"); - } - - /** - * Registers a monitor to receive change notices for this server set as long as this jvm process - * is alive. Blocks until the initial server set can be gathered and delivered to the monitor. - * The monitor will be notified if the membership set or parameters of existing members have - * changed. - * - * @param monitor the server set monitor to call back when the host set changes - * @throws com.twitter.common.net.pool.DynamicHostSet.MonitorException if there is a problem monitoring the host set - * @deprecated Deprecated in favor of {@link #watch(com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor)} - */ - @Deprecated - @Override - public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { - throw new UnsupportedOperationException("NameServerSet does not support monitor"); - } - - /** - * Registers a monitor to receive change notices for this server set as long as this jvm process - * is alive. Blocks until the initial server set can be gathered and delivered to the monitor. - * The monitor will be notified if the membership set or parameters of existing members have - * changed. - * - * @param monitor the server set monitor to call back when the host set changes - * @return A command which, when executed, will stop monitoring the host set. 
- * @throws com.twitter.common.net.pool.DynamicHostSet.MonitorException if there is a problem monitoring the host set - */ - @Override - public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { - // First add the monitor to the watchers so that it does not miss any changes and invoke - // the onChange method - synchronized (watchers) { - watchers.add(monitor); - } - - if (resolutionPending.compareAndSet(false, false)) { - monitor.onChange(hostSet); - } - - return Commands.NOOP; // Return value is not used - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RegionsRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RegionsRoutingService.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RegionsRoutingService.java deleted file mode 100644 index d71cee3..0000000 --- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RegionsRoutingService.java +++ /dev/null @@ -1,192 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.client.routing; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.collect.Sets; -import org.apache.distributedlog.client.resolver.RegionResolver; -import com.twitter.finagle.NoBrokersAvailableException; -import com.twitter.finagle.stats.NullStatsReceiver; -import com.twitter.finagle.stats.StatsReceiver; -import java.net.SocketAddress; -import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Chain multiple routing services. - */ -public class RegionsRoutingService implements RoutingService { - - private static final Logger logger = LoggerFactory.getLogger(RegionsRoutingService.class); - - /** - * Create a multiple regions routing services based on a list of region routing {@code services}. - * - * <p>It is deprecated. Please use {@link Builder} to build multiple regions routing service. - * - * @param regionResolver region resolver - * @param services a list of region routing services. - * @return multiple regions routing service - * @see Builder - */ - @Deprecated - public static RegionsRoutingService of(RegionResolver regionResolver, - RoutingService...services) { - return new RegionsRoutingService(regionResolver, services); - } - - /** - * Create a builder to build a multiple-regions routing service. - * - * @return builder to build a multiple-regions routing service. - */ - public static Builder newBuilder() { - return new Builder(); - } - - /** - * Builder to build a multiple-regions routing service. 
- */ - public static class Builder implements RoutingService.Builder { - - private RegionResolver resolver; - private RoutingService.Builder[] routingServiceBuilders; - private StatsReceiver statsReceiver = NullStatsReceiver.get(); - - private Builder() {} - - public Builder routingServiceBuilders(RoutingService.Builder...builders) { - this.routingServiceBuilders = builders; - return this; - } - - public Builder resolver(RegionResolver regionResolver) { - this.resolver = regionResolver; - return this; - } - - @Override - public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) { - this.statsReceiver = statsReceiver; - return this; - } - - @Override - public RegionsRoutingService build() { - checkNotNull(routingServiceBuilders, "No routing service builder provided."); - checkNotNull(resolver, "No region resolver provided."); - checkNotNull(statsReceiver, "No stats receiver provided"); - RoutingService[] services = new RoutingService[routingServiceBuilders.length]; - for (int i = 0; i < services.length; i++) { - String statsScope; - if (0 == i) { - statsScope = "local"; - } else { - statsScope = "remote_" + i; - } - services[i] = routingServiceBuilders[i] - .statsReceiver(statsReceiver.scope(statsScope)) - .build(); - } - return new RegionsRoutingService(resolver, services); - } - } - - protected final RegionResolver regionResolver; - protected final RoutingService[] routingServices; - - private RegionsRoutingService(RegionResolver resolver, - RoutingService[] routingServices) { - this.regionResolver = resolver; - this.routingServices = routingServices; - } - - @Override - public Set<SocketAddress> getHosts() { - Set<SocketAddress> hosts = Sets.newHashSet(); - for (RoutingService rs : routingServices) { - hosts.addAll(rs.getHosts()); - } - return hosts; - } - - @Override - public void startService() { - for (RoutingService service : routingServices) { - service.startService(); - } - logger.info("Regions Routing Service Started"); - } - - @Override - 
public void stopService() { - for (RoutingService service : routingServices) { - service.stopService(); - } - logger.info("Regions Routing Service Stopped"); - } - - @Override - public RoutingService registerListener(RoutingListener listener) { - for (RoutingService service : routingServices) { - service.registerListener(listener); - } - return this; - } - - @Override - public RoutingService unregisterListener(RoutingListener listener) { - for (RoutingService service : routingServices) { - service.registerListener(listener); - } - return this; - } - - @Override - public SocketAddress getHost(String key, RoutingContext routingContext) - throws NoBrokersAvailableException { - for (RoutingService service : routingServices) { - try { - SocketAddress addr = service.getHost(key, routingContext); - if (routingContext.hasUnavailableRegions()) { - // current region is unavailable - String region = regionResolver.resolveRegion(addr); - if (routingContext.isUnavailableRegion(region)) { - continue; - } - } - if (!routingContext.isTriedHost(addr)) { - return addr; - } - } catch (NoBrokersAvailableException nbae) { - // if there isn't broker available in current service, try next service. 
- logger.debug("No brokers available in region {} : ", service, nbae); - } - } - throw new NoBrokersAvailableException("No host found for " + key + ", routing context : " + routingContext); - } - - @Override - public void removeHost(SocketAddress address, Throwable reason) { - for (RoutingService service : routingServices) { - service.removeHost(address, reason); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingService.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingService.java deleted file mode 100644 index ad73c17..0000000 --- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingService.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.client.routing; - -import org.apache.distributedlog.client.resolver.RegionResolver; -import org.apache.distributedlog.thrift.service.StatusCode; -import com.twitter.finagle.NoBrokersAvailableException; -import com.twitter.finagle.stats.StatsReceiver; -import java.net.SocketAddress; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * Routing Service provides mechanism how to route requests. - */ -public interface RoutingService { - - /** - * Builder to build routing service. - */ - interface Builder { - - /** - * Build routing service with stats receiver. - * - * @param statsReceiver - * stats receiver - * @return built routing service - */ - Builder statsReceiver(StatsReceiver statsReceiver); - - /** - * Build the routing service. - * - * @return built routing service - */ - RoutingService build(); - - } - - /** - * Listener for server changes on routing service. - */ - interface RoutingListener { - /** - * Trigger when server left. - * - * @param address left server. - */ - void onServerLeft(SocketAddress address); - - /** - * Trigger when server joint. - * - * @param address joint server. - */ - void onServerJoin(SocketAddress address); - } - - /** - * Routing Context of a request. - */ - class RoutingContext { - - public static RoutingContext of(RegionResolver resolver) { - return new RoutingContext(resolver); - } - - final RegionResolver regionResolver; - final Map<SocketAddress, StatusCode> triedHosts; - final Set<String> unavailableRegions; - - private RoutingContext(RegionResolver regionResolver) { - this.regionResolver = regionResolver; - this.triedHosts = new HashMap<SocketAddress, StatusCode>(); - this.unavailableRegions = new HashSet<String>(); - } - - @Override - public synchronized String toString() { - return "(tried hosts=" + triedHosts + ")"; - } - - /** - * Add tried host to routing context. 
- * - * @param socketAddress - * socket address of tried host. - * @param code - * status code returned from tried host. - * @return routing context. - */ - public synchronized RoutingContext addTriedHost(SocketAddress socketAddress, StatusCode code) { - this.triedHosts.put(socketAddress, code); - if (StatusCode.REGION_UNAVAILABLE == code) { - unavailableRegions.add(regionResolver.resolveRegion(socketAddress)); - } - return this; - } - - /** - * Is the host <i>address</i> already tried. - * - * @param address - * socket address to check - * @return true if the address is already tried, otherwise false. - */ - public synchronized boolean isTriedHost(SocketAddress address) { - return this.triedHosts.containsKey(address); - } - - /** - * Whether encountered unavailable regions. - * - * @return true if encountered unavailable regions, otherwise false. - */ - public synchronized boolean hasUnavailableRegions() { - return !unavailableRegions.isEmpty(); - } - - /** - * Whether the <i>region</i> is unavailable. - * - * @param region - * region - * @return true if the region is unavailable, otherwise false. - */ - public synchronized boolean isUnavailableRegion(String region) { - return unavailableRegions.contains(region); - } - - } - - /** - * Start routing service. - */ - void startService(); - - /** - * Stop routing service. - */ - void stopService(); - - /** - * Register routing listener. - * - * @param listener routing listener. - * @return routing service. - */ - RoutingService registerListener(RoutingListener listener); - - /** - * Unregister routing listener. - * - * @param listener routing listener. - * @return routing service. - */ - RoutingService unregisterListener(RoutingListener listener); - - /** - * Get all the hosts that available in routing service. - * - * @return all the hosts - */ - Set<SocketAddress> getHosts(); - - /** - * Get the host to route the request by <i>key</i>. - * - * @param key - * key to route the request. 
- * @param rContext - * routing context. - * @return host to route the request - * @throws NoBrokersAvailableException - */ - SocketAddress getHost(String key, RoutingContext rContext) - throws NoBrokersAvailableException; - - /** - * Remove the host <i>address</i> for a specific <i>reason</i>. - * - * @param address - * host address to remove - * @param reason - * reason to remove the host - */ - void removeHost(SocketAddress address, Throwable reason); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingServiceProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingServiceProvider.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingServiceProvider.java deleted file mode 100644 index 4ac22ce..0000000 --- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingServiceProvider.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.client.routing; - -import com.twitter.finagle.stats.StatsReceiver; - -class RoutingServiceProvider implements RoutingService.Builder { - - final RoutingService routingService; - - RoutingServiceProvider(RoutingService routingService) { - this.routingService = routingService; - } - - @Override - public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) { - return this; - } - - @Override - public RoutingService build() { - return routingService; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingUtils.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingUtils.java deleted file mode 100644 index 8e8edd3..0000000 --- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingUtils.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.client.routing; - -import com.twitter.common.zookeeper.ServerSet; -import java.net.SocketAddress; - -/** - * Utils for routing services. - */ -public class RoutingUtils { - - private static final int NUM_CONSISTENT_HASH_REPLICAS = 997; - - /** - * Building routing service from <code>finagleNameStr</code>. - * - * @param finagleNameStr - * finagle name str of a service - * @return routing service builder - */ - public static RoutingService.Builder buildRoutingService(String finagleNameStr) { - if (!finagleNameStr.startsWith("serverset!") - && !finagleNameStr.startsWith("inet!") - && !finagleNameStr.startsWith("zk!")) { - // We only support serverset based names at the moment - throw new UnsupportedOperationException("Finagle Name format not supported for name: " + finagleNameStr); - } - return buildRoutingService(new NameServerSet(finagleNameStr), true); - } - - /** - * Building routing service from <code>serverSet</code>. - * - * @param serverSet - * server set of a service - * @return routing service builder - */ - public static RoutingService.Builder buildRoutingService(ServerSet serverSet) { - return buildRoutingService(serverSet, false); - } - - /** - * Building routing service from <code>address</code>. - * - * @param address - * host to route the requests - * @return routing service builder - */ - public static RoutingService.Builder buildRoutingService(SocketAddress address) { - return SingleHostRoutingService.newBuilder().address(address); - } - - /** - * Build routing service builder of a routing service <code>routingService</code>. 
- * - * @param routingService - * routing service to provide - * @return routing service builder - */ - public static RoutingService.Builder buildRoutingService(RoutingService routingService) { - return new RoutingServiceProvider(routingService); - } - - private static RoutingService.Builder buildRoutingService(ServerSet serverSet, - boolean resolveFromName) { - return ConsistentHashRoutingService.newBuilder() - .serverSet(serverSet) - .resolveFromName(resolveFromName) - .numReplicas(NUM_CONSISTENT_HASH_REPLICAS); - } - -}