http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetRoutingService.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetRoutingService.java deleted file mode 100644 index 19ccfc4..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetRoutingService.java +++ /dev/null @@ -1,274 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package com.twitter.distributedlog.client.routing;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.twitter.distributedlog.service.DLSocketAddress;
import com.twitter.finagle.NoBrokersAvailableException;
import com.twitter.finagle.stats.StatsReceiver;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Routing Service based on a given {@link com.twitter.common.zookeeper.ServerSet}.
 *
 * <p>The service is a thread whose {@link #run()} registers a monitor on the supplied
 * {@link ServerSetWatcher}. Every reported change is folded into {@code hostSet} and a
 * sorted {@code hostList}; {@link #getHost(String, RoutingContext)} picks a host from that
 * list by hashing the routing key.
 */
class ServerSetRoutingService extends Thread implements RoutingService {

    private static final Logger logger = LoggerFactory.getLogger(ServerSetRoutingService.class);

    static ServerSetRoutingServiceBuilder newServerSetRoutingServiceBuilder() {
        return new ServerSetRoutingServiceBuilder();
    }

    /**
     * Builder to build {@link com.twitter.common.zookeeper.ServerSet} based routing service.
     */
    static class ServerSetRoutingServiceBuilder implements RoutingService.Builder {

        private ServerSetWatcher serverSetWatcher;

        private ServerSetRoutingServiceBuilder() {}

        public ServerSetRoutingServiceBuilder serverSetWatcher(ServerSetWatcher serverSetWatcher) {
            this.serverSetWatcher = serverSetWatcher;
            return this;
        }

        @Override
        public Builder statsReceiver(StatsReceiver statsReceiver) {
            // The stats receiver is accepted for interface compatibility but not used here.
            return this;
        }

        /**
         * Build the routing service.
         *
         * @return a new, not yet started, routing service
         * @throws NullPointerException if no server set watcher was provided
         */
        @Override
        public RoutingService build() {
            checkNotNull(serverSetWatcher, "No serverset watcher provided.");
            return new ServerSetRoutingService(this.serverSetWatcher);
        }
    }

    /**
     * Orders socket addresses by their string representation so that the host list has a
     * stable order for a given membership.
     */
    private static class HostComparator implements Comparator<SocketAddress> {

        private static final HostComparator INSTANCE = new HostComparator();

        @Override
        public int compare(SocketAddress o1, SocketAddress o2) {
            return o1.toString().compareTo(o2.toString());
        }
    }

    private final ServerSetWatcher serverSetWatcher;

    // Both hostSet and hostList are read and replaced only while holding the hostSet monitor.
    private final Set<SocketAddress> hostSet = new HashSet<SocketAddress>();
    private List<SocketAddress> hostList = new ArrayList<SocketAddress>();
    private final HashFunction hasher = Hashing.md5();

    // Server Set Changes
    private final AtomicReference<ImmutableSet<DLSocketAddress>> serverSetChange =
            new AtomicReference<ImmutableSet<DLSocketAddress>>(null);
    private final CountDownLatch changeLatch = new CountDownLatch(1);

    // Listeners
    protected final CopyOnWriteArraySet<RoutingListener> listeners =
            new CopyOnWriteArraySet<RoutingListener>();

    ServerSetRoutingService(ServerSetWatcher serverSetWatcher) {
        super("ServerSetRoutingService");
        this.serverSetWatcher = serverSetWatcher;
    }

    /**
     * Return a snapshot of the currently known hosts.
     *
     * @return an immutable copy of the host set
     */
    @Override
    public Set<SocketAddress> getHosts() {
        synchronized (hostSet) {
            return ImmutableSet.copyOf(hostSet);
        }
    }

    /**
     * Start the routing thread and wait up to one minute for the first server set change.
     *
     * <p>If the wait is interrupted, the interrupt status of the calling thread is restored.
     */
    @Override
    public void startService() {
        start();
        try {
            if (!changeLatch.await(1, TimeUnit.MINUTES)) {
                logger.warn("No serverset change received in 1 minute.");
            }
        } catch (InterruptedException e) {
            // Restore the flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted waiting first serverset change : ", e);
        }
        logger.info("{} Routing Service Started.", getClass().getSimpleName());
    }

    /**
     * Interrupt the routing thread and wait for it to finish.
     *
     * <p>If the caller is interrupted while waiting, its interrupt status is restored.
     */
    @Override
    public void stopService() {
        // Interrupt the routing thread itself. Interrupting the calling thread instead
        // would make join() fail immediately and leave the caller interrupted.
        interrupt();
        try {
            join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted on waiting serverset routing service to finish : ", e);
        }
        logger.info("{} Routing Service Stopped.", getClass().getSimpleName());
    }

    @Override
    public RoutingService registerListener(RoutingListener listener) {
        listeners.add(listener);
        return this;
    }

    @Override
    public RoutingService unregisterListener(RoutingListener listener) {
        listeners.remove(listener);
        return this;
    }

    /**
     * Pick the host for the given key.
     *
     * <p>The key is hashed onto the sorted host list. If the chosen host has already been
     * tried according to {@code rContext}, the key is rehashed over the remaining hosts and
     * the list is scanned from there for the first host that has not been tried.
     *
     * @param key routing key
     * @param rContext routing context recording the hosts that were already tried
     * @return the selected host
     * @throws NoBrokersAvailableException if there is no host, or every host has been tried
     */
    @Override
    public SocketAddress getHost(String key, RoutingContext rContext)
            throws NoBrokersAvailableException {
        SocketAddress address = null;
        synchronized (hostSet) {
            if (!hostList.isEmpty()) {
                int hashCode = hasher.hashUnencodedChars(key).asInt();
                int hostId = signSafeMod(hashCode, hostList.size());
                address = hostList.get(hostId);
                if (rContext.isTriedHost(address)) {
                    address = pickUntriedHost(hashCode, hostId, rContext);
                }
            }
        }
        if (null == address) {
            throw new NoBrokersAvailableException("No host is available.");
        }
        return address;
    }

    /**
     * Rehash over the host list without the already tried host and return the first host
     * that has not been tried. Must be called while holding the {@code hostSet} monitor.
     *
     * @return an untried host, or {@code null} if every remaining host has been tried
     */
    private SocketAddress pickUntriedHost(int hashCode, int triedHostId, RoutingContext rContext) {
        List<SocketAddress> candidates = new ArrayList<SocketAddress>(hostList);
        candidates.remove(triedHostId);
        if (candidates.isEmpty()) {
            // The tried host was the only one; rehashing would divide by zero.
            return null;
        }
        int startId = signSafeMod(hashCode, candidates.size());
        int i = startId;
        SocketAddress candidate = candidates.get(i);
        while (rContext.isTriedHost(candidate)) {
            i = (i + 1) % candidates.size();
            if (i == startId) {
                // Wrapped around: every candidate has been tried.
                return null;
            }
            candidate = candidates.get(i);
        }
        return candidate;
    }

    /**
     * Remove a host from the host set and rebuild the sorted host list.
     *
     * @param host host to remove
     * @param reason the reason why the host is removed; only logged
     */
    @Override
    public void removeHost(SocketAddress host, Throwable reason) {
        synchronized (hostSet) {
            if (hostSet.remove(host)) {
                logger.info("Node {} left due to : ", host, reason);
            }
            hostList = new ArrayList<SocketAddress>(hostSet);
            Collections.sort(hostList, HostComparator.INSTANCE);
            logger.info("Host list becomes : {}.", hostList);
        }
    }

    /**
     * Register the server set monitor. A failure to do so terminates the JVM.
     */
    @Override
    public void run() {
        try {
            serverSetWatcher.watch(new ServerSetWatcher.ServerSetMonitor() {
                @Override
                public void onChange(ImmutableSet<DLSocketAddress> serviceInstances) {
                    ImmutableSet<DLSocketAddress> lastValue = serverSetChange.getAndSet(serviceInstances);
                    // A non-null previous value means another caller is still inside the
                    // loop below; it will pick up the value that was just published.
                    if (null == lastValue) {
                        ImmutableSet<DLSocketAddress> mostRecentValue;
                        do {
                            mostRecentValue = serverSetChange.get();
                            performServerSetChange(mostRecentValue);
                            changeLatch.countDown();
                            // Loop again if a newer value arrived while applying this one.
                        } while (!serverSetChange.compareAndSet(mostRecentValue, null));
                    }
                }
            });
        } catch (Exception e) {
            logger.error("Fail to monitor server set : ", e);
            Runtime.getRuntime().exit(-1);
        }
    }

    /**
     * Apply a new server set: update the host set, notify the listeners about hosts that
     * left and joined, then rebuild the sorted host list.
     *
     * @param serverSet the new server set
     */
    protected synchronized void performServerSetChange(ImmutableSet<DLSocketAddress> serverSet) {
        Set<SocketAddress> newSet = new HashSet<SocketAddress>();
        for (DLSocketAddress serviceInstance : serverSet) {
            newSet.add(serviceInstance.getSocketAddress());
        }

        Set<SocketAddress> removed;
        Set<SocketAddress> added;
        synchronized (hostSet) {
            // Take immutable copies first: Sets.difference returns live views of hostSet.
            removed = Sets.difference(hostSet, newSet).immutableCopy();
            added = Sets.difference(newSet, hostSet).immutableCopy();
            for (SocketAddress node : removed) {
                if (hostSet.remove(node)) {
                    logger.info("Node {} left.", node);
                }
            }
            for (SocketAddress node : added) {
                if (hostSet.add(node)) {
                    logger.info("Node {} joined.", node);
                }
            }
        }

        // Listeners are invoked outside of the hostSet monitor.
        for (SocketAddress addr : removed) {
            for (RoutingListener listener : listeners) {
                listener.onServerLeft(addr);
            }
        }

        for (SocketAddress addr : added) {
            for (RoutingListener listener : listeners) {
                listener.onServerJoin(addr);
            }
        }

        synchronized (hostSet) {
            hostList = new ArrayList<SocketAddress>(hostSet);
            Collections.sort(hostList, HostComparator.INSTANCE);
            logger.info("Host list becomes : {}.", hostList);
        }
    }

    /**
     * Compute {@code dividend mod divisor} with a non-negative result.
     *
     * @param dividend value to reduce
     * @param divisor modulus; must not be zero
     * @return a value in {@code [0, divisor)} for a positive divisor
     */
    static int signSafeMod(long dividend, int divisor) {
        int mod = (int) (dividend % divisor);

        if (mod < 0) {
            mod += divisor;
        }

        return mod;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetWatcher.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetWatcher.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetWatcher.java deleted file mode 100644 index 1eccb63..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ServerSetWatcher.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client.routing; - -import com.google.common.collect.ImmutableSet; -import com.twitter.distributedlog.service.DLSocketAddress; - -/** - * Watches a server set and reports its changes to a registered {@link ServerSetMonitor}. - */ -public interface ServerSetWatcher { - - /** - * Exception thrown when monitoring the server set fails. 
- */ - class MonitorException extends Exception { - - private static final long serialVersionUID = 392751505154339548L; - - public MonitorException(String msg) { - super(msg); - } - - public MonitorException(String msg, Throwable cause) { - super(msg, cause); - } - } - - /** - * An interface to an object that is interested in receiving a notification whenever the host set changes. - */ - interface ServerSetMonitor { - - /** - * Called whenever the available set of services changes. - * - * <p>It happens either when a service dies or a new instance comes on-line or - * when an existing service advertises a status or health change. - * - * @param hostSet the current set of available server addresses - */ - void onChange(ImmutableSet<DLSocketAddress> hostSet); - } - - /** - * Registers a monitor to receive change notices for this server set as long as this jvm process is alive. - * - * <p>Blocks until the initial server set can be gathered and delivered to the monitor. - * The monitor will be notified if the membership set or parameters of existing members have - * changed. 
- * - * @param monitor the server set monitor to call back when the host set changes - * @throws MonitorException if there is a problem monitoring the host set - */ - void watch(final ServerSetMonitor monitor) throws MonitorException; -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java deleted file mode 100644 index e526868..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package com.twitter.distributedlog.client.routing;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.collect.Sets;
import com.twitter.finagle.NoBrokersAvailableException;
import com.twitter.finagle.stats.StatsReceiver;
import java.net.SocketAddress;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * Single Host Routing Service.
 *
 * <p>Routes every key to one configured address.
 */
public class SingleHostRoutingService implements RoutingService {

    /**
     * Create a routing service that routes to the given address.
     *
     * @param address the only host to route to
     * @return the routing service
     */
    public static SingleHostRoutingService of(SocketAddress address) {
        return new SingleHostRoutingService(address);
    }

    /**
     * Builder to build single host based routing service.
     *
     * @return builder to build single host based routing service.
     */
    public static Builder newBuilder() {
        return new Builder();
    }

    /**
     * Builder to build single host based routing service.
     */
    public static class Builder implements RoutingService.Builder {

        private SocketAddress address;

        private Builder() {}

        public Builder address(SocketAddress address) {
            this.address = address;
            return this;
        }

        @Override
        public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) {
            // The stats receiver is accepted for interface compatibility but not used here.
            return this;
        }

        /**
         * Build the routing service.
         *
         * @return a new routing service for the configured address
         * @throws NullPointerException if no address was provided
         */
        @Override
        public RoutingService build() {
            checkNotNull(address, "Host is null");
            return new SingleHostRoutingService(address);
        }
    }

    private SocketAddress address;
    private final CopyOnWriteArraySet<RoutingListener> listeners =
            new CopyOnWriteArraySet<RoutingListener>();

    SingleHostRoutingService(SocketAddress address) {
        this.address = address;
    }

    public void setAddress(SocketAddress address) {
        this.address = address;
    }

    /**
     * Return the configured host.
     *
     * @return a new mutable set containing only the current address
     */
    @Override
    public Set<SocketAddress> getHosts() {
        return Sets.newHashSet(address);
    }

    /**
     * Announce the configured address to every registered listener.
     */
    @Override
    public void startService() {
        // Nothing to start; only tell the listeners that the single host is available.
        for (RoutingListener listener : listeners) {
            listener.onServerJoin(address);
        }
    }

    @Override
    public void stopService() {
        // no-op
    }

    @Override
    public RoutingService registerListener(RoutingListener listener) {
        listeners.add(listener);
        return this;
    }

    @Override
    public RoutingService unregisterListener(RoutingListener listener) {
        listeners.remove(listener);
        // Return this, like registerListener does, so that calls can be chained.
        return this;
    }

    /**
     * Return the configured address unless it has already been tried.
     *
     * @param key routing key; not used, every key maps to the same host
     * @param rContext routing context recording the hosts that were already tried
     * @return the configured address
     * @throws NoBrokersAvailableException if the configured address has already been tried
     */
    @Override
    public SocketAddress getHost(String key, RoutingContext rContext)
            throws NoBrokersAvailableException {
        if (rContext.isTriedHost(address)) {
            throw new NoBrokersAvailableException("No hosts is available : routing context = " + rContext);
        }
        return address;
    }

    @Override
    public void removeHost(SocketAddress address, Throwable reason) {
        // no-op
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TestName.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TestName.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TestName.java deleted file mode 100644 index 8101075..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TestName.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */
package com.twitter.distributedlog.client.routing;

import com.twitter.finagle.Addr;
import com.twitter.finagle.Address;
import com.twitter.finagle.Addrs;
import com.twitter.finagle.Name;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/**
 * A {@link Name} implementation for testing purpose.
 *
 * <p>It remembers a single callback and hands it every address list passed to
 * {@link #changeAddrs(List)}.
 */
public class TestName implements Name {

    private static final Logger LOG = LoggerFactory.getLogger(TestName.class);

    private AbstractFunction1<Addr, BoxedUnit> callback = null;

    /**
     * Set the callback that receives address changes, replacing any previous one.
     *
     * @param callback function to invoke with the new bound address
     */
    public void changes(AbstractFunction1<Addr, BoxedUnit> callback) {
        this.callback = callback;
    }

    /**
     * Send the given addresses to the callback; does nothing when no callback is set.
     *
     * @param addresses the new list of addresses
     */
    public void changeAddrs(List<Address> addresses) {
        if (null == callback) {
            return;
        }
        LOG.info("Sending a callback {}", addresses);
        callback.apply(Addrs.newBoundAddr(addresses));
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TwitterServerSetWatcher.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TwitterServerSetWatcher.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TwitterServerSetWatcher.java deleted file mode 100644 index cffb9b9..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/TwitterServerSetWatcher.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package com.twitter.distributedlog.client.routing;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.twitter.common.net.pool.DynamicHostSet;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.distributedlog.service.DLSocketAddress;
import com.twitter.thrift.Endpoint;
import com.twitter.thrift.ServiceInstance;
import java.net.InetSocketAddress;
import java.util.Set;

/**
 * Twitter {@link ServerSet} based watcher.
 *
 * <p>Translates the {@link ServiceInstance}s reported by the server set into
 * {@link DLSocketAddress}es before handing them to the monitor.
 */
public class TwitterServerSetWatcher implements ServerSetWatcher {

    private final ServerSet serverSet;
    private final boolean resolvedFromName;

    /**
     * Construct a {@link ServerSet} based watcher.
     *
     * @param serverSet server set.
     * @param resolvedFromName whether to resolve hosts from {@link com.twitter.finagle.Name}.
     */
    public TwitterServerSetWatcher(ServerSet serverSet,
                                   boolean resolvedFromName) {
        this.serverSet = serverSet;
        this.resolvedFromName = resolvedFromName;
    }

    /**
     * Registers a monitor to receive change notices for this server set as long as this jvm process is alive.
     *
     * <p>Blocks until the initial server set can be gathered and delivered to the monitor.
     * The monitor will be notified if the membership set or parameters of existing members have
     * changed.
     *
     * @param monitor the server set monitor to call back when the host set changes
     * @throws MonitorException if there is a problem monitoring the host set
     */
    @Override
    public void watch(final ServerSetMonitor monitor)
            throws MonitorException {
        DynamicHostSet.HostChangeMonitor<ServiceInstance> translator =
                new DynamicHostSet.HostChangeMonitor<ServiceInstance>() {
                    @Override
                    public void onChange(ImmutableSet<ServiceInstance> serviceInstances) {
                        Set<DLSocketAddress> translated = Sets.newHashSet();
                        for (ServiceInstance instance : serviceInstances) {
                            translated.add(toDlSocketAddress(instance));
                        }
                        monitor.onChange(ImmutableSet.copyOf(translated));
                    }
                };
        try {
            serverSet.watch(translator);
        } catch (DynamicHostSet.MonitorException me) {
            throw new MonitorException("Failed to monitor server set : ", me);
        }
    }

    /**
     * Convert a service instance into a {@link DLSocketAddress} using its "thrift"
     * additional endpoint. The shard id is -1 when hosts are resolved from name.
     */
    private DLSocketAddress toDlSocketAddress(ServiceInstance instance) {
        // NOTE(review): an instance without a "thrift" additional endpoint would make the
        // lookup return null and fail below - confirm that every instance advertises one.
        Endpoint thriftEndpoint = instance.getAdditionalEndpoints().get("thrift");
        InetSocketAddress socketAddress =
                new InetSocketAddress(thriftEndpoint.getHost(), thriftEndpoint.getPort());
        int shardId;
        if (resolvedFromName) {
            shardId = -1;
        } else {
            shardId = instance.getShard();
        }
        return new DLSocketAddress(shardId, socketAddress);
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/package-info.java deleted file mode 100644 index a282b42..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Routing Mechanisms to route the traffic to the owner of streams. - */ -package com.twitter.distributedlog.client.routing; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/DLZkServerSet.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/DLZkServerSet.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/DLZkServerSet.java deleted file mode 100644 index 4ca3aa6..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/DLZkServerSet.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package com.twitter.distributedlog.client.serverset;

import com.google.common.collect.ImmutableList;
import com.google.common.net.HostAndPort;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.common.zookeeper.ServerSets;
import com.twitter.common.zookeeper.ZooKeeperClient;
import java.net.InetSocketAddress;
import java.net.URI;
import org.apache.commons.lang.StringUtils;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A wrapper over zookeeper client and its server set.
 */
public class DLZkServerSet {

    private static final Logger logger = LoggerFactory.getLogger(DLZkServerSet.class);

    static final String ZNODE_WRITE_PROXY = ".write_proxy";

    /**
     * Extract the zookeeper servers from the authority part of the uri, turning the
     * ';' separators into ','.
     */
    private static String getZKServersFromDLUri(URI uri) {
        String authority = uri.getAuthority();
        return authority.replace(";", ",");
    }

    /**
     * Parse the zookeeper servers of the uri into unresolved socket addresses.
     * A server without an explicit port gets port 2181.
     */
    private static Iterable<InetSocketAddress> getZkAddresses(URI uri) {
        ImmutableList.Builder<InetSocketAddress> addresses = ImmutableList.builder();
        for (String server : StringUtils.split(getZKServersFromDLUri(uri), ',')) {
            HostAndPort parsed = HostAndPort.fromString(server).withDefaultPort(2181);
            InetSocketAddress unresolved =
                    InetSocketAddress.createUnresolved(parsed.getHostText(), parsed.getPort());
            addresses.add(unresolved);
        }
        return addresses.build();
    }

    /**
     * Create a zookeeper client and the server set stored under the
     * {@code .write_proxy} znode below the path of the uri.
     *
     * @param uri uri whose authority lists the zookeeper servers
     * @param zkSessionTimeoutMs zookeeper session timeout in milliseconds
     * @return the wrapper holding the created client and server set
     */
    public static DLZkServerSet of(URI uri,
                                   int zkSessionTimeoutMs) {
        String serverSetPath = uri.getPath() + "/" + ZNODE_WRITE_PROXY;
        Iterable<InetSocketAddress> servers = getZkAddresses(uri);
        Amount<Integer, Time> sessionTimeout = Amount.of(zkSessionTimeoutMs, Time.MILLISECONDS);
        ZooKeeperClient client = new ZooKeeperClient(sessionTimeout, servers);
        ServerSet serverSet = ServerSets.create(client, ZooDefs.Ids.OPEN_ACL_UNSAFE, serverSetPath);
        return new DLZkServerSet(client, serverSet);
    }

    private final ZooKeeperClient zkClient;
    private final ServerSet zkServerSet;

    public DLZkServerSet(ZooKeeperClient zkClient,
                         ServerSet zkServerSet) {
        this.zkClient = zkClient;
        this.zkServerSet = zkServerSet;
    }

    public ZooKeeperClient getZkClient() {
        return zkClient;
    }

    public ServerSet getServerSet() {
        return zkServerSet;
    }

    /**
     * Close the underlying zookeeper client.
     */
    public void close() {
        zkClient.close();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/package-info.java ---------------------------------------------------------------------- diff --git 
a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/package-info.java deleted file mode 100644 index 49166ec..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/serverset/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Utils related to server set. 
- */ -package com.twitter.distributedlog.client.serverset; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java deleted file mode 100644 index 489fc00..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package com.twitter.distributedlog.client.speculative;

import com.google.common.annotations.VisibleForTesting;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default implementation of {@link SpeculativeRequestExecutionPolicy}.
 *
 * <p>Each time a speculative request asks for a follow-up, the next one is scheduled after
 * the current timeout, and the timeout is then multiplied by the backoff multiplier, capped
 * at the maximum timeout.
 */
public class DefaultSpeculativeRequestExecutionPolicy implements SpeculativeRequestExecutionPolicy {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultSpeculativeRequestExecutionPolicy.class);
    final int firstSpeculativeRequestTimeout;
    final int maxSpeculativeRequestTimeout;
    final float backoffMultiplier;
    int nextSpeculativeRequestTimeout;

    /**
     * Create the policy.
     *
     * @param firstSpeculativeRequestTimeout delay before the first follow-up request
     * @param maxSpeculativeRequestTimeout upper bound for the delay between requests
     * @param backoffMultiplier factor applied to the delay after each scheduled request
     * @throws IllegalArgumentException if the multiplier is not positive, or if the maximum
     *     timeout multiplied by the multiplier does not fit into an int
     */
    public DefaultSpeculativeRequestExecutionPolicy(int firstSpeculativeRequestTimeout,
                                                    int maxSpeculativeRequestTimeout,
                                                    float backoffMultiplier) {
        this.firstSpeculativeRequestTimeout = firstSpeculativeRequestTimeout;
        this.maxSpeculativeRequestTimeout = maxSpeculativeRequestTimeout;
        this.backoffMultiplier = backoffMultiplier;
        this.nextSpeculativeRequestTimeout = firstSpeculativeRequestTimeout;

        if (backoffMultiplier <= 0) {
            throw new IllegalArgumentException("Invalid value provided for backoffMultiplier");
        }

        // Prevent potential over flow
        long largestBackoff = Math.round((double) maxSpeculativeRequestTimeout * (double) backoffMultiplier);
        if (largestBackoff > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Invalid values for maxSpeculativeRequestTimeout and backoffMultiplier");
        }
    }

    @VisibleForTesting
    int getNextSpeculativeRequestTimeout() {
        return nextSpeculativeRequestTimeout;
    }

    /**
     * Initialize the speculative request execution policy.
     *
     * @param scheduler The scheduler service to issue the speculative request
     * @param requestExecutor The executor is used to issue the actual speculative requests
     */
    @Override
    public void initiateSpeculativeRequest(final ScheduledExecutorService scheduler,
                                           final SpeculativeRequestExecutor requestExecutor) {
        issueSpeculativeRequest(scheduler, requestExecutor);
    }

    /**
     * Issue one speculative request and, once its future completes with {@code true},
     * schedule the next one and back off the timeout.
     */
    private void issueSpeculativeRequest(final ScheduledExecutorService scheduler,
                                         final SpeculativeRequestExecutor requestExecutor) {
        Future<Boolean> outcome = requestExecutor.issueSpeculativeRequest();
        outcome.addEventListener(new FutureEventListener<Boolean>() {
            // we want this handler to run immediately after we push the big red button!
            @Override
            public void onSuccess(Boolean shouldIssueNext) {
                if (!shouldIssueNext) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Stopped issuing speculative requests for {}, "
                            + "speculativeReadTimeout = {}", requestExecutor, nextSpeculativeRequestTimeout);
                    }
                    return;
                }
                // Schedule with the current timeout first, then grow it for the next round.
                scheduleSpeculativeRequest(scheduler, requestExecutor, nextSpeculativeRequestTimeout);
                int backedOffTimeout = (int) (nextSpeculativeRequestTimeout * backoffMultiplier);
                nextSpeculativeRequestTimeout = Math.min(maxSpeculativeRequestTimeout, backedOffTimeout);
            }

            @Override
            public void onFailure(Throwable thrown) {
                LOG.warn("Failed to issue speculative request for {}, speculativeReadTimeout = {} : ",
                    new Object[] { requestExecutor, nextSpeculativeRequestTimeout, thrown });
            }
        });
    }

    /**
     * Schedule the next speculative request after the given timeout in milliseconds.
     * A rejection is only logged when the scheduler has not been shut down.
     */
    private void scheduleSpeculativeRequest(final ScheduledExecutorService scheduler,
                                            final SpeculativeRequestExecutor requestExecutor,
                                            final int speculativeRequestTimeout) {
        Runnable nextRequest = new Runnable() {
            @Override
            public void run() {
                issueSpeculativeRequest(scheduler, requestExecutor);
            }
        };
        try {
            scheduler.schedule(nextRequest, speculativeRequestTimeout, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException re) {
            if (scheduler.isShutdown()) {
                return;
            }
            LOG.warn("Failed to schedule speculative request for {}, speculativeReadTimeout = {} : ",
                new Object[]{requestExecutor, speculativeRequestTimeout, re});
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java deleted file mode 100644 index 14615e9..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client.speculative; - -import java.util.concurrent.ScheduledExecutorService; - -/** - * Speculative request execution policy. - */ -public interface SpeculativeRequestExecutionPolicy { - /** - * Initialize the speculative request execution policy and initiate requests. 
- * - * @param scheduler The scheduler service to issue the speculative request - * @param requestExecutor The executor is used to issue the actual speculative requests - */ - void initiateSpeculativeRequest(ScheduledExecutorService scheduler, - SpeculativeRequestExecutor requestExecutor); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/speculative/SpeculativeRequestExecutor.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/speculative/SpeculativeRequestExecutor.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/speculative/SpeculativeRequestExecutor.java deleted file mode 100644 index de1b0dd..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/speculative/SpeculativeRequestExecutor.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client.speculative; - -import com.twitter.util.Future; - -/** - * Executor to execute speculative requests. 
- */ -public interface SpeculativeRequestExecutor { - - /** - * Issues a speculative request and indicates if more speculative requests should be issued. - * - * @return whether more speculative requests should be issued. - */ - Future<Boolean> issueSpeculativeRequest(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/speculative/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/speculative/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/speculative/package-info.java deleted file mode 100644 index b299266..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/speculative/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Speculative Mechanism. 
- */ -package com.twitter.distributedlog.client.speculative; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/ClientStats.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/ClientStats.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/ClientStats.java deleted file mode 100644 index f361892..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/ClientStats.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client.stats; - -import com.twitter.distributedlog.client.resolver.RegionResolver; -import com.twitter.distributedlog.thrift.service.StatusCode; -import com.twitter.finagle.stats.StatsReceiver; -import java.net.SocketAddress; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * Client Stats. 
- */ -public class ClientStats { - - // Region Resolver - private final RegionResolver regionResolver; - - // Stats - private final StatsReceiver statsReceiver; - private final ClientStatsLogger clientStatsLogger; - private final boolean enableRegionStats; - private final ConcurrentMap<String, ClientStatsLogger> regionClientStatsLoggers; - private final ConcurrentMap<String, OpStats> opStatsMap; - - public ClientStats(StatsReceiver statsReceiver, - boolean enableRegionStats, - RegionResolver regionResolver) { - this.statsReceiver = statsReceiver; - this.clientStatsLogger = new ClientStatsLogger(statsReceiver); - this.enableRegionStats = enableRegionStats; - this.regionClientStatsLoggers = new ConcurrentHashMap<String, ClientStatsLogger>(); - this.regionResolver = regionResolver; - this.opStatsMap = new ConcurrentHashMap<String, OpStats>(); - } - - public OpStats getOpStats(String op) { - OpStats opStats = opStatsMap.get(op); - if (null != opStats) { - return opStats; - } - OpStats newStats = new OpStats(statsReceiver.scope(op), - enableRegionStats, regionResolver); - OpStats oldStats = opStatsMap.putIfAbsent(op, newStats); - if (null == oldStats) { - return newStats; - } else { - return oldStats; - } - } - - private ClientStatsLogger getRegionClientStatsLogger(SocketAddress address) { - String region = regionResolver.resolveRegion(address); - return getRegionClientStatsLogger(region); - } - - private ClientStatsLogger getRegionClientStatsLogger(String region) { - ClientStatsLogger statsLogger = regionClientStatsLoggers.get(region); - if (null == statsLogger) { - ClientStatsLogger newStatsLogger = new ClientStatsLogger(statsReceiver.scope(region)); - ClientStatsLogger oldStatsLogger = regionClientStatsLoggers.putIfAbsent(region, newStatsLogger); - if (null == oldStatsLogger) { - statsLogger = newStatsLogger; - } else { - statsLogger = oldStatsLogger; - } - } - return statsLogger; - } - - public StatsReceiver getFinagleStatsReceiver(SocketAddress addr) { - if 
(enableRegionStats && null != addr) { - return getRegionClientStatsLogger(addr).getStatsReceiver(); - } else { - return clientStatsLogger.getStatsReceiver(); - } - } - - public void completeProxyRequest(SocketAddress addr, StatusCode code, long startTimeNanos) { - clientStatsLogger.completeProxyRequest(code, startTimeNanos); - if (enableRegionStats && null != addr) { - getRegionClientStatsLogger(addr).completeProxyRequest(code, startTimeNanos); - } - } - - public void failProxyRequest(SocketAddress addr, Throwable cause, long startTimeNanos) { - clientStatsLogger.failProxyRequest(cause, startTimeNanos); - if (enableRegionStats && null != addr) { - getRegionClientStatsLogger(addr).failProxyRequest(cause, startTimeNanos); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/ClientStatsLogger.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/ClientStatsLogger.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/ClientStatsLogger.java deleted file mode 100644 index 0df64cc..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/ClientStatsLogger.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client.stats; - -import com.twitter.distributedlog.thrift.service.StatusCode; -import com.twitter.finagle.stats.Counter; -import com.twitter.finagle.stats.Stat; -import com.twitter.finagle.stats.StatsReceiver; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; - -/** - * Stats Logger to collect client stats. - */ -public class ClientStatsLogger { - - // Stats - private final StatsReceiver statsReceiver; - private final StatsReceiver responseStatsReceiver; - private final ConcurrentMap<StatusCode, Counter> responseStats = - new ConcurrentHashMap<StatusCode, Counter>(); - private final StatsReceiver exceptionStatsReceiver; - private final ConcurrentMap<Class<?>, Counter> exceptionStats = - new ConcurrentHashMap<Class<?>, Counter>(); - - private final Stat proxySuccessLatencyStat; - private final Stat proxyFailureLatencyStat; - - public ClientStatsLogger(StatsReceiver statsReceiver) { - this.statsReceiver = statsReceiver; - responseStatsReceiver = statsReceiver.scope("responses"); - exceptionStatsReceiver = statsReceiver.scope("exceptions"); - StatsReceiver proxyLatencyStatReceiver = statsReceiver.scope("proxy_request_latency"); - proxySuccessLatencyStat = proxyLatencyStatReceiver.stat0("success"); - proxyFailureLatencyStat = proxyLatencyStatReceiver.stat0("failure"); - } - - public StatsReceiver getStatsReceiver() { - return statsReceiver; - } - - private Counter getResponseCounter(StatusCode code) { - Counter counter = 
responseStats.get(code); - if (null == counter) { - Counter newCounter = responseStatsReceiver.counter0(code.name()); - Counter oldCounter = responseStats.putIfAbsent(code, newCounter); - counter = null != oldCounter ? oldCounter : newCounter; - } - return counter; - } - - private Counter getExceptionCounter(Class<?> cls) { - Counter counter = exceptionStats.get(cls); - if (null == counter) { - Counter newCounter = exceptionStatsReceiver.counter0(cls.getName()); - Counter oldCounter = exceptionStats.putIfAbsent(cls, newCounter); - counter = null != oldCounter ? oldCounter : newCounter; - } - return counter; - } - - public void completeProxyRequest(StatusCode code, long startTimeNanos) { - getResponseCounter(code).incr(); - proxySuccessLatencyStat.add(elapsedMicroSec(startTimeNanos)); - } - - public void failProxyRequest(Throwable cause, long startTimeNanos) { - getExceptionCounter(cause.getClass()).incr(); - proxyFailureLatencyStat.add(elapsedMicroSec(startTimeNanos)); - } - - static long elapsedMicroSec(long startNanoTime) { - return TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanoTime); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/OpStats.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/OpStats.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/OpStats.java deleted file mode 100644 index 26708f3..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/OpStats.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client.stats; - -import com.twitter.distributedlog.client.resolver.RegionResolver; -import com.twitter.finagle.stats.StatsReceiver; -import java.net.SocketAddress; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * Op Stats. - */ -public class OpStats { - - // Region Resolver - private final RegionResolver regionResolver; - - // Stats - private final StatsReceiver statsReceiver; - private final OpStatsLogger opStatsLogger; - private final boolean enableRegionStats; - private final ConcurrentMap<String, OpStatsLogger> regionOpStatsLoggers; - - public OpStats(StatsReceiver statsReceiver, - boolean enableRegionStats, - RegionResolver regionResolver) { - this.statsReceiver = statsReceiver; - this.opStatsLogger = new OpStatsLogger(statsReceiver); - this.enableRegionStats = enableRegionStats; - this.regionOpStatsLoggers = new ConcurrentHashMap<String, OpStatsLogger>(); - this.regionResolver = regionResolver; - } - - private OpStatsLogger getRegionOpStatsLogger(SocketAddress address) { - String region = regionResolver.resolveRegion(address); - return getRegionOpStatsLogger(region); - } - - private OpStatsLogger getRegionOpStatsLogger(String region) { - OpStatsLogger statsLogger = regionOpStatsLoggers.get(region); - if (null == statsLogger) { - OpStatsLogger newStatsLogger = new 
OpStatsLogger(statsReceiver.scope(region)); - OpStatsLogger oldStatsLogger = regionOpStatsLoggers.putIfAbsent(region, newStatsLogger); - if (null == oldStatsLogger) { - statsLogger = newStatsLogger; - } else { - statsLogger = oldStatsLogger; - } - } - return statsLogger; - } - - public void completeRequest(SocketAddress addr, long micros, int numTries) { - opStatsLogger.completeRequest(micros, numTries); - if (enableRegionStats && null != addr) { - getRegionOpStatsLogger(addr).completeRequest(micros, numTries); - } - } - - public void failRequest(SocketAddress addr, long micros, int numTries) { - opStatsLogger.failRequest(micros, numTries); - if (enableRegionStats && null != addr) { - getRegionOpStatsLogger(addr).failRequest(micros, numTries); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/OpStatsLogger.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/OpStatsLogger.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/OpStatsLogger.java deleted file mode 100644 index 27adda7..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/OpStatsLogger.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client.stats; - -import com.twitter.finagle.stats.Stat; -import com.twitter.finagle.stats.StatsReceiver; - -/** - * Stats Logger per operation type. - */ -public class OpStatsLogger { - - private final Stat successLatencyStat; - private final Stat failureLatencyStat; - private final Stat redirectStat; - - public OpStatsLogger(StatsReceiver statsReceiver) { - StatsReceiver latencyStatReceiver = statsReceiver.scope("latency"); - successLatencyStat = latencyStatReceiver.stat0("success"); - failureLatencyStat = latencyStatReceiver.stat0("failure"); - StatsReceiver redirectStatReceiver = statsReceiver.scope("redirects"); - redirectStat = redirectStatReceiver.stat0("times"); - } - - public void completeRequest(long micros, int numTries) { - successLatencyStat.add(micros); - redirectStat.add(numTries); - } - - public void failRequest(long micros, int numTries) { - failureLatencyStat.add(micros); - redirectStat.add(numTries); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/OwnershipStatsLogger.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/OwnershipStatsLogger.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/OwnershipStatsLogger.java deleted file mode 100644 index 7d5a9c9..0000000 --- 
a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/OwnershipStatsLogger.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client.stats; - -import com.twitter.finagle.stats.Counter; -import com.twitter.finagle.stats.StatsReceiver; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * Stats Logger for ownerships. - */ -public class OwnershipStatsLogger { - - /** - * Ownership related stats. 
- */ - public static class OwnershipStat { - private final Counter hits; - private final Counter misses; - private final Counter removes; - private final Counter redirects; - private final Counter adds; - - OwnershipStat(StatsReceiver ownershipStats) { - hits = ownershipStats.counter0("hits"); - misses = ownershipStats.counter0("misses"); - adds = ownershipStats.counter0("adds"); - removes = ownershipStats.counter0("removes"); - redirects = ownershipStats.counter0("redirects"); - } - - public void onHit() { - hits.incr(); - } - - public void onMiss() { - misses.incr(); - } - - public void onAdd() { - adds.incr(); - } - - public void onRemove() { - removes.incr(); - } - - public void onRedirect() { - redirects.incr(); - } - - } - - private final OwnershipStat ownershipStat; - private final StatsReceiver ownershipStatsReceiver; - private final ConcurrentMap<String, OwnershipStat> ownershipStats = - new ConcurrentHashMap<String, OwnershipStat>(); - - public OwnershipStatsLogger(StatsReceiver statsReceiver, - StatsReceiver streamStatsReceiver) { - this.ownershipStat = new OwnershipStat(statsReceiver.scope("ownership")); - this.ownershipStatsReceiver = streamStatsReceiver.scope("perstream_ownership"); - } - - private OwnershipStat getOwnershipStat(String stream) { - OwnershipStat stat = ownershipStats.get(stream); - if (null == stat) { - OwnershipStat newStat = new OwnershipStat(ownershipStatsReceiver.scope(stream)); - OwnershipStat oldStat = ownershipStats.putIfAbsent(stream, newStat); - stat = null != oldStat ? 
oldStat : newStat; - } - return stat; - } - - public void onMiss(String stream) { - ownershipStat.onMiss(); - getOwnershipStat(stream).onMiss(); - } - - public void onHit(String stream) { - ownershipStat.onHit(); - getOwnershipStat(stream).onHit(); - } - - public void onRedirect(String stream) { - ownershipStat.onRedirect(); - getOwnershipStat(stream).onRedirect(); - } - - public void onRemove(String stream) { - ownershipStat.onRemove(); - getOwnershipStat(stream).onRemove(); - } - - public void onAdd(String stream) { - ownershipStat.onAdd(); - getOwnershipStat(stream).onAdd(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/package-info.java deleted file mode 100644 index 91d4f39..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/stats/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Client side stats utils. - */ -package com.twitter.distributedlog.client.stats; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DLSocketAddress.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DLSocketAddress.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DLSocketAddress.java deleted file mode 100644 index 30891c0..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DLSocketAddress.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.service; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; - -/** - * Socket Address identifier for a DL proxy. 
/**
 * Identifier of a DistributedLog write proxy: a shard id paired with the proxy's socket address.
 *
 * <p>The serialized ("lock id") form is {@code <version>;<shard>;<host>:<port>}; see
 * {@link #serialize()} and {@link #deserialize(String)}.
 */
public class DLSocketAddress {

    private static final int VERSION = 1;

    private static final String COLON = ":";
    private static final String SEP = ";";

    private final int shard;
    private final InetSocketAddress socketAddress;

    /**
     * Create a write proxy identifier.
     *
     * @param shard
     *          shard id of the write proxy.
     * @param socketAddress
     *          socket address of the write proxy.
     */
    public DLSocketAddress(int shard, InetSocketAddress socketAddress) {
        this.shard = shard;
        this.socketAddress = socketAddress;
    }

    /**
     * Shard id for dl write proxy.
     *
     * @return shard id for dl write proxy.
     */
    public int getShard() {
        return shard;
    }

    /**
     * Socket address for dl write proxy.
     *
     * @return socket address for dl write proxy
     */
    public InetSocketAddress getSocketAddress() {
        return socketAddress;
    }

    /**
     * Serialize the write proxy identifier to string.
     *
     * @return serialized write proxy identifier.
     */
    public String serialize() {
        return toLockId(socketAddress, shard);
    }

    /**
     * Hash on the socket address only. This stays consistent with {@link #equals(Object)}
     * because equal identifiers always carry equal socket addresses.
     */
    @Override
    public int hashCode() {
        return socketAddress.hashCode();
    }

    /**
     * Two identifiers are equal when both the shard id and the socket address match.
     */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof DLSocketAddress)) {
            return false;
        }
        DLSocketAddress other = (DLSocketAddress) obj;
        return shard == other.shard && socketAddress.equals(other.socketAddress);
    }

    /**
     * Same text as {@link #serialize()}.
     */
    @Override
    public String toString() {
        return toLockId(socketAddress, shard);
    }

    /**
     * Deserialize proxy address from a string representation.
     *
     * @param lockId
     *          string representation of the proxy address.
     * @return proxy address.
     * @throws IOException
     *          if <i>lockId</i> does not have exactly three {@code ;}-separated parts, carries an
     *          unexpected version, or has a malformed shard id or {@code host:port} part.
     */
    public static DLSocketAddress deserialize(String lockId) throws IOException {
        String[] parts = lockId.split(SEP);
        if (3 != parts.length) {
            throw new IOException("Invalid dl socket address " + lockId);
        }
        int version;
        try {
            version = Integer.parseInt(parts[0]);
        } catch (NumberFormatException nfe) {
            throw new IOException("Invalid version found in " + lockId, nfe);
        }
        if (VERSION != version) {
            throw new IOException("Invalid version " + version + " found in " + lockId + ", expected " + VERSION);
        }
        int shardId;
        try {
            shardId = Integer.parseInt(parts[1]);
        } catch (NumberFormatException nfe) {
            throw new IOException("Invalid shard id found in " + lockId, nfe);
        }
        InetSocketAddress address;
        try {
            address = parseSocketAddress(parts[2]);
        } catch (IllegalArgumentException iae) {
            // Covers a missing port, a non-numeric port (NumberFormatException is a subclass)
            // and an out-of-range port, so callers only ever have to handle IOException here.
            throw new IOException("Invalid socket address found in " + lockId, iae);
        }
        return new DLSocketAddress(shardId, address);
    }

    /**
     * Parse the inet socket address from the string representation.
     *
     * @param addr
     *          string representation, in the form {@code host:port}.
     * @return inet socket address
     * @throws IllegalArgumentException
     *          if <i>addr</i> is not exactly {@code host:port}, or the port is not a valid number.
     */
    public static InetSocketAddress parseSocketAddress(String addr) {
        String[] parts = addr.split(COLON);
        if (parts.length != 2) {
            throw new IllegalArgumentException(
                    "Invalid socket address '" + addr + "', expected <host>:<port>");
        }
        String hostname = parts[0];
        int port = Integer.parseInt(parts[1]);
        return new InetSocketAddress(hostname, port);
    }

    /**
     * Build a socket address from the local host's IP address and the given port.
     *
     * @param port
     *          port of the socket address.
     * @return socket address on the local host.
     * @throws UnknownHostException
     *          if the local host address cannot be determined.
     */
    public static InetSocketAddress getSocketAddress(int port) throws UnknownHostException {
        return new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), port);
    }

    /**
     * Convert inet socket address to the string representation.
     *
     * @param address
     *          inet socket address.
     * @return string representation of inet socket address, in the form {@code host:port}.
     */
    public static String toString(InetSocketAddress address) {
        return address.getHostName() + COLON + address.getPort();
    }

    /**
     * Build the versioned lock id {@code <version>;<shard>;<host>:<port>}.
     *
     * @param address
     *          inet socket address of the write proxy.
     * @param shard
     *          shard id of the write proxy.
     * @return lock id string, parseable by {@link #deserialize(String)}.
     */
    public static String toLockId(InetSocketAddress address, int shard) {
        return VERSION + SEP + shard + SEP + toString(address);
    }

}
- */ -package com.twitter.distributedlog.service; - -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.LogRecordSetBuffer; -import com.twitter.util.Future; -import java.nio.ByteBuffer; -import java.util.List; - -/** - * Interface for distributedlog client. - */ -public interface DistributedLogClient { - /** - * Write <i>data</i> to a given <i>stream</i>. - * - * @param stream - * Stream Name. - * @param data - * Data to write. - * @return a future representing a sequence id returned for this write. - */ - Future<DLSN> write(String stream, ByteBuffer data); - - /** - * Write record set to a given <i>stream</i>. - * - * <p>The record set is built from {@link com.twitter.distributedlog.LogRecordSet.Writer} - * - * @param stream stream to write to - * @param recordSet record set - */ - Future<DLSN> writeRecordSet(String stream, LogRecordSetBuffer recordSet); - - /** - * Write <i>data</i> in bulk to a given <i>stream</i>. - * - * <p>Return a list of Future dlsns, one for each submitted buffer. In the event of a partial - * failure--ex. some specific buffer write fails, all subsequent writes - * will also fail. - * - * @param stream - * Stream Name. - * @param data - * Data to write. - * @return a list of futures, one for each submitted buffer. - */ - List<Future<DLSN>> writeBulk(String stream, List<ByteBuffer> data); - - /** - * Truncate the stream to a given <i>dlsn</i>. - * - * @param stream - * Stream Name. - * @param dlsn - * DLSN to truncate until. - * @return a future representing the truncation. - */ - Future<Boolean> truncate(String stream, DLSN dlsn); - - /** - * Release the ownership of a stream <i>stream</i>. - * - * @param stream - * Stream Name to release. - * @return a future representing the release operation. - */ - Future<Void> release(String stream); - - /** - * Delete a given stream <i>stream</i>. - * - * @param stream - * Stream Name to delete. - * @return a future representing the delete operation. 
- */ - Future<Void> delete(String stream); - - /** - * Create a stream with name <i>stream</i>. - * - * @param stream - * Stream Name to create. - * @return a future representing the create operation. - */ - Future<Void> create(String stream); - - /** - * Close the client. - */ - void close(); -}