http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java new file mode 100644 index 0000000..666fa31 --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java @@ -0,0 +1,500 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.client.routing; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.twitter.common.zookeeper.ServerSet; +import org.apache.distributedlog.service.DLSocketAddress; +import com.twitter.finagle.ChannelException; +import com.twitter.finagle.NoBrokersAvailableException; +import com.twitter.finagle.stats.Counter; +import com.twitter.finagle.stats.Gauge; +import com.twitter.finagle.stats.NullStatsReceiver; +import com.twitter.finagle.stats.StatsReceiver; +import com.twitter.util.Function0; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.tuple.Pair; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.TimerTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.Seq; + +/** + * Consistent Hashing Based {@link RoutingService}. 
+ */ +public class ConsistentHashRoutingService extends ServerSetRoutingService { + + private static final Logger logger = LoggerFactory.getLogger(ConsistentHashRoutingService.class); + + @Deprecated + public static ConsistentHashRoutingService of(ServerSetWatcher serverSetWatcher, int numReplicas) { + return new ConsistentHashRoutingService(serverSetWatcher, numReplicas, 300, NullStatsReceiver.get()); + } + + /** + * Builder helper class to build a consistent hash bashed {@link RoutingService}. + * + * @return builder to build a consistent hash based {@link RoutingService}. + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for building consistent hash based routing service. + */ + public static class Builder implements RoutingService.Builder { + + private ServerSet serverSet; + private boolean resolveFromName = false; + private int numReplicas; + private int blackoutSeconds = 300; + private StatsReceiver statsReceiver = NullStatsReceiver.get(); + + private Builder() {} + + public Builder serverSet(ServerSet serverSet) { + this.serverSet = serverSet; + return this; + } + + public Builder resolveFromName(boolean enabled) { + this.resolveFromName = enabled; + return this; + } + + public Builder numReplicas(int numReplicas) { + this.numReplicas = numReplicas; + return this; + } + + public Builder blackoutSeconds(int seconds) { + this.blackoutSeconds = seconds; + return this; + } + + public Builder statsReceiver(StatsReceiver statsReceiver) { + this.statsReceiver = statsReceiver; + return this; + } + + @Override + public RoutingService build() { + checkNotNull(serverSet, "No serverset provided."); + checkNotNull(statsReceiver, "No stats receiver provided."); + checkArgument(numReplicas > 0, "Invalid number of replicas : " + numReplicas); + return new ConsistentHashRoutingService(new TwitterServerSetWatcher(serverSet, resolveFromName), + numReplicas, blackoutSeconds, statsReceiver); + } + } + + static class ConsistentHash { + 
private final HashFunction hashFunction; + private final int numOfReplicas; + private final SortedMap<Long, SocketAddress> circle; + + // Stats + protected final Counter hostAddedCounter; + protected final Counter hostRemovedCounter; + + ConsistentHash(HashFunction hashFunction, + int numOfReplicas, + StatsReceiver statsReceiver) { + this.hashFunction = hashFunction; + this.numOfReplicas = numOfReplicas; + this.circle = new TreeMap<Long, SocketAddress>(); + + this.hostAddedCounter = statsReceiver.counter0("adds"); + this.hostRemovedCounter = statsReceiver.counter0("removes"); + } + + private String replicaName(int shardId, int replica, String address) { + if (shardId < 0) { + shardId = UNKNOWN_SHARD_ID; + } + + StringBuilder sb = new StringBuilder(100); + sb.append("shard-"); + sb.append(shardId); + sb.append('-'); + sb.append(replica); + sb.append('-'); + sb.append(address); + + return sb.toString(); + } + + private Long replicaHash(int shardId, int replica, String address) { + return hashFunction.hashUnencodedChars(replicaName(shardId, replica, address)).asLong(); + } + + private Long replicaHash(int shardId, int replica, SocketAddress address) { + return replicaHash(shardId, replica, address.toString()); + } + + public synchronized void add(int shardId, SocketAddress address) { + String addressStr = address.toString(); + for (int i = 0; i < numOfReplicas; i++) { + Long hash = replicaHash(shardId, i, addressStr); + circle.put(hash, address); + } + hostAddedCounter.incr(); + } + + public synchronized void remove(int shardId, SocketAddress address) { + for (int i = 0; i < numOfReplicas; i++) { + long hash = replicaHash(shardId, i, address); + SocketAddress oldAddress = circle.get(hash); + if (null != oldAddress && oldAddress.equals(address)) { + circle.remove(hash); + } + } + hostRemovedCounter.incr(); + } + + public SocketAddress get(String key, RoutingContext rContext) { + long hash = hashFunction.hashUnencodedChars(key).asLong(); + return find(hash, rContext); + 
} + + private synchronized SocketAddress find(long hash, RoutingContext rContext) { + if (circle.isEmpty()) { + return null; + } + + Iterator<Map.Entry<Long, SocketAddress>> iterator = + circle.tailMap(hash).entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Long, SocketAddress> entry = iterator.next(); + if (!rContext.isTriedHost(entry.getValue())) { + return entry.getValue(); + } + } + // the tail map has been checked + iterator = circle.headMap(hash).entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Long, SocketAddress> entry = iterator.next(); + if (!rContext.isTriedHost(entry.getValue())) { + return entry.getValue(); + } + } + + return null; + } + + private synchronized Pair<Long, SocketAddress> get(long hash) { + if (circle.isEmpty()) { + return null; + } + + if (!circle.containsKey(hash)) { + SortedMap<Long, SocketAddress> tailMap = circle.tailMap(hash); + hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey(); + } + return Pair.of(hash, circle.get(hash)); + } + + synchronized void dumpHashRing() { + for (Map.Entry<Long, SocketAddress> entry : circle.entrySet()) { + logger.info(entry.getKey() + " : " + entry.getValue()); + } + } + + } + + class BlackoutHost implements TimerTask { + final int shardId; + final SocketAddress address; + + BlackoutHost(int shardId, SocketAddress address) { + this.shardId = shardId; + this.address = address; + numBlackoutHosts.incrementAndGet(); + } + + @Override + public void run(Timeout timeout) throws Exception { + numBlackoutHosts.decrementAndGet(); + if (!timeout.isExpired()) { + return; + } + Set<SocketAddress> removedList = new HashSet<SocketAddress>(); + boolean joined; + // add the shard back + synchronized (shardId2Address) { + SocketAddress curHost = shardId2Address.get(shardId); + if (null != curHost) { + // there is already new shard joint, so drop the host. 
+ logger.info("Blackout Shard {} ({}) was already replaced by {} permanently.", + new Object[] { shardId, address, curHost }); + joined = false; + } else { + join(shardId, address, removedList); + joined = true; + } + } + if (joined) { + for (RoutingListener listener : listeners) { + listener.onServerJoin(address); + } + } else { + for (RoutingListener listener : listeners) { + listener.onServerLeft(address); + } + } + } + } + + protected final HashedWheelTimer hashedWheelTimer; + protected final HashFunction hashFunction = Hashing.md5(); + protected final ConsistentHash circle; + protected final Map<Integer, SocketAddress> shardId2Address = + new HashMap<Integer, SocketAddress>(); + protected final Map<SocketAddress, Integer> address2ShardId = + new HashMap<SocketAddress, Integer>(); + + // blackout period + protected final int blackoutSeconds; + + // stats + protected final StatsReceiver statsReceiver; + protected final AtomicInteger numBlackoutHosts; + protected final Gauge numBlackoutHostsGauge; + protected final Gauge numHostsGauge; + + private static final int UNKNOWN_SHARD_ID = -1; + + ConsistentHashRoutingService(ServerSetWatcher serverSetWatcher, + int numReplicas, + int blackoutSeconds, + StatsReceiver statsReceiver) { + super(serverSetWatcher); + this.circle = new ConsistentHash(hashFunction, numReplicas, statsReceiver.scope("ring")); + this.hashedWheelTimer = new HashedWheelTimer(new ThreadFactoryBuilder() + .setNameFormat("ConsistentHashRoutingService-Timer-%d").build()); + this.blackoutSeconds = blackoutSeconds; + // stats + this.statsReceiver = statsReceiver; + this.numBlackoutHosts = new AtomicInteger(0); + this.numBlackoutHostsGauge = this.statsReceiver.addGauge(gaugeName("num_blackout_hosts"), + new Function0<Object>() { + @Override + public Object apply() { + return (float) numBlackoutHosts.get(); + } + }); + this.numHostsGauge = this.statsReceiver.addGauge(gaugeName("num_hosts"), + new Function0<Object>() { + @Override + public Object apply() { 
+ return (float) address2ShardId.size(); + } + }); + } + + private static Seq<String> gaugeName(String name) { + return scala.collection.JavaConversions.asScalaBuffer(Arrays.asList(name)).toList(); + } + + @Override + public void startService() { + super.startService(); + this.hashedWheelTimer.start(); + } + + @Override + public void stopService() { + this.hashedWheelTimer.stop(); + super.stopService(); + } + + @Override + public Set<SocketAddress> getHosts() { + synchronized (shardId2Address) { + return ImmutableSet.copyOf(address2ShardId.keySet()); + } + } + + @Override + public SocketAddress getHost(String key, RoutingContext rContext) + throws NoBrokersAvailableException { + SocketAddress host = circle.get(key, rContext); + if (null != host) { + return host; + } + throw new NoBrokersAvailableException("No host found for " + key + ", routing context : " + rContext); + } + + @Override + public void removeHost(SocketAddress host, Throwable reason) { + removeHostInternal(host, Optional.of(reason)); + } + + private void removeHostInternal(SocketAddress host, Optional<Throwable> reason) { + synchronized (shardId2Address) { + Integer shardId = address2ShardId.remove(host); + if (null != shardId) { + SocketAddress curHost = shardId2Address.get(shardId); + if (null != curHost && curHost.equals(host)) { + shardId2Address.remove(shardId); + } + circle.remove(shardId, host); + if (reason.isPresent()) { + if (reason.get() instanceof ChannelException) { + logger.info("Shard {} ({}) left due to ChannelException, black it out for {} seconds" + + " (message = {})", + new Object[] { shardId, host, blackoutSeconds, reason.get().toString() }); + BlackoutHost blackoutHost = new BlackoutHost(shardId, host); + hashedWheelTimer.newTimeout(blackoutHost, blackoutSeconds, TimeUnit.SECONDS); + } else { + logger.info("Shard {} ({}) left due to exception {}", + new Object[] { shardId, host, reason.get().toString() }); + } + } else { + logger.info("Shard {} ({}) left after server set 
change", + shardId, host); + } + } else if (reason.isPresent()) { + logger.info("Node {} left due to exception {}", host, reason.get().toString()); + } else { + logger.info("Node {} left after server set change", host); + } + } + } + + /** + * The caller should synchronize on <i>shardId2Address</i>. + * @param shardId + * Shard id of new host joined. + * @param newHost + * New host joined. + * @param removedList + * Old hosts to remove + */ + private void join(int shardId, SocketAddress newHost, Set<SocketAddress> removedList) { + SocketAddress oldHost = shardId2Address.put(shardId, newHost); + if (null != oldHost) { + // remove the old host only when a new shard is kicked in to replace it. + address2ShardId.remove(oldHost); + circle.remove(shardId, oldHost); + removedList.add(oldHost); + logger.info("Shard {} ({}) left permanently.", shardId, oldHost); + } + address2ShardId.put(newHost, shardId); + circle.add(shardId, newHost); + logger.info("Shard {} ({}) joined to replace ({}).", + new Object[] { shardId, newHost, oldHost }); + } + + @Override + protected synchronized void performServerSetChange(ImmutableSet<DLSocketAddress> serviceInstances) { + Set<SocketAddress> joinedList = new HashSet<SocketAddress>(); + Set<SocketAddress> removedList = new HashSet<SocketAddress>(); + + Map<Integer, SocketAddress> newMap = new HashMap<Integer, SocketAddress>(); + synchronized (shardId2Address) { + for (DLSocketAddress serviceInstance : serviceInstances) { + if (serviceInstance.getShard() >= 0) { + newMap.put(serviceInstance.getShard(), serviceInstance.getSocketAddress()); + } else { + Integer shard = address2ShardId.get(serviceInstance.getSocketAddress()); + if (null == shard) { + // Assign a random negative shardId + int shardId; + do { + shardId = Math.min(-1 , (int) (Math.random() * Integer.MIN_VALUE)); + } while (null != shardId2Address.get(shardId)); + shard = shardId; + } + newMap.put(shard, serviceInstance.getSocketAddress()); + } + } + } + + Map<Integer, 
SocketAddress> left; + synchronized (shardId2Address) { + MapDifference<Integer, SocketAddress> difference = + Maps.difference(shardId2Address, newMap); + left = difference.entriesOnlyOnLeft(); + for (Map.Entry<Integer, SocketAddress> shardEntry : left.entrySet()) { + int shard = shardEntry.getKey(); + if (shard >= 0) { + SocketAddress host = shardId2Address.get(shard); + if (null != host) { + // we don't remove those hosts that just disappered on serverset proactively, + // since it might be just because serverset become flaky + // address2ShardId.remove(host); + // circle.remove(shard, host); + logger.info("Shard {} ({}) left temporarily.", shard, host); + } + } else { + // shard id is negative - they are resolved from finagle name, which instances don't have shard id + // in this case, if they are removed from serverset, we removed them directly + SocketAddress host = shardEntry.getValue(); + if (null != host) { + removeHostInternal(host, Optional.<Throwable>absent()); + removedList.add(host); + } + } + } + // we need to find if any shards are replacing old shards + for (Map.Entry<Integer, SocketAddress> shard : newMap.entrySet()) { + SocketAddress oldHost = shardId2Address.get(shard.getKey()); + SocketAddress newHost = shard.getValue(); + if (!newHost.equals(oldHost)) { + join(shard.getKey(), newHost, removedList); + joinedList.add(newHost); + } + } + } + + for (SocketAddress addr : removedList) { + for (RoutingListener listener : listeners) { + listener.onServerLeft(addr); + } + } + + for (SocketAddress addr : joinedList) { + for (RoutingListener listener : listeners) { + listener.onServerJoin(addr); + } + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/NameServerSet.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/NameServerSet.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/NameServerSet.java new file mode 100644 index 0000000..e51eb1e --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/NameServerSet.java @@ -0,0 +1,263 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.distributedlog.client.routing;

import com.google.common.collect.ImmutableSet;
import com.twitter.common.base.Command;
import com.twitter.common.base.Commands;
import com.twitter.common.zookeeper.Group;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.finagle.Addr;
import com.twitter.finagle.Address;
import com.twitter.finagle.Name;
import com.twitter.finagle.Resolver$;
import com.twitter.thrift.Endpoint;
import com.twitter.thrift.ServiceInstance;
import com.twitter.thrift.Status;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/**
 * Finagle Name based {@link ServerSet} implementation.
 *
 * <p>The set is read-only: it follows the addresses a finagle {@link Name} resolves to
 * and pushes them to registered watchers. All {@code join} variants and
 * {@link #monitor} throw {@link UnsupportedOperationException}.
 */
class NameServerSet implements ServerSet {

    private static final Logger logger = LoggerFactory.getLogger(NameServerSet.class);

    // Used as the monitor guarding itself, so the reference must never change.
    private final Set<HostChangeMonitor<ServiceInstance>> watchers =
        new HashSet<HostChangeMonitor<ServiceInstance>>();
    private volatile ImmutableSet<ServiceInstance> hostSet = ImmutableSet.of();
    // true until the first host set change has been recorded by respondToChanges
    private final AtomicBoolean resolutionPending = new AtomicBoolean(true);

    /**
     * Resolve {@code nameStr} with the finagle resolver and start following it.
     *
     * @param nameStr finagle name string
     * @throws RuntimeException wrapping any failure of {@code Resolver.eval}
     * @throws UnsupportedOperationException if the resolved name type is not supported
     */
    public NameServerSet(String nameStr) {
        Name name;
        try {
            name = Resolver$.MODULE$.eval(nameStr);
        } catch (Exception exc) {
            logger.error("Exception in Resolver.eval for name {}", nameStr, exc);
            // Since this is called from various places that dont handle specific exceptions,
            // we have no option than to throw a runtime exception to halt the control flow
            // This should only happen in case of incorrect configuration. Having a log message
            // would help identify the problem during tests
            throw new RuntimeException(exc);
        }
        initialize(name);
    }

    /**
     * Start following an already resolved {@code name}.
     *
     * @throws UnsupportedOperationException if the name type is not supported
     */
    public NameServerSet(Name name) {
        initialize(name);
    }

    /**
     * Subscribe {@link #respondToChanges(Addr)} to address changes of {@code name}.
     * Only {@code TestName} and {@link Name.Bound} are accepted.
     */
    private void initialize(Name name) {
        if (name instanceof TestName) {
            ((TestName) name).changes(new AbstractFunction1<Addr, BoxedUnit>() {
                @Override
                public BoxedUnit apply(Addr varAddr) {
                    return NameServerSet.this.respondToChanges(varAddr);
                }
            });
        } else if (name instanceof Name.Bound) {
            ((Name.Bound) name).addr().changes().respond(new AbstractFunction1<Addr, BoxedUnit>() {
                @Override
                public BoxedUnit apply(Addr varAddr) {
                    return NameServerSet.this.respondToChanges(varAddr);
                }
            });
        } else {
            logger.error("NameServerSet only supports Name.Bound. While the resolved name {} was {}",
                name, name.getClass());
            throw new UnsupportedOperationException("NameServerSet only supports Name.Bound");
        }
    }

    /**
     * Convert an inet endpoint address into an ALIVE {@link ServiceInstance} whose
     * additional endpoint map contains the same endpoint under the key {@code "thrift"}.
     *
     * @throws UnsupportedOperationException if the address is not an {@link Address.Inet}
     */
    private ServiceInstance endpointAddressToServiceInstance(Address endpointAddress) {
        if (endpointAddress instanceof Address.Inet) {
            InetSocketAddress inetSocketAddress = ((Address.Inet) endpointAddress).addr();
            Endpoint endpoint = new Endpoint(inetSocketAddress.getHostString(), inetSocketAddress.getPort());
            HashMap<String, Endpoint> map = new HashMap<String, Endpoint>();
            map.put("thrift", endpoint);
            return new ServiceInstance(
                endpoint,
                map,
                Status.ALIVE);
        } else {
            logger.error("We expect InetSocketAddress while the resolved address {} was {}",
                        endpointAddress, endpointAddress.getClass());
            throw new UnsupportedOperationException("invalid endpoint address: " + endpointAddress);
        }
    }


    /**
     * Translate a resolver update into a new host set and, if the set object changed,
     * publish it and notify all watchers.
     *
     * <p>Bound addresses become the new host set; Failed and Neg clear it; Pending keeps
     * the current one.
     *
     * @throws UnsupportedOperationException for any other {@link Addr} type
     */
    private BoxedUnit respondToChanges(Addr addr) {
        ImmutableSet<ServiceInstance> oldHostSet = ImmutableSet.copyOf(hostSet);

        ImmutableSet<ServiceInstance> newHostSet = oldHostSet;

        if (addr instanceof Addr.Bound) {
            scala.collection.immutable.Set<Address> endpointAddresses = ((Addr.Bound) addr).addrs();
            scala.collection.Iterator<Address> endpointAddressesIterator = endpointAddresses.toIterator();
            HashSet<ServiceInstance> serviceInstances = new HashSet<ServiceInstance>();
            while (endpointAddressesIterator.hasNext()) {
                serviceInstances.add(endpointAddressToServiceInstance(endpointAddressesIterator.next()));
            }
            newHostSet = ImmutableSet.copyOf(serviceInstances);

        } else if (addr instanceof Addr.Failed) {
            logger.error("Name resolution failed", ((Addr.Failed) addr).cause());
            newHostSet = ImmutableSet.of();
        } else if (addr.toString().equals("Pending")) {
            logger.info("Name resolution pending");
            newHostSet = oldHostSet;
        } else if (addr.toString().equals("Neg")) {
            newHostSet = ImmutableSet.of();
        } else {
            logger.error("Invalid Addr type: {}", addr.getClass().getName());
            throw new UnsupportedOperationException("Invalid Addr type:" + addr.getClass().getName());
        }

        // Reference comparison is valid as the sets are immutable
        // NOTE(review): if Guava hands back one shared instance for empty sets, an
        // empty -> empty transition is not reported and resolutionPending stays true
        // - confirm this is intended.
        if (oldHostSet != newHostSet) {
            logger.info("NameServerSet updated: {} -> {}", hostSetToString(oldHostSet), hostSetToString(newHostSet));
            resolutionPending.set(false);
            hostSet = newHostSet;
            synchronized (watchers) {
                for (HostChangeMonitor<ServiceInstance> watcher: watchers) {
                    watcher.onChange(newHostSet);
                }
            }

        }

        return BoxedUnit.UNIT;
    }


    /** Render a host set as {@code "( host:port host:port )"} for logging. */
    private String hostSetToString(ImmutableSet<ServiceInstance> hostSet) {
        StringBuilder result = new StringBuilder();
        result.append("(");
        for (ServiceInstance serviceInstance : hostSet) {
            Endpoint endpoint = serviceInstance.getServiceEndpoint();
            result.append(String.format(" %s:%d", endpoint.getHost(), endpoint.getPort()));
        }
        result.append(" )");

        return result.toString();
    }


    /**
     * Not supported by this implementation.
     *
     * @param endpoint the primary service endpoint
     * @param additionalEndpoints and additional endpoints keyed by their logical name
     * @param status the current service status
     * @return never returns normally
     * @throws UnsupportedOperationException always
     * @deprecated The status field is deprecated. Please use {@link #join(java.net.InetSocketAddress, java.util.Map)}
     */
    @Deprecated
    @Override
    public EndpointStatus join(InetSocketAddress endpoint,
                               Map<String, InetSocketAddress> additionalEndpoints,
                               Status status)
            throws Group.JoinException, InterruptedException {
        throw new UnsupportedOperationException("NameServerSet does not support join");
    }

    /**
     * Not supported by this implementation.
     *
     * @param endpoint the primary service endpoint
     * @param additionalEndpoints and additional endpoints keyed by their logical name
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public EndpointStatus join(InetSocketAddress endpoint, Map<String, InetSocketAddress> additionalEndpoints)
            throws Group.JoinException, InterruptedException {
        throw new UnsupportedOperationException("NameServerSet does not support join");
    }

    /**
     * Not supported by this implementation.
     *
     * @param endpoint the primary service endpoint
     * @param additionalEndpoints and additional endpoints keyed by their logical name
     * @param shardId Unique shard identifier for this member of the service.
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public EndpointStatus join(InetSocketAddress endpoint,
                               Map<String, InetSocketAddress> additionalEndpoints,
                               int shardId)
            throws Group.JoinException, InterruptedException {
        throw new UnsupportedOperationException("NameServerSet does not support join");
    }

    /**
     * Not supported by this implementation.
     *
     * @param monitor the server set monitor to call back when the host set changes
     * @throws UnsupportedOperationException always
     * @deprecated Deprecated in favor of {@link #watch(com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor)}
     */
    @Deprecated
    @Override
    public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
        throw new UnsupportedOperationException("NameServerSet does not support monitor");
    }

    /**
     * Registers a monitor to receive change notices for this server set.
     *
     * <p>This call does not block. If a host set change has already been recorded, the
     * current host set is delivered to the monitor immediately; otherwise the monitor
     * is first called on the next change.
     *
     * @param monitor the server set monitor to call back when the host set changes
     * @return a no-op command; monitors cannot be unregistered through it
     * @throws com.twitter.common.net.pool.DynamicHostSet.MonitorException declared by the interface
     */
    @Override
    public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
        // First add the monitor to the watchers so that it does not miss any changes and invoke
        // the onChange method
        synchronized (watchers) {
            watchers.add(monitor);
        }

        if (!resolutionPending.get()) {
            monitor.onChange(hostSet);
        }

        return Commands.NOOP; // Return value is not used
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/RegionsRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/RegionsRoutingService.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/RegionsRoutingService.java new file mode 100644 index 0000000..d71cee3 --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/RegionsRoutingService.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.routing; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.Sets; +import org.apache.distributedlog.client.resolver.RegionResolver; +import com.twitter.finagle.NoBrokersAvailableException; +import com.twitter.finagle.stats.NullStatsReceiver; +import com.twitter.finagle.stats.StatsReceiver; +import java.net.SocketAddress; +import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Chain multiple routing services. + */ +public class RegionsRoutingService implements RoutingService { + + private static final Logger logger = LoggerFactory.getLogger(RegionsRoutingService.class); + + /** + * Create a multiple regions routing services based on a list of region routing {@code services}. + * + * <p>It is deprecated. Please use {@link Builder} to build multiple regions routing service. + * + * @param regionResolver region resolver + * @param services a list of region routing services. + * @return multiple regions routing service + * @see Builder + */ + @Deprecated + public static RegionsRoutingService of(RegionResolver regionResolver, + RoutingService...services) { + return new RegionsRoutingService(regionResolver, services); + } + + /** + * Create a builder to build a multiple-regions routing service. + * + * @return builder to build a multiple-regions routing service. + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder to build a multiple-regions routing service. 
+ */ + public static class Builder implements RoutingService.Builder { + + private RegionResolver resolver; + private RoutingService.Builder[] routingServiceBuilders; + private StatsReceiver statsReceiver = NullStatsReceiver.get(); + + private Builder() {} + + public Builder routingServiceBuilders(RoutingService.Builder...builders) { + this.routingServiceBuilders = builders; + return this; + } + + public Builder resolver(RegionResolver regionResolver) { + this.resolver = regionResolver; + return this; + } + + @Override + public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) { + this.statsReceiver = statsReceiver; + return this; + } + + @Override + public RegionsRoutingService build() { + checkNotNull(routingServiceBuilders, "No routing service builder provided."); + checkNotNull(resolver, "No region resolver provided."); + checkNotNull(statsReceiver, "No stats receiver provided"); + RoutingService[] services = new RoutingService[routingServiceBuilders.length]; + for (int i = 0; i < services.length; i++) { + String statsScope; + if (0 == i) { + statsScope = "local"; + } else { + statsScope = "remote_" + i; + } + services[i] = routingServiceBuilders[i] + .statsReceiver(statsReceiver.scope(statsScope)) + .build(); + } + return new RegionsRoutingService(resolver, services); + } + } + + protected final RegionResolver regionResolver; + protected final RoutingService[] routingServices; + + private RegionsRoutingService(RegionResolver resolver, + RoutingService[] routingServices) { + this.regionResolver = resolver; + this.routingServices = routingServices; + } + + @Override + public Set<SocketAddress> getHosts() { + Set<SocketAddress> hosts = Sets.newHashSet(); + for (RoutingService rs : routingServices) { + hosts.addAll(rs.getHosts()); + } + return hosts; + } + + @Override + public void startService() { + for (RoutingService service : routingServices) { + service.startService(); + } + logger.info("Regions Routing Service Started"); + } + + @Override + 
public void stopService() { + for (RoutingService service : routingServices) { + service.stopService(); + } + logger.info("Regions Routing Service Stopped"); + } + + @Override + public RoutingService registerListener(RoutingListener listener) { + for (RoutingService service : routingServices) { + service.registerListener(listener); + } + return this; + } + + @Override + public RoutingService unregisterListener(RoutingListener listener) { + for (RoutingService service : routingServices) { + service.unregisterListener(listener); // detach the listener from every region's routing service + } + return this; + } + + @Override + public SocketAddress getHost(String key, RoutingContext routingContext) + throws NoBrokersAvailableException { + for (RoutingService service : routingServices) { + try { + SocketAddress addr = service.getHost(key, routingContext); + if (routingContext.hasUnavailableRegions()) { + // current region is unavailable + String region = regionResolver.resolveRegion(addr); + if (routingContext.isUnavailableRegion(region)) { + continue; + } + } + if (!routingContext.isTriedHost(addr)) { + return addr; + } + } catch (NoBrokersAvailableException nbae) { + // if there isn't broker available in current service, try next service. 
+ logger.debug("No brokers available in region {} : ", service, nbae); + } + } + throw new NoBrokersAvailableException("No host found for " + key + ", routing context : " + routingContext); + } + + @Override + public void removeHost(SocketAddress address, Throwable reason) { + for (RoutingService service : routingServices) { + service.removeHost(address, reason); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/RoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/RoutingService.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/RoutingService.java new file mode 100644 index 0000000..ad73c17 --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/RoutingService.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.client.routing; + +import org.apache.distributedlog.client.resolver.RegionResolver; +import org.apache.distributedlog.thrift.service.StatusCode; +import com.twitter.finagle.NoBrokersAvailableException; +import com.twitter.finagle.stats.StatsReceiver; +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Routing Service provides mechanism how to route requests. + */ +public interface RoutingService { + + /** + * Builder to build routing service. + */ + interface Builder { + + /** + * Build routing service with stats receiver. + * + * @param statsReceiver + * stats receiver + * @return built routing service + */ + Builder statsReceiver(StatsReceiver statsReceiver); + + /** + * Build the routing service. + * + * @return built routing service + */ + RoutingService build(); + + } + + /** + * Listener for server changes on routing service. + */ + interface RoutingListener { + /** + * Trigger when server left. + * + * @param address left server. + */ + void onServerLeft(SocketAddress address); + + /** + * Trigger when server joint. + * + * @param address joint server. + */ + void onServerJoin(SocketAddress address); + } + + /** + * Routing Context of a request. + */ + class RoutingContext { + + public static RoutingContext of(RegionResolver resolver) { + return new RoutingContext(resolver); + } + + final RegionResolver regionResolver; + final Map<SocketAddress, StatusCode> triedHosts; + final Set<String> unavailableRegions; + + private RoutingContext(RegionResolver regionResolver) { + this.regionResolver = regionResolver; + this.triedHosts = new HashMap<SocketAddress, StatusCode>(); + this.unavailableRegions = new HashSet<String>(); + } + + @Override + public synchronized String toString() { + return "(tried hosts=" + triedHosts + ")"; + } + + /** + * Add tried host to routing context. 
+ * + * @param socketAddress + * socket address of tried host. + * @param code + * status code returned from tried host. + * @return routing context. + */ + public synchronized RoutingContext addTriedHost(SocketAddress socketAddress, StatusCode code) { + this.triedHosts.put(socketAddress, code); + if (StatusCode.REGION_UNAVAILABLE == code) { + unavailableRegions.add(regionResolver.resolveRegion(socketAddress)); + } + return this; + } + + /** + * Is the host <i>address</i> already tried. + * + * @param address + * socket address to check + * @return true if the address is already tried, otherwise false. + */ + public synchronized boolean isTriedHost(SocketAddress address) { + return this.triedHosts.containsKey(address); + } + + /** + * Whether encountered unavailable regions. + * + * @return true if encountered unavailable regions, otherwise false. + */ + public synchronized boolean hasUnavailableRegions() { + return !unavailableRegions.isEmpty(); + } + + /** + * Whether the <i>region</i> is unavailable. + * + * @param region + * region + * @return true if the region is unavailable, otherwise false. + */ + public synchronized boolean isUnavailableRegion(String region) { + return unavailableRegions.contains(region); + } + + } + + /** + * Start routing service. + */ + void startService(); + + /** + * Stop routing service. + */ + void stopService(); + + /** + * Register routing listener. + * + * @param listener routing listener. + * @return routing service. + */ + RoutingService registerListener(RoutingListener listener); + + /** + * Unregister routing listener. + * + * @param listener routing listener. + * @return routing service. + */ + RoutingService unregisterListener(RoutingListener listener); + + /** + * Get all the hosts that available in routing service. + * + * @return all the hosts + */ + Set<SocketAddress> getHosts(); + + /** + * Get the host to route the request by <i>key</i>. + * + * @param key + * key to route the request. 
+ * @param rContext + * routing context. + * @return host to route the request + * @throws NoBrokersAvailableException + */ + SocketAddress getHost(String key, RoutingContext rContext) + throws NoBrokersAvailableException; + + /** + * Remove the host <i>address</i> for a specific <i>reason</i>. + * + * @param address + * host address to remove + * @param reason + * reason to remove the host + */ + void removeHost(SocketAddress address, Throwable reason); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/RoutingServiceProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/RoutingServiceProvider.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/RoutingServiceProvider.java new file mode 100644 index 0000000..4ac22ce --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/RoutingServiceProvider.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.client.routing; + +import com.twitter.finagle.stats.StatsReceiver; + +class RoutingServiceProvider implements RoutingService.Builder { + + final RoutingService routingService; + + RoutingServiceProvider(RoutingService routingService) { + this.routingService = routingService; + } + + @Override + public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) { + return this; + } + + @Override + public RoutingService build() { + return routingService; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/RoutingUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/RoutingUtils.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/RoutingUtils.java new file mode 100644 index 0000000..8e8edd3 --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/RoutingUtils.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.client.routing; + +import com.twitter.common.zookeeper.ServerSet; +import java.net.SocketAddress; + +/** + * Utils for routing services. + */ +public class RoutingUtils { + + private static final int NUM_CONSISTENT_HASH_REPLICAS = 997; + + /** + * Building routing service from <code>finagleNameStr</code>. + * + * @param finagleNameStr + * finagle name str of a service + * @return routing service builder + */ + public static RoutingService.Builder buildRoutingService(String finagleNameStr) { + if (!finagleNameStr.startsWith("serverset!") + && !finagleNameStr.startsWith("inet!") + && !finagleNameStr.startsWith("zk!")) { + // We only support serverset based names at the moment + throw new UnsupportedOperationException("Finagle Name format not supported for name: " + finagleNameStr); + } + return buildRoutingService(new NameServerSet(finagleNameStr), true); + } + + /** + * Building routing service from <code>serverSet</code>. + * + * @param serverSet + * server set of a service + * @return routing service builder + */ + public static RoutingService.Builder buildRoutingService(ServerSet serverSet) { + return buildRoutingService(serverSet, false); + } + + /** + * Building routing service from <code>address</code>. + * + * @param address + * host to route the requests + * @return routing service builder + */ + public static RoutingService.Builder buildRoutingService(SocketAddress address) { + return SingleHostRoutingService.newBuilder().address(address); + } + + /** + * Build routing service builder of a routing service <code>routingService</code>. 
+ * + * @param routingService + * routing service to provide + * @return routing service builder + */ + public static RoutingService.Builder buildRoutingService(RoutingService routingService) { + return new RoutingServiceProvider(routingService); + } + + private static RoutingService.Builder buildRoutingService(ServerSet serverSet, + boolean resolveFromName) { + return ConsistentHashRoutingService.newBuilder() + .serverSet(serverSet) + .resolveFromName(resolveFromName) + .numReplicas(NUM_CONSISTENT_HASH_REPLICAS); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java new file mode 100644 index 0000000..4fe8141 --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.routing; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import org.apache.distributedlog.service.DLSocketAddress; +import com.twitter.finagle.NoBrokersAvailableException; +import com.twitter.finagle.stats.StatsReceiver; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Routing Service based on a given {@link com.twitter.common.zookeeper.ServerSet}. + */ +class ServerSetRoutingService extends Thread implements RoutingService { + + private static final Logger logger = LoggerFactory.getLogger(ServerSetRoutingService.class); + + static ServerSetRoutingServiceBuilder newServerSetRoutingServiceBuilder() { + return new ServerSetRoutingServiceBuilder(); + } + + /** + * Builder to build {@link com.twitter.common.zookeeper.ServerSet} based routing service. 
+ */ + static class ServerSetRoutingServiceBuilder implements RoutingService.Builder { + + private ServerSetWatcher serverSetWatcher; + + private ServerSetRoutingServiceBuilder() {} + + public ServerSetRoutingServiceBuilder serverSetWatcher(ServerSetWatcher serverSetWatcher) { + this.serverSetWatcher = serverSetWatcher; + return this; + } + + @Override + public Builder statsReceiver(StatsReceiver statsReceiver) { + return this; + } + + @Override + public RoutingService build() { + checkNotNull(serverSetWatcher, "No serverset watcher provided."); + return new ServerSetRoutingService(this.serverSetWatcher); + } + } + + private static class HostComparator implements Comparator<SocketAddress> { + + private static final HostComparator INSTANCE = new HostComparator(); + + @Override + public int compare(SocketAddress o1, SocketAddress o2) { + return o1.toString().compareTo(o2.toString()); + } + } + + private final ServerSetWatcher serverSetWatcher; + + private final Set<SocketAddress> hostSet = new HashSet<SocketAddress>(); + private List<SocketAddress> hostList = new ArrayList<SocketAddress>(); + private final HashFunction hasher = Hashing.md5(); + + // Server Set Changes + private final AtomicReference<ImmutableSet<DLSocketAddress>> serverSetChange = + new AtomicReference<ImmutableSet<DLSocketAddress>>(null); + private final CountDownLatch changeLatch = new CountDownLatch(1); + + // Listeners + protected final CopyOnWriteArraySet<RoutingListener> listeners = + new CopyOnWriteArraySet<RoutingListener>(); + + ServerSetRoutingService(ServerSetWatcher serverSetWatcher) { + super("ServerSetRoutingService"); + this.serverSetWatcher = serverSetWatcher; + } + + @Override + public Set<SocketAddress> getHosts() { + synchronized (hostSet) { + return ImmutableSet.copyOf(hostSet); + } + } + + @Override + public void startService() { + start(); + try { + if (!changeLatch.await(1, TimeUnit.MINUTES)) { + logger.warn("No serverset change received in 1 minute."); + } + } catch 
(InterruptedException e) { + logger.warn("Interrupted waiting first serverset change : ", e); + } + logger.info("{} Routing Service Started.", getClass().getSimpleName()); + } + + @Override + public void stopService() { + interrupt(); // interrupt the routing service thread itself, not the caller, so join() below can actually wait + try { + join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("Interrupted on waiting serverset routing service to finish : ", e); + } + logger.info("{} Routing Service Stopped.", getClass().getSimpleName()); + } + + @Override + public RoutingService registerListener(RoutingListener listener) { + listeners.add(listener); + return this; + } + + @Override + public RoutingService unregisterListener(RoutingListener listener) { + listeners.remove(listener); + return this; + } + + @Override + public SocketAddress getHost(String key, RoutingContext rContext) + throws NoBrokersAvailableException { + SocketAddress address = null; + synchronized (hostSet) { + if (0 != hostList.size()) { + int hashCode = hasher.hashUnencodedChars(key).asInt(); + int hostId = signSafeMod(hashCode, hostList.size()); + address = hostList.get(hostId); + if (rContext.isTriedHost(address)) { + ArrayList<SocketAddress> newList = new ArrayList<SocketAddress>(hostList); + newList.remove(hostId); + // pickup a new host by rehashing it. 
+ hostId = signSafeMod(hashCode, newList.size()); + address = newList.get(hostId); + int i = hostId; + while (rContext.isTriedHost(address)) { + i = (i + 1) % newList.size(); + if (i == hostId) { + address = null; + break; + } + address = newList.get(i); + } + } + } + } + if (null == address) { + throw new NoBrokersAvailableException("No host is available."); + } + return address; + } + + @Override + public void removeHost(SocketAddress host, Throwable reason) { + synchronized (hostSet) { + if (hostSet.remove(host)) { + logger.info("Node {} left due to : ", host, reason); + } + hostList = new ArrayList<SocketAddress>(hostSet); + Collections.sort(hostList, HostComparator.INSTANCE); + logger.info("Host list becomes : {}.", hostList); + } + } + + @Override + public void run() { + try { + serverSetWatcher.watch(new ServerSetWatcher.ServerSetMonitor() { + @Override + public void onChange(ImmutableSet<DLSocketAddress> serviceInstances) { + ImmutableSet<DLSocketAddress> lastValue = serverSetChange.getAndSet(serviceInstances); + if (null == lastValue) { + ImmutableSet<DLSocketAddress> mostRecentValue; + do { + mostRecentValue = serverSetChange.get(); + performServerSetChange(mostRecentValue); + changeLatch.countDown(); + } while (!serverSetChange.compareAndSet(mostRecentValue, null)); + } + } + }); + } catch (Exception e) { + logger.error("Fail to monitor server set : ", e); + Runtime.getRuntime().exit(-1); + } + } + + protected synchronized void performServerSetChange(ImmutableSet<DLSocketAddress> serverSet) { + Set<SocketAddress> newSet = new HashSet<SocketAddress>(); + for (DLSocketAddress serviceInstance : serverSet) { + newSet.add(serviceInstance.getSocketAddress()); + } + + Set<SocketAddress> removed; + Set<SocketAddress> added; + synchronized (hostSet) { + removed = Sets.difference(hostSet, newSet).immutableCopy(); + added = Sets.difference(newSet, hostSet).immutableCopy(); + for (SocketAddress node: removed) { + if (hostSet.remove(node)) { + logger.info("Node {} 
left.", node); + } + } + for (SocketAddress node: added) { + if (hostSet.add(node)) { + logger.info("Node {} joined.", node); + } + } + } + + for (SocketAddress addr : removed) { + for (RoutingListener listener : listeners) { + listener.onServerLeft(addr); + } + } + + for (SocketAddress addr : added) { + for (RoutingListener listener : listeners) { + listener.onServerJoin(addr); + } + } + + synchronized (hostSet) { + hostList = new ArrayList<SocketAddress>(hostSet); + Collections.sort(hostList, HostComparator.INSTANCE); + logger.info("Host list becomes : {}.", hostList); + } + + } + + static int signSafeMod(long dividend, int divisor) { + int mod = (int) (dividend % divisor); + + if (mod < 0) { + mod += divisor; + } + + return mod; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetWatcher.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetWatcher.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetWatcher.java new file mode 100644 index 0000000..77b7beb --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetWatcher.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.routing; + +import com.google.common.collect.ImmutableSet; +import org.apache.distributedlog.service.DLSocketAddress; + +/** + * Watch on server set changes. + */ +public interface ServerSetWatcher { + + /** + * Exception thrown when failed to monitor serverset. + */ + class MonitorException extends Exception { + + private static final long serialVersionUID = 392751505154339548L; + + public MonitorException(String msg) { + super(msg); + } + + public MonitorException(String msg, Throwable cause) { + super(msg, cause); + } + } + + /** + * An interface to an object that is interested in receiving notification whenever the host set changes. + */ + interface ServerSetMonitor { + + /** + * Called when either the available set of services changes. + * + * <p>It happens either when a service dies or a new INSTANCE comes on-line or + * when an existing service advertises a status or health change. + * + * @param hostSet the current set of available ServiceInstances + */ + void onChange(ImmutableSet<DLSocketAddress> hostSet); + } + + /** + * Registers a monitor to receive change notices for this server set as long as this jvm process is alive. + * + * <p>Blocks until the initial server set can be gathered and delivered to the monitor. + * The monitor will be notified if the membership set or parameters of existing members have + * changed. 
+ * + * @param monitor the server set monitor to call back when the host set changes + * @throws MonitorException if there is a problem monitoring the host set + */ + void watch(final ServerSetMonitor monitor) throws MonitorException; +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/SingleHostRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/SingleHostRoutingService.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/SingleHostRoutingService.java new file mode 100644 index 0000000..753a1af --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/SingleHostRoutingService.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.client.routing; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.Sets; +import com.twitter.finagle.NoBrokersAvailableException; +import com.twitter.finagle.stats.StatsReceiver; +import java.net.SocketAddress; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +/** + * Single Host Routing Service. + */ +public class SingleHostRoutingService implements RoutingService { + + public static SingleHostRoutingService of(SocketAddress address) { + return new SingleHostRoutingService(address); + } + + /** + * Builder to build single host based routing service. + * + * @return builder to build single host based routing service. + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder to build single host based routing service. + */ + public static class Builder implements RoutingService.Builder { + + private SocketAddress address; + + private Builder() {} + + public Builder address(SocketAddress address) { + this.address = address; + return this; + } + + @Override + public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) { + return this; + } + + @Override + public RoutingService build() { + checkNotNull(address, "Host is null"); + return new SingleHostRoutingService(address); + } + } + + private SocketAddress address; + private final CopyOnWriteArraySet<RoutingListener> listeners = + new CopyOnWriteArraySet<RoutingListener>(); + + SingleHostRoutingService(SocketAddress address) { + this.address = address; + } + + public void setAddress(SocketAddress address) { + this.address = address; + } + + @Override + public Set<SocketAddress> getHosts() { + return Sets.newHashSet(address); + } + + @Override + public void startService() { + // no-op + for (RoutingListener listener : listeners) { + listener.onServerJoin(address); + } + } + + @Override + public void stopService() { + // no-op + } + + @Override + public 
RoutingService registerListener(RoutingListener listener) { + listeners.add(listener); + return this; + } + + @Override + public RoutingService unregisterListener(RoutingListener listener) { + listeners.remove(listener); + return this; // return the routing service, matching registerListener and the RoutingService contract + } + + @Override + public SocketAddress getHost(String key, RoutingContext rContext) + throws NoBrokersAvailableException { + if (rContext.isTriedHost(address)) { + throw new NoBrokersAvailableException("No hosts is available : routing context = " + rContext); + } + return address; + } + + @Override + public void removeHost(SocketAddress address, Throwable reason) { + // no-op + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/TestName.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/TestName.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/TestName.java new file mode 100644 index 0000000..2fc8de0 --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/TestName.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.routing; + +import com.twitter.finagle.Addr; +import com.twitter.finagle.Address; +import com.twitter.finagle.Addrs; +import com.twitter.finagle.Name; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; + +/** + * A {@link Name} implementation for testing purpose. + */ +public class TestName implements Name { + + private static final Logger LOG = LoggerFactory.getLogger(TestName.class); + + private AbstractFunction1<Addr, BoxedUnit> callback = null; + + public void changes(AbstractFunction1<Addr, BoxedUnit> callback) { + this.callback = callback; + } + + public void changeAddrs(List<Address> addresses) { + if (null != callback) { + LOG.info("Sending a callback {}", addresses); + callback.apply(Addrs.newBoundAddr(addresses)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/TwitterServerSetWatcher.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/TwitterServerSetWatcher.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/TwitterServerSetWatcher.java new file mode 100644 index 0000000..1ff7c93 --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/TwitterServerSetWatcher.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.routing; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import com.twitter.common.net.pool.DynamicHostSet; +import com.twitter.common.zookeeper.ServerSet; +import org.apache.distributedlog.service.DLSocketAddress; +import com.twitter.thrift.Endpoint; +import com.twitter.thrift.ServiceInstance; +import java.net.InetSocketAddress; +import java.util.Set; + +/** + * Twitter {@link ServerSet} based watcher. + */ +public class TwitterServerSetWatcher implements ServerSetWatcher { + + private final ServerSet serverSet; + private final boolean resolvedFromName; + + /** + * Construct a {@link ServerSet} based watcher. + * + * @param serverSet server set. + * @param resolvedFromName whether to resolve hosts from {@link com.twitter.finagle.Name}. + */ + public TwitterServerSetWatcher(ServerSet serverSet, + boolean resolvedFromName) { + this.serverSet = serverSet; + this.resolvedFromName = resolvedFromName; + } + + /** + * Registers a monitor to receive change notices for this server set as long as this jvm process is alive. + * + * <p>Blocks until the initial server set can be gathered and delivered to the monitor. + * The monitor will be notified if the membership set or parameters of existing members have + * changed. 
+ * + * @param monitor the server set monitor to call back when the host set changes + * @throws MonitorException if there is a problem monitoring the host set + */ + public void watch(final ServerSetMonitor monitor) + throws MonitorException { + try { + serverSet.watch(new DynamicHostSet.HostChangeMonitor<ServiceInstance>() { + @Override + public void onChange(ImmutableSet<ServiceInstance> serviceInstances) { + Set<DLSocketAddress> dlServers = Sets.newHashSet(); + for (ServiceInstance serviceInstance : serviceInstances) { + Endpoint endpoint = serviceInstance.getAdditionalEndpoints().get("thrift"); + InetSocketAddress inetAddr = + new InetSocketAddress(endpoint.getHost(), endpoint.getPort()); + int shardId = resolvedFromName ? -1 : serviceInstance.getShard(); + DLSocketAddress address = new DLSocketAddress(shardId, inetAddr); + dlServers.add(address); + } + monitor.onChange(ImmutableSet.copyOf(dlServers)); + } + }); + } catch (DynamicHostSet.MonitorException me) { + throw new MonitorException("Failed to monitor server set : ", me); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/package-info.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/package-info.java new file mode 100644 index 0000000..352d755 --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/routing/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Routing Mechanisms to route the traffic to the owner of streams. + */ +package org.apache.distributedlog.client.routing; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/serverset/DLZkServerSet.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/serverset/DLZkServerSet.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/serverset/DLZkServerSet.java new file mode 100644 index 0000000..93cdf7a --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/serverset/DLZkServerSet.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.serverset; + +import com.google.common.collect.ImmutableList; +import com.google.common.net.HostAndPort; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.zookeeper.ServerSet; +import com.twitter.common.zookeeper.ServerSets; +import com.twitter.common.zookeeper.ZooKeeperClient; +import java.net.InetSocketAddress; +import java.net.URI; +import org.apache.commons.lang.StringUtils; +import org.apache.zookeeper.ZooDefs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A wrapper over zookeeper client and its server set. 
+ */ +public class DLZkServerSet { + + private static final Logger logger = LoggerFactory.getLogger(DLZkServerSet.class); + + static final String ZNODE_WRITE_PROXY = ".write_proxy"; + + private static String getZKServersFromDLUri(URI uri) { + return uri.getAuthority().replace(";", ","); + } + + private static Iterable<InetSocketAddress> getZkAddresses(URI uri) { + String zkServers = getZKServersFromDLUri(uri); + String[] zkServerList = StringUtils.split(zkServers, ','); + ImmutableList.Builder<InetSocketAddress> builder = ImmutableList.builder(); + for (String zkServer : zkServerList) { + HostAndPort hostAndPort = HostAndPort.fromString(zkServer).withDefaultPort(2181); + builder.add(InetSocketAddress.createUnresolved( + hostAndPort.getHostText(), + hostAndPort.getPort())); + } + return builder.build(); + } + + public static DLZkServerSet of(URI uri, + int zkSessionTimeoutMs) { + // Create zookeeper and server set + String zkPath = uri.getPath() + "/" + ZNODE_WRITE_PROXY; + Iterable<InetSocketAddress> zkAddresses = getZkAddresses(uri); + ZooKeeperClient zkClient = + new ZooKeeperClient(Amount.of(zkSessionTimeoutMs, Time.MILLISECONDS), zkAddresses); + ServerSet serverSet = ServerSets.create(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, zkPath); + return new DLZkServerSet(zkClient, serverSet); + } + + private final ZooKeeperClient zkClient; + private final ServerSet zkServerSet; + + public DLZkServerSet(ZooKeeperClient zkClient, + ServerSet zkServerSet) { + this.zkClient = zkClient; + this.zkServerSet = zkServerSet; + } + + public ZooKeeperClient getZkClient() { + return zkClient; + } + + public ServerSet getServerSet() { + return zkServerSet; + } + + public void close() { + zkClient.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/serverset/package-info.java ---------------------------------------------------------------------- diff --git 
a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/serverset/package-info.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/serverset/package-info.java new file mode 100644 index 0000000..38a7544 --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/serverset/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Utils related to server set. + */ +package org.apache.distributedlog.client.serverset;