http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java index 111a874..0ee7db4 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java @@ -21,26 +21,27 @@ import com.google.common.base.Optional; import com.twitter.distributedlog.config.DynamicConfigurationFactory; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.service.streamset.StreamPartitionConverter; -import org.apache.commons.configuration.ConfigurationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.File; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.commons.configuration.ConfigurationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Provide per stream configuration to DistributedLog service layer. 
*/ public class ServiceStreamConfigProvider implements StreamConfigProvider { - static final Logger LOG = LoggerFactory.getLogger(ServiceStreamConfigProvider.class); + + private static final Logger LOG = LoggerFactory.getLogger(ServiceStreamConfigProvider.class); + + private static final String CONFIG_EXTENSION = "conf"; private final File configBaseDir; private final File defaultConfigFile; private final StreamPartitionConverter partitionConverter; private final DynamicConfigurationFactory configFactory; private final DynamicDistributedLogConfiguration defaultDynConf; - private final static String CONFIG_EXTENSION = "conf"; public ServiceStreamConfigProvider(String configPath, String defaultConfigPath, @@ -51,11 +52,13 @@ public class ServiceStreamConfigProvider implements StreamConfigProvider { throws ConfigurationException { this.configBaseDir = new File(configPath); if (!configBaseDir.exists()) { - throw new ConfigurationException("Stream configuration base directory " + configPath + " does not exist"); + throw new ConfigurationException("Stream configuration base directory " + + configPath + " does not exist"); } this.defaultConfigFile = new File(configPath); if (!defaultConfigFile.exists()) { - throw new ConfigurationException("Stream configuration default config " + defaultConfigPath + " does not exist"); + throw new ConfigurationException("Stream configuration default config " + + defaultConfigPath + " does not exist"); } // Construct reloading default configuration
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/package-info.java new file mode 100644 index 0000000..bb0026a --- /dev/null +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * DistributedLog Server Configurations. 
+ */ +package com.twitter.distributedlog.service.config; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/package-info.java new file mode 100644 index 0000000..4fb3673 --- /dev/null +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * DistributedLog Proxy Service. 
+ */ +package com.twitter.distributedlog.service; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java index 144e358..fb2d6d2 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,18 +20,20 @@ package com.twitter.distributedlog.service.placement; import com.twitter.util.Future; /** - * Created for those who hold these truths to be self-evident, that all streams are created equal, + * Equal Load Appraiser. + * + * <p>Created for those who hold these truths to be self-evident, that all streams are created equal, * that they are endowed by their creator with certain unalienable loads, that among these are * Uno, Eins, and One. 
*/ public class EqualLoadAppraiser implements LoadAppraiser { - @Override - public Future<StreamLoad> getStreamLoad(String stream) { - return Future.value(new StreamLoad(stream, 1)); - } + @Override + public Future<StreamLoad> getStreamLoad(String stream) { + return Future.value(new StreamLoad(stream, 1)); + } - @Override - public Future<Void> refreshCache() { - return Future.value(null); - } + @Override + public Future<Void> refreshCache() { + return Future.value(null); + } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java index 8c8dc23..c25c267 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -17,6 +17,12 @@ */ package com.twitter.distributedlog.service.placement; +import com.twitter.distributedlog.client.routing.RoutingService; +import com.twitter.distributedlog.namespace.DistributedLogNamespace; +import com.twitter.util.Duration; +import com.twitter.util.Function; +import com.twitter.util.Future; +import com.twitter.util.Futures; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -24,174 +30,171 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; - -import scala.Function1; -import scala.runtime.BoxedUnit; - import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.Stats; import org.apache.bookkeeper.stats.StatsLogger; import org.slf4j.Logger; - -import com.twitter.distributedlog.client.routing.RoutingService; -import com.twitter.distributedlog.namespace.DistributedLogNamespace; -import com.twitter.util.Duration; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.Futures; +import org.slf4j.LoggerFactory; +import scala.runtime.BoxedUnit; /** - * A LoadPlacementPolicy that attempts to place streams in such a way that the load is balanced as + * Least Load Placement Policy. + * + * <p>A LoadPlacementPolicy that attempts to place streams in such a way that the load is balanced as * evenly as possible across all shards. The LoadAppraiser remains responsible for determining what * the load of a server would be. This placement policy then distributes these streams across the * servers. 
*/ public class LeastLoadPlacementPolicy extends PlacementPolicy { - private TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>(); - private Map<String, String> streamToServer = new HashMap<String, String>(); - - public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService, - DistributedLogNamespace namespace, PlacementStateManager placementStateManager, - Duration refreshInterval, StatsLogger statsLogger) { - super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger); - statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - if (serverLoads.size() > 0) { - return serverLoads.last().getLoad() - serverLoads.first().getLoad(); - } else { - return getDefaultValue(); - } - } - }); - } - - private synchronized String getStreamOwner(String stream) { - return streamToServer.get(stream); - } - - @Override - public Future<String> placeStream(String stream) { - String streamOwner = getStreamOwner(stream); - if (null != streamOwner) { - return Future.value(streamOwner); + + private static final Logger logger = LoggerFactory.getLogger(LeastLoadPlacementPolicy.class); + + private TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>(); + private Map<String, String> streamToServer = new HashMap<String, String>(); + + public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService, + DistributedLogNamespace namespace, PlacementStateManager placementStateManager, + Duration refreshInterval, StatsLogger statsLogger) { + super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger); + statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + if (serverLoads.size() > 0) { + return 
serverLoads.last().getLoad() - serverLoads.first().getLoad(); + } else { + return getDefaultValue(); + } + } + }); } - Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream); - return streamLoadFuture.map(new Function<StreamLoad, String>() { - @Override - public String apply(StreamLoad streamLoad) { - return placeStreamSynchronized(streamLoad); - } - }); - } - - synchronized private String placeStreamSynchronized(StreamLoad streamLoad) { - ServerLoad serverLoad = serverLoads.pollFirst(); - serverLoad.addStream(streamLoad); - serverLoads.add(serverLoad); - return serverLoad.getServer(); - } - - @Override - public void refresh() { - logger.info("Refreshing server loads."); - Future<Void> refresh = loadAppraiser.refreshCache(); - final Set<String> servers = getServers(); - final Set<String> allStreams = getStreams(); - Future<TreeSet<ServerLoad>> serverLoadsFuture = refresh.flatMap(new Function<Void, Future<TreeSet<ServerLoad>>>() { - @Override - public Future<TreeSet<ServerLoad>> apply(Void v1) { - return calculate(servers, allStreams); - } - }); - serverLoadsFuture.map(new Function<TreeSet<ServerLoad>, BoxedUnit>() { - @Override - public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) { - try { - updateServerLoads(serverLoads); - } catch (PlacementStateManager.StateManagerSaveException e) { - logger.error("The refreshed mapping could not be persisted and will not be used.", e); - } - return BoxedUnit.UNIT; - } - }); - } - - synchronized private void updateServerLoads(TreeSet<ServerLoad> serverLoads) throws PlacementStateManager.StateManagerSaveException { - this.placementStateManager.saveOwnership(serverLoads); - this.streamToServer = serverLoadsToMap(serverLoads); - this.serverLoads = serverLoads; - } - - @Override - synchronized public void load(TreeSet<ServerLoad> serverLoads) { - this.serverLoads = serverLoads; - this.streamToServer = serverLoadsToMap(serverLoads); - } - - public Future<TreeSet<ServerLoad>> calculate(final Set<String> 
servers, Set<String> streams) { - logger.info("Calculating server loads"); - final long startTime = System.currentTimeMillis(); - ArrayList<Future<StreamLoad>> futures = new ArrayList<Future<StreamLoad>>(streams.size()); - - for (String stream: streams) { - Future<StreamLoad> streamLoad = loadAppraiser.getStreamLoad(stream); - futures.add(streamLoad); + + private synchronized String getStreamOwner(String stream) { + return streamToServer.get(stream); } - return Futures.collect(futures).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() { - @Override - public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) { - /* Sort streamLoads so largest streams are placed first for better balance */ - TreeSet<StreamLoad> streamQueue = new TreeSet<StreamLoad>(); - for (StreamLoad streamLoad: streamLoads) { - streamQueue.add(streamLoad); + @Override + public Future<String> placeStream(String stream) { + String streamOwner = getStreamOwner(stream); + if (null != streamOwner) { + return Future.value(streamOwner); } + Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream); + return streamLoadFuture.map(new Function<StreamLoad, String>() { + @Override + public String apply(StreamLoad streamLoad) { + return placeStreamSynchronized(streamLoad); + } + }); + } - TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>(); - for (String server: servers) { - ServerLoad serverLoad = new ServerLoad(server); - if (!streamQueue.isEmpty()) { - serverLoad.addStream(streamQueue.pollFirst()); - } - serverLoads.add(serverLoad); + private synchronized String placeStreamSynchronized(StreamLoad streamLoad) { + ServerLoad serverLoad = serverLoads.pollFirst(); + serverLoad.addStream(streamLoad); + serverLoads.add(serverLoad); + return serverLoad.getServer(); + } + + @Override + public void refresh() { + logger.info("Refreshing server loads."); + Future<Void> refresh = loadAppraiser.refreshCache(); + final Set<String> servers = getServers(); + final Set<String> allStreams 
= getStreams(); + Future<TreeSet<ServerLoad>> serverLoadsFuture = refresh.flatMap( + new Function<Void, Future<TreeSet<ServerLoad>>>() { + @Override + public Future<TreeSet<ServerLoad>> apply(Void v1) { + return calculate(servers, allStreams); + } + }); + serverLoadsFuture.map(new Function<TreeSet<ServerLoad>, BoxedUnit>() { + @Override + public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) { + try { + updateServerLoads(serverLoads); + } catch (PlacementStateManager.StateManagerSaveException e) { + logger.error("The refreshed mapping could not be persisted and will not be used.", e); + } + return BoxedUnit.UNIT; + } + }); + } + + private synchronized void updateServerLoads(TreeSet<ServerLoad> serverLoads) + throws PlacementStateManager.StateManagerSaveException { + this.placementStateManager.saveOwnership(serverLoads); + this.streamToServer = serverLoadsToMap(serverLoads); + this.serverLoads = serverLoads; + } + + @Override + public synchronized void load(TreeSet<ServerLoad> serverLoads) { + this.serverLoads = serverLoads; + this.streamToServer = serverLoadsToMap(serverLoads); + } + + public Future<TreeSet<ServerLoad>> calculate(final Set<String> servers, Set<String> streams) { + logger.info("Calculating server loads"); + final long startTime = System.currentTimeMillis(); + ArrayList<Future<StreamLoad>> futures = new ArrayList<Future<StreamLoad>>(streams.size()); + + for (String stream : streams) { + Future<StreamLoad> streamLoad = loadAppraiser.getStreamLoad(stream); + futures.add(streamLoad); } - while (!streamQueue.isEmpty()) { - ServerLoad serverLoad = serverLoads.pollFirst(); - serverLoad.addStream(streamQueue.pollFirst()); - serverLoads.add(serverLoad); + return Futures.collect(futures).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() { + @Override + public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) { + /* Sort streamLoads so largest streams are placed first for better balance */ + TreeSet<StreamLoad> streamQueue = new 
TreeSet<StreamLoad>(); + for (StreamLoad streamLoad : streamLoads) { + streamQueue.add(streamLoad); + } + + TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>(); + for (String server : servers) { + ServerLoad serverLoad = new ServerLoad(server); + if (!streamQueue.isEmpty()) { + serverLoad.addStream(streamQueue.pollFirst()); + } + serverLoads.add(serverLoad); + } + + while (!streamQueue.isEmpty()) { + ServerLoad serverLoad = serverLoads.pollFirst(); + serverLoad.addStream(streamQueue.pollFirst()); + serverLoads.add(serverLoad); + } + return serverLoads; + } + }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() { + @Override + public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) { + placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime); + return BoxedUnit.UNIT; + } + }).onFailure(new Function<Throwable, BoxedUnit>() { + @Override + public BoxedUnit apply(Throwable t) { + logger.error("Failure calculating loads", t); + placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime); + return BoxedUnit.UNIT; + } + }); + } + + private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> serverLoads) { + HashMap<String, String> streamToServer = new HashMap<String, String>(serverLoads.size()); + for (ServerLoad serverLoad : serverLoads) { + for (StreamLoad streamLoad : serverLoad.getStreamLoads()) { + streamToServer.put(streamLoad.getStream(), serverLoad.getServer()); + } } - return serverLoads; - } - }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() { - @Override - public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) { - placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime); - return BoxedUnit.UNIT; - } - }).onFailure(new Function<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable t) { - logger.error("Failure calculating loads", t); - placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime); - return 
BoxedUnit.UNIT; - } - }); - } - - private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> serverLoads) { - HashMap<String, String> streamToServer = new HashMap<String, String>(serverLoads.size()); - for (ServerLoad serverLoad: serverLoads) { - for (StreamLoad streamLoad: serverLoad.getStreamLoads()) { - streamToServer.put(streamLoad.getStream(), serverLoad.getServer()); - } + return streamToServer; } - return streamToServer; - } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java index 784f106..53c568a 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java @@ -19,7 +19,21 @@ package com.twitter.distributedlog.service.placement; import com.twitter.util.Future; +/** + * Interface for load appraiser. + */ public interface LoadAppraiser { - Future<StreamLoad> getStreamLoad(String stream); - Future<Void> refreshCache(); + /** + * Retrieve the stream load for a given {@code stream}. + * + * @param stream name of the stream + * @return the stream load of the stream. + */ + Future<StreamLoad> getStreamLoad(String stream); + + /** + * Refresh the cache. 
+ * @return + */ + Future<Void> refreshCache(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java index 2044428..46e8940 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -17,6 +17,15 @@ */ package com.twitter.distributedlog.service.placement; +import com.twitter.distributedlog.client.routing.RoutingService; +import com.twitter.distributedlog.namespace.DistributedLogNamespace; +import com.twitter.distributedlog.service.DLSocketAddress; +import com.twitter.util.Duration; +import com.twitter.util.Function0; +import com.twitter.util.Future; +import com.twitter.util.ScheduledThreadPoolTimer; +import com.twitter.util.Time; +import com.twitter.util.Timer; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -24,125 +33,116 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; import java.util.TreeSet; - -import scala.runtime.BoxedUnit; - import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import com.twitter.distributedlog.client.routing.RoutingService; -import com.twitter.distributedlog.namespace.DistributedLogNamespace; -import com.twitter.distributedlog.service.DLSocketAddress; -import com.twitter.util.Duration; -import com.twitter.util.Function0; -import com.twitter.util.Future; -import com.twitter.util.ScheduledThreadPoolTimer; -import com.twitter.util.Time; -import com.twitter.util.Timer; +import scala.runtime.BoxedUnit; /** - * A PlacementPolicy assigns streams to servers given an appraisal of the load that the stream - * contains. The load of a stream is determined by the LoadAppraiser used. The PlacementPolicy will + * A PlacementPolicy assigns streams to servers given an appraisal of the load that the stream contains. + * + * <p>The load of a stream is determined by the LoadAppraiser used. The PlacementPolicy will * then distribute these StreamLoads to the available servers in a manner defined by the * implementation creating ServerLoad objects. It then saves this assignment via the * PlacementStateManager. 
*/ public abstract class PlacementPolicy { - protected final LoadAppraiser loadAppraiser; - protected final RoutingService routingService; - protected final DistributedLogNamespace namespace; - protected final PlacementStateManager placementStateManager; - private final Duration refreshInterval; - protected static final Logger logger = LoggerFactory.getLogger(PlacementPolicy.class); - protected final OpStatsLogger placementCalcStats; - private Timer placementRefreshTimer; + private static final Logger logger = LoggerFactory.getLogger(PlacementPolicy.class); - public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService, - DistributedLogNamespace namespace, PlacementStateManager placementStateManager, - Duration refreshInterval, StatsLogger statsLogger) { - this.loadAppraiser = loadAppraiser; - this.routingService = routingService; - this.namespace = namespace; - this.placementStateManager = placementStateManager; - this.refreshInterval = refreshInterval; - placementCalcStats = statsLogger.getOpStatsLogger("placement"); - } + protected final LoadAppraiser loadAppraiser; + protected final RoutingService routingService; + protected final DistributedLogNamespace namespace; + protected final PlacementStateManager placementStateManager; + private final Duration refreshInterval; + protected final OpStatsLogger placementCalcStats; + private Timer placementRefreshTimer; - public Set<String> getServers() { - Set<SocketAddress> hosts = routingService.getHosts(); - Set<String> servers = new HashSet<String>(hosts.size()); - for (SocketAddress address: hosts) { - servers.add(DLSocketAddress.toString((InetSocketAddress) address)); + public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService, + DistributedLogNamespace namespace, PlacementStateManager placementStateManager, + Duration refreshInterval, StatsLogger statsLogger) { + this.loadAppraiser = loadAppraiser; + this.routingService = routingService; + this.namespace = namespace; + 
this.placementStateManager = placementStateManager; + this.refreshInterval = refreshInterval; + placementCalcStats = statsLogger.getOpStatsLogger("placement"); } - return servers; - } - public Set<String> getStreams() { - Set<String> streams = new HashSet<String>(); - try { - Iterator<String> logs = namespace.getLogs(); - while (logs.hasNext()) { - streams.add(logs.next()); - } - } catch (IOException e) { - logger.error("Could not get streams for placement policy.", e); + public Set<String> getServers() { + Set<SocketAddress> hosts = routingService.getHosts(); + Set<String> servers = new HashSet<String>(hosts.size()); + for (SocketAddress address : hosts) { + servers.add(DLSocketAddress.toString((InetSocketAddress) address)); + } + return servers; } - return streams; - } - - public void start(boolean leader) { - logger.info("Starting placement policy"); - TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>(); - for (String server: getServers()) { - emptyServerLoads.add(new ServerLoad(server)); + public Set<String> getStreams() { + Set<String> streams = new HashSet<String>(); + try { + Iterator<String> logs = namespace.getLogs(); + while (logs.hasNext()) { + streams.add(logs.next()); + } + } catch (IOException e) { + logger.error("Could not get streams for placement policy.", e); + } + return streams; } - load(emptyServerLoads); //Pre-Load so streams don't NPE - if (leader) { //this is the leader shard - logger.info("Shard is leader. 
Scheduling timed refresh."); - placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", true); - placementRefreshTimer.schedule(Time.now(), refreshInterval, new Function0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - refresh(); - return BoxedUnit.UNIT; + + public void start(boolean leader) { + logger.info("Starting placement policy"); + + TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>(); + for (String server : getServers()) { + emptyServerLoads.add(new ServerLoad(server)); } - }); - } else { - logger.info("Shard is not leader. Watching for server load changes."); - placementStateManager.watch(new PlacementStateManager.PlacementCallback() { - @Override - public void callback(TreeSet<ServerLoad> serverLoads) { - if (!serverLoads.isEmpty()) { - load(serverLoads); - } + load(emptyServerLoads); //Pre-Load so streams don't NPE + if (leader) { //this is the leader shard + logger.info("Shard is leader. Scheduling timed refresh."); + placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", true); + placementRefreshTimer.schedule(Time.now(), refreshInterval, new Function0<BoxedUnit>() { + @Override + public BoxedUnit apply() { + refresh(); + return BoxedUnit.UNIT; + } + }); + } else { + logger.info("Shard is not leader. 
Watching for server load changes."); + placementStateManager.watch(new PlacementStateManager.PlacementCallback() { + @Override + public void callback(TreeSet<ServerLoad> serverLoads) { + if (!serverLoads.isEmpty()) { + load(serverLoads); + } + } + }); } - }); } - } - public void close() { - if (placementRefreshTimer != null) { - placementRefreshTimer.stop(); + public void close() { + if (placementRefreshTimer != null) { + placementRefreshTimer.stop(); + } } - } - /** - * Places the stream on a server according to the policy and returns a future contianing the - * host that owns the stream upon completion - */ - public abstract Future<String> placeStream(String stream); + /** + * Places the stream on a server according to the policy. + * + * <p>It returns a future containing the host that owns the stream upon completion. + */ + public abstract Future<String> placeStream(String stream); - /** - * Recalculates the entire placement mapping and updates stores it using the PlacementStateManager - */ - public abstract void refresh(); + /** + * Recalculates the entire placement mapping and stores it using the PlacementStateManager. + */ + public abstract void refresh(); - /** - * Loads the placement mapping into the node from a TreeSet of ServerLoads - */ - public abstract void load(TreeSet<ServerLoad> serverLoads); + /** + * Loads the placement mapping into the node from a TreeSet of ServerLoads. 
+ */ + public abstract void load(TreeSet<ServerLoad> serverLoads); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java index cd0d906..17e4685 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,46 +20,60 @@ package com.twitter.distributedlog.service.placement; import java.util.TreeSet; /** - * The PlacementStateManager handles persistence of calculated resource placements including, the - * storage once the calculated, and the retrieval by the other shards. + * The PlacementStateManager handles persistence of calculated resource placements. 
*/ public interface PlacementStateManager { - /** - * Saves the ownership mapping as a TreeSet of ServerLoads to persistent storage - */ - void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException; + /** + * Saves the ownership mapping as a TreeSet of ServerLoads to persistent storage. + */ + void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException; - /** - * Loads the ownership mapping as TreeSet of ServerLoads from persistent storage - */ - TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException; + /** + * Loads the ownership mapping as TreeSet of ServerLoads from persistent storage. + */ + TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException; - /** - * Watch the persistent storage for changes to the ownership mapping and calls placementCallback - * with the new mapping when a change occurs - */ - void watch(PlacementCallback placementCallback); + /** + * Watch the persistent storage for changes to the ownership mapping. + * + * <p>The placementCallback callbacks will be triggered with the new mapping when a change occurs. + */ + void watch(PlacementCallback placementCallback); - interface PlacementCallback { - void callback(TreeSet<ServerLoad> serverLoads); - } + /** + * Placement Callback. + * + * <p>The callback is triggered when server loads are updated. + */ + interface PlacementCallback { + void callback(TreeSet<ServerLoad> serverLoads); + } - abstract class StateManagerException extends Exception { - public StateManagerException(String message, Exception e) { - super(message, e); + /** + * The base exception thrown when state manager encounters errors. 
+ */ + abstract class StateManagerException extends Exception { + public StateManagerException(String message, Exception e) { + super(message, e); + } } - } - class StateManagerLoadException extends StateManagerException { - public StateManagerLoadException(Exception e) { - super("Load of Ownership failed", e); + /** + * Exception thrown when failed to load the ownership mapping. + */ + class StateManagerLoadException extends StateManagerException { + public StateManagerLoadException(Exception e) { + super("Load of Ownership failed", e); + } } - } - class StateManagerSaveException extends StateManagerException { - public StateManagerSaveException(Exception e) { - super("Save of Ownership failed", e); + /** + * Exception thrown when failed to save the ownership mapping. + */ + class StateManagerSaveException extends StateManagerException { + public StateManagerSaveException(Exception e) { + super("Save of Ownership failed", e); + } } - } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java index 801e499..a0b4959 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,139 +17,142 @@ */ package com.twitter.distributedlog.service.placement; +import static com.google.common.base.Charsets.UTF_8; + import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.HashSet; import java.util.Set; -import com.twitter.distributedlog.service.placement.thrift.*; - import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.thrift.TException; import org.apache.thrift.protocol.TJSONProtocol; import org.apache.thrift.transport.TMemoryBuffer; import org.apache.thrift.transport.TMemoryInputTransport; -import static com.google.common.base.Charsets.UTF_8; - /** - * A comparable data object containing the identifier of the server, total appraised load on the + * An object represents the server load. + * + * <p>A comparable data object containing the identifier of the server, total appraised load on the * server, and all streams assigned to the server by the resource placement mapping. This is * comparable first by load and then by server so that a sorted data structure of these will be * consistent across multiple calculations. 
*/ public class ServerLoad implements Comparable { - private static final int BUFFER_SIZE = 4096000; - private final String server; - private final HashSet<StreamLoad> streamLoads = new HashSet<StreamLoad>(); - private long load = 0l; - - public ServerLoad(String server) { - this.server = server; - } - - synchronized public long addStream(StreamLoad stream) { - this.load += stream.getLoad(); - streamLoads.add(stream); - return this.load; - } - - synchronized public long removeStream(String stream) { - for (StreamLoad streamLoad : streamLoads) { - if (streamLoad.stream.equals(stream)) { - this.load -= streamLoad.getLoad(); - streamLoads.remove(streamLoad); + private static final int BUFFER_SIZE = 4096000; + private final String server; + private final HashSet<StreamLoad> streamLoads = new HashSet<StreamLoad>(); + private long load = 0L; + + public ServerLoad(String server) { + this.server = server; + } + + public synchronized long addStream(StreamLoad stream) { + this.load += stream.getLoad(); + streamLoads.add(stream); return this.load; - } } - return this.load; //Throwing an exception wouldn't help us as our logic should never reach here - } - - public synchronized long getLoad() { - return load; - } - - public synchronized Set<StreamLoad> getStreamLoads() { - return streamLoads; - } - - public synchronized String getServer() { - return server; - } - - protected synchronized com.twitter.distributedlog.service.placement.thrift.ServerLoad toThrift() { - com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad - = new com.twitter.distributedlog.service.placement.thrift.ServerLoad(); - tServerLoad.setServer(server); - tServerLoad.setLoad(load); - ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad> tStreamLoads - = new ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad>(); - for (StreamLoad streamLoad: streamLoads) { - tStreamLoads.add(streamLoad.toThrift()); + + public synchronized long removeStream(String 
stream) { + for (StreamLoad streamLoad : streamLoads) { + if (streamLoad.stream.equals(stream)) { + this.load -= streamLoad.getLoad(); + streamLoads.remove(streamLoad); + return this.load; + } + } + return this.load; //Throwing an exception wouldn't help us as our logic should never reach here + } + + public synchronized long getLoad() { + return load; + } + + public synchronized Set<StreamLoad> getStreamLoads() { + return streamLoads; + } + + public synchronized String getServer() { + return server; + } + + protected synchronized com.twitter.distributedlog.service.placement.thrift.ServerLoad toThrift() { + com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad = + new com.twitter.distributedlog.service.placement.thrift.ServerLoad(); + tServerLoad.setServer(server); + tServerLoad.setLoad(load); + ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad> tStreamLoads = + new ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad>(); + for (StreamLoad streamLoad : streamLoads) { + tStreamLoads.add(streamLoad.toThrift()); + } + tServerLoad.setStreams(tStreamLoads); + return tServerLoad; + } + + public byte[] serialize() throws IOException { + TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE); + TJSONProtocol protocol = new TJSONProtocol(transport); + try { + toThrift().write(protocol); + transport.flush(); + return transport.toString(UTF_8.name()).getBytes(UTF_8); + } catch (TException e) { + throw new IOException("Failed to serialize server load : ", e); + } catch (UnsupportedEncodingException uee) { + throw new IOException("Failed to serialize server load : ", uee); + } + } + + public static ServerLoad deserialize(byte[] data) throws IOException { + com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad = + new com.twitter.distributedlog.service.placement.thrift.ServerLoad(); + TMemoryInputTransport transport = new TMemoryInputTransport(data); + TJSONProtocol protocol = new 
TJSONProtocol(transport); + try { + tServerLoad.read(protocol); + ServerLoad serverLoad = new ServerLoad(tServerLoad.getServer()); + if (tServerLoad.isSetStreams()) { + for (com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad : + tServerLoad.getStreams()) { + serverLoad.addStream(new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad())); + } + } + return serverLoad; + } catch (TException e) { + throw new IOException("Failed to deserialize server load : ", e); + } } - tServerLoad.setStreams(tStreamLoads); - return tServerLoad; - } - - public byte[] serialize() throws IOException { - TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE); - TJSONProtocol protocol = new TJSONProtocol(transport); - try { - toThrift().write(protocol); - transport.flush(); - return transport.toString(UTF_8.name()).getBytes(UTF_8); - } catch (TException e) { - throw new IOException("Failed to serialize server load : ", e); - } catch (UnsupportedEncodingException uee) { - throw new IOException("Failed to serialize server load : ", uee); + + @Override + public synchronized int compareTo(Object o) { + ServerLoad other = (ServerLoad) o; + if (load == other.getLoad()) { + return server.compareTo(other.getServer()); + } else { + return Long.compare(load, other.getLoad()); + } } - } - - public static ServerLoad deserialize(byte[] data) throws IOException { - com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad - = new com.twitter.distributedlog.service.placement.thrift.ServerLoad(); - TMemoryInputTransport transport = new TMemoryInputTransport(data); - TJSONProtocol protocol = new TJSONProtocol(transport); - try { - tServerLoad.read(protocol); - ServerLoad serverLoad = new ServerLoad(tServerLoad.getServer()); - if (tServerLoad.isSetStreams()) { - for (com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad : tServerLoad.getStreams()) { - serverLoad.addStream(new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad())); + 
+ @Override + public synchronized boolean equals(Object o) { + if (!(o instanceof ServerLoad)) { + return false; } - } - return serverLoad; - } catch (TException e) { - throw new IOException("Failed to deserialize server load : ", e); + ServerLoad other = (ServerLoad) o; + return server.equals(other.getServer()) + && load == other.getLoad() + && streamLoads.equals(other.getStreamLoads()); } - } - - @Override - public synchronized int compareTo(Object o) { - ServerLoad other = (ServerLoad) o; - if (load == other.getLoad()) { - return server.compareTo(other.getServer()); - } else { - return Long.compare(load, other.getLoad()); + + @Override + public synchronized String toString() { + return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads); } - } - @Override - public synchronized boolean equals(Object o) { - if (!(o instanceof ServerLoad)) { - return false; + @Override + public synchronized int hashCode() { + return new HashCodeBuilder().append(server).append(load).append(streamLoads).build(); } - ServerLoad other = (ServerLoad) o; - return server.equals(other.getServer()) && load == other.getLoad() && streamLoads.equals(other.getStreamLoads()); - } - - @Override - public synchronized String toString() { - return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads); - } - - @Override - public synchronized int hashCode() { - return new HashCodeBuilder().append(server).append(load).append(streamLoads).build(); - } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java index d7b7efd..c0b0ce1 
100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,96 +17,99 @@ */ package com.twitter.distributedlog.service.placement; +import static com.google.common.base.Charsets.UTF_8; + import java.io.IOException; import java.io.UnsupportedEncodingException; - import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.thrift.TException; import org.apache.thrift.protocol.TJSONProtocol; import org.apache.thrift.transport.TMemoryBuffer; import org.apache.thrift.transport.TMemoryInputTransport; -import static com.google.common.base.Charsets.UTF_8; - /** - * A comparable data object containing the identifier of the stream and the appraised load produced + * An object represent the load of a stream. + * + * <p>A comparable data object containing the identifier of the stream and the appraised load produced * by the stream. 
*/ public class StreamLoad implements Comparable { - private static final int BUFFER_SIZE = 4096; - public final String stream; - private final int load; + private static final int BUFFER_SIZE = 4096; + public final String stream; + private final int load; - public StreamLoad(String stream, int load) { - this.stream = stream; - this.load = load; - } + public StreamLoad(String stream, int load) { + this.stream = stream; + this.load = load; + } - public int getLoad() { - return load; - } + public int getLoad() { + return load; + } - public String getStream() { - return stream; - } + public String getStream() { + return stream; + } - protected com.twitter.distributedlog.service.placement.thrift.StreamLoad toThrift() { - com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = new com.twitter.distributedlog.service.placement.thrift.StreamLoad(); - return tStreamLoad.setStream(stream).setLoad(load); - } + protected com.twitter.distributedlog.service.placement.thrift.StreamLoad toThrift() { + com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = + new com.twitter.distributedlog.service.placement.thrift.StreamLoad(); + return tStreamLoad.setStream(stream).setLoad(load); + } - public byte[] serialize() throws IOException { - TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE); - TJSONProtocol protocol = new TJSONProtocol(transport); - try { - toThrift().write(protocol); - transport.flush(); - return transport.toString(UTF_8.name()).getBytes(UTF_8); - } catch (TException e) { - throw new IOException("Failed to serialize stream load : ", e); - } catch (UnsupportedEncodingException uee) { - throw new IOException("Failed to serialize stream load : ", uee); + public byte[] serialize() throws IOException { + TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE); + TJSONProtocol protocol = new TJSONProtocol(transport); + try { + toThrift().write(protocol); + transport.flush(); + return 
transport.toString(UTF_8.name()).getBytes(UTF_8); + } catch (TException e) { + throw new IOException("Failed to serialize stream load : ", e); + } catch (UnsupportedEncodingException uee) { + throw new IOException("Failed to serialize stream load : ", uee); + } } - } - public static StreamLoad deserialize(byte[] data) throws IOException { - com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = new com.twitter.distributedlog.service.placement.thrift.StreamLoad(); - TMemoryInputTransport transport = new TMemoryInputTransport(data); - TJSONProtocol protocol = new TJSONProtocol(transport); - try { - tStreamLoad.read(protocol); - return new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()); - } catch (TException e) { - throw new IOException("Failed to deserialize stream load : ", e); + public static StreamLoad deserialize(byte[] data) throws IOException { + com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = + new com.twitter.distributedlog.service.placement.thrift.StreamLoad(); + TMemoryInputTransport transport = new TMemoryInputTransport(data); + TJSONProtocol protocol = new TJSONProtocol(transport); + try { + tStreamLoad.read(protocol); + return new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()); + } catch (TException e) { + throw new IOException("Failed to deserialize stream load : ", e); + } } - } - @Override - public int compareTo(Object o) { - StreamLoad other = (StreamLoad) o; - if (load == other.getLoad()) { - return stream.compareTo(other.getStream()); - } else { - return Long.compare(load, other.getLoad()); + @Override + public int compareTo(Object o) { + StreamLoad other = (StreamLoad) o; + if (load == other.getLoad()) { + return stream.compareTo(other.getStream()); + } else { + return Long.compare(load, other.getLoad()); + } } - } - @Override - public boolean equals(Object o) { - if (!(o instanceof StreamLoad)) { - return false; + @Override + public boolean equals(Object o) { + if (!(o 
instanceof StreamLoad)) { + return false; + } + StreamLoad other = (StreamLoad) o; + return stream.equals(other.getStream()) && load == other.getLoad(); } - StreamLoad other = (StreamLoad) o; - return stream.equals(other.getStream()) && load == other.getLoad(); - } - @Override - public String toString() { - return String.format("StreamLoad<Stream: %s, Load: %d>", stream, load); - } + @Override + public String toString() { + return String.format("StreamLoad<Stream: %s, Load: %d>", stream, load); + } - @Override - public int hashCode() { - return new HashCodeBuilder().append(stream).append(load).build(); - } + @Override + public int hashCode() { + return new HashCodeBuilder().append(stream).append(load).build(); + } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java index 4f01bdc..977ae04 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,13 +17,16 @@ */ package com.twitter.distributedlog.service.placement; +import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.ZooKeeperClient; +import com.twitter.distributedlog.impl.BKNamespaceDriver; +import com.twitter.distributedlog.util.Utils; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; import java.util.HashSet; import java.util.List; import java.util.TreeSet; - import org.apache.bookkeeper.stats.StatsLogger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -35,139 +38,136 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.twitter.distributedlog.BKDistributedLogNamespace; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.impl.BKNamespaceDriver; -import com.twitter.distributedlog.util.Utils; - /** * An implementation of the PlacementStateManager that saves data to and loads from Zookeeper to * avoid necessitating an additional system for the resource placement. 
*/ public class ZKPlacementStateManager implements PlacementStateManager { - static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class); - private static final String SERVER_LOAD_DIR = "/.server-load"; - private final String serverLoadPath; - private final ZooKeeperClient zkClient; + private static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class); - private boolean watching = false; + private static final String SERVER_LOAD_DIR = "/.server-load"; - public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) { - String zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri); - zkClient = BKNamespaceDriver.createZKClientBuilder( - String.format("ZKPlacementStateManager-%s", zkServers), - conf, - zkServers, - statsLogger.scope("placement_state_manager")).build(); - serverLoadPath = uri.getPath() + SERVER_LOAD_DIR; - } + private final String serverLoadPath; + private final ZooKeeperClient zkClient; - private void createServerLoadPathIfNoExists(byte[] data) - throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException { - try { - Utils.zkCreateFullPathOptimistic(zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException nee) { - logger.debug("the server load path {} is already created by others", serverLoadPath, nee); + private boolean watching = false; + + public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) { + String zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri); + zkClient = BKNamespaceDriver.createZKClientBuilder( + String.format("ZKPlacementStateManager-%s", zkServers), + conf, + zkServers, + statsLogger.scope("placement_state_manager")).build(); + serverLoadPath = uri.getPath() + SERVER_LOAD_DIR; } - } - - @Override - public void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException { - 
logger.info("saving ownership"); - try { - ZooKeeper zk = zkClient.get(); - // use timestamp as data so watchers will see any changes - byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array(); - - if (zk.exists(serverLoadPath, false) == null) { //create path to rootnode if it does not yet exist - createServerLoadPathIfNoExists(timestamp); - } - - Transaction tx = zk.transaction(); - List<String> children = zk.getChildren(serverLoadPath, false); - HashSet<String> servers = new HashSet<String>(children); - tx.setData(serverLoadPath, timestamp, -1); // trigger the watcher that data has been updated - for (ServerLoad serverLoad : serverLoads) { - String server = serverToZkFormat(serverLoad.getServer()); - String serverPath = serverPath(server); - if (servers.contains(server)) { - servers.remove(server); - tx.setData(serverPath, serverLoad.serialize(), -1); - } else { - tx.create(serverPath, serverLoad.serialize(), zkClient.getDefaultACL(), CreateMode.PERSISTENT); + + private void createServerLoadPathIfNoExists(byte[] data) + throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException { + try { + Utils.zkCreateFullPathOptimistic( + zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException nee) { + logger.debug("the server load path {} is already created by others", serverLoadPath, nee); } - } - for (String server : servers) { - tx.delete(serverPath(server), -1); - } - tx.commit(); - } catch (InterruptedException | IOException | KeeperException e) { - throw new StateManagerSaveException(e); } - } - - @Override - public TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException { - TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>(); - try { - ZooKeeper zk = zkClient.get(); - List<String> children = zk.getChildren(serverLoadPath, false); - for (String server : children) { - 
ownerships.add(ServerLoad.deserialize(zk.getData(serverPath(server), false, new Stat()))); - } - return ownerships; - } catch (InterruptedException | IOException | KeeperException e) { - throw new StateManagerLoadException(e); + + @Override + public void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException { + logger.info("saving ownership"); + try { + ZooKeeper zk = zkClient.get(); + // use timestamp as data so watchers will see any changes + byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array(); + + if (zk.exists(serverLoadPath, false) == null) { //create path to rootnode if it does not yet exist + createServerLoadPathIfNoExists(timestamp); + } + + Transaction tx = zk.transaction(); + List<String> children = zk.getChildren(serverLoadPath, false); + HashSet<String> servers = new HashSet<String>(children); + tx.setData(serverLoadPath, timestamp, -1); // trigger the watcher that data has been updated + for (ServerLoad serverLoad : serverLoads) { + String server = serverToZkFormat(serverLoad.getServer()); + String serverPath = serverPath(server); + if (servers.contains(server)) { + servers.remove(server); + tx.setData(serverPath, serverLoad.serialize(), -1); + } else { + tx.create(serverPath, serverLoad.serialize(), zkClient.getDefaultACL(), CreateMode.PERSISTENT); + } + } + for (String server : servers) { + tx.delete(serverPath(server), -1); + } + tx.commit(); + } catch (InterruptedException | IOException | KeeperException e) { + throw new StateManagerSaveException(e); + } } - } - @Override - synchronized public void watch(final PlacementCallback callback) { - if (watching) { - return; // do not double watch + @Override + public TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException { + TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>(); + try { + ZooKeeper zk = zkClient.get(); + List<String> children = zk.getChildren(serverLoadPath, false); + for (String server : children) { + 
ownerships.add(ServerLoad.deserialize(zk.getData(serverPath(server), false, new Stat()))); + } + return ownerships; + } catch (InterruptedException | IOException | KeeperException e) { + throw new StateManagerLoadException(e); + } } - watching = true; - - try { - ZooKeeper zk = zkClient.get(); - try { - zk.getData(serverLoadPath, new Watcher() { - @Override - public void process(WatchedEvent watchedEvent) { + + @Override + public synchronized void watch(final PlacementCallback callback) { + if (watching) { + return; // do not double watch + } + watching = true; + + try { + ZooKeeper zk = zkClient.get(); try { - callback.callback(loadOwnership()); - } catch (StateManagerLoadException e) { - logger.error("Watch of Ownership failed", e); - } finally { - watching = false; - watch(callback); + zk.getData(serverLoadPath, new Watcher() { + @Override + public void process(WatchedEvent watchedEvent) { + try { + callback.callback(loadOwnership()); + } catch (StateManagerLoadException e) { + logger.error("Watch of Ownership failed", e); + } finally { + watching = false; + watch(callback); + } + } + }, new Stat()); + } catch (KeeperException.NoNodeException nee) { + byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array(); + createServerLoadPathIfNoExists(timestamp); + watching = false; + watch(callback); } - } - }, new Stat()); - } catch (KeeperException.NoNodeException nee) { - byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array(); - createServerLoadPathIfNoExists(timestamp); - watching = false; - watch(callback); - } - } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) { - logger.error("Watch of Ownership failed", e); - watching = false; - watch(callback); + } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) { + logger.error("Watch of Ownership failed", e); + watching = false; + watch(callback); + } } - } - public String 
serverPath(String server) { - return String.format("%s/%s", serverLoadPath, server); - } + public String serverPath(String server) { + return String.format("%s/%s", serverLoadPath, server); + } - protected String serverToZkFormat(String server) { - return server.replaceAll("/", "--"); - } + protected String serverToZkFormat(String server) { + return server.replaceAll("/", "--"); + } - protected String zkFormatToServer(String zkFormattedServer) { - return zkFormattedServer.replaceAll("--", "/"); - } + protected String zkFormatToServer(String zkFormattedServer) { + return zkFormattedServer.replaceAll("--", "/"); + } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/package-info.java new file mode 100644 index 0000000..72c134b --- /dev/null +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Placement Policy to place streams across proxy services. + */ +package com.twitter.distributedlog.service.placement; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java index fbef587..b513e24 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java @@ -18,20 +18,18 @@ package com.twitter.distributedlog.service.stream; import com.google.common.base.Stopwatch; - -import com.twitter.distributedlog.util.Sequencer; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import com.twitter.util.Return; -import com.twitter.util.Try; import com.twitter.distributedlog.AsyncLogWriter; import com.twitter.distributedlog.exceptions.ChecksumFailedException; import com.twitter.distributedlog.exceptions.DLException; import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; import com.twitter.distributedlog.service.ResponseUtils; import com.twitter.distributedlog.thrift.service.ResponseHeader; - +import com.twitter.distributedlog.util.Sequencer; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Promise; +import com.twitter.util.Return; +import com.twitter.util.Try; import java.util.concurrent.TimeUnit; import 
org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.stats.OpStatsLogger; @@ -40,8 +38,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; +/** + * Abstract Stream Operation. + */ public abstract class AbstractStreamOp<Response> implements StreamOp { - static final Logger logger = LoggerFactory.getLogger(AbstractStreamOp.class); + + private static final Logger logger = LoggerFactory.getLogger(AbstractStreamOp.class); + protected final String stream; protected final OpStatsLogger opStatsLogger; private final Promise<Response> result = new Promise<Response>(); @@ -103,7 +106,7 @@ public abstract class AbstractStreamOp<Response> implements StreamOp { } /** - * Fail with current <i>owner</i> and its reason <i>t</i> + * Fail with current <i>owner</i> and its reason <i>t</i>. * * @param cause * failure reason http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java index ae0d67d..a385b84 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java @@ -17,17 +17,18 @@ */ package com.twitter.distributedlog.service.stream; -import com.twitter.distributedlog.util.ProtocolUtils; import com.twitter.distributedlog.service.ResponseUtils; -import com.twitter.distributedlog.thrift.service.WriteResponse; import com.twitter.distributedlog.thrift.service.ResponseHeader; +import com.twitter.distributedlog.thrift.service.WriteResponse; +import 
com.twitter.distributedlog.util.ProtocolUtils; import com.twitter.util.Future; - import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.stats.OpStatsLogger; - import scala.runtime.AbstractFunction1; +/** + * Abstract Write Operation. + */ public abstract class AbstractWriteOp extends AbstractStreamOp<WriteResponse> { protected AbstractWriteOp(String stream, http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java index c009bb9..4d50b66 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java @@ -17,19 +17,13 @@ */ package com.twitter.distributedlog.service.stream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import com.twitter.distributedlog.exceptions.AlreadyClosedException; import com.twitter.distributedlog.AsyncLogWriter; import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.exceptions.LockingException; import com.twitter.distributedlog.LogRecord; import com.twitter.distributedlog.acl.AccessControlManager; +import com.twitter.distributedlog.exceptions.AlreadyClosedException; import com.twitter.distributedlog.exceptions.DLException; +import com.twitter.distributedlog.exceptions.LockingException; import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; import com.twitter.distributedlog.exceptions.RequestDeniedException; import 
com.twitter.distributedlog.service.ResponseUtils; @@ -39,21 +33,26 @@ import com.twitter.distributedlog.thrift.service.BulkWriteResponse; import com.twitter.distributedlog.thrift.service.ResponseHeader; import com.twitter.distributedlog.thrift.service.StatusCode; import com.twitter.distributedlog.thrift.service.WriteResponse; -import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.Sequencer; import com.twitter.util.ConstFuture; +import com.twitter.util.Future; import com.twitter.util.Future$; import com.twitter.util.FutureEventListener; -import com.twitter.util.Future; import com.twitter.util.Try; - +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; - import scala.runtime.AbstractFunction1; +/** + * Bulk Write Operation. 
+ */ public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements WriteOpWithPayload { private final List<ByteBuffer> buffers; private final long payloadSize; @@ -77,9 +76,9 @@ public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements try { result.get(); } catch (Exception ex) { - if (ex instanceof OwnershipAcquireFailedException || - ex instanceof AlreadyClosedException || - ex instanceof LockingException) { + if (ex instanceof OwnershipAcquireFailedException + || ex instanceof AlreadyClosedException + || ex instanceof LockingException) { def = true; } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java index 1dde1f9..e30a989 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java @@ -25,13 +25,14 @@ import com.twitter.distributedlog.service.ResponseUtils; import com.twitter.distributedlog.thrift.service.WriteResponse; import com.twitter.distributedlog.util.Sequencer; import com.twitter.util.Future; - -import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.StatsLogger; - import scala.runtime.AbstractFunction1; +/** + * Operation to delete a log stream. 
+ */ public class DeleteOp extends AbstractWriteOp { private final StreamManager streamManager; private final Counter deniedDeleteCounter; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java index 4b2cbc1..f34295b 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java @@ -17,6 +17,8 @@ */ package com.twitter.distributedlog.service.stream; +import static com.google.common.base.Charsets.UTF_8; + import com.twitter.distributedlog.AsyncLogWriter; import com.twitter.distributedlog.BKAsyncLogWriter; import com.twitter.distributedlog.DLSN; @@ -28,15 +30,14 @@ import com.twitter.distributedlog.service.ResponseUtils; import com.twitter.distributedlog.thrift.service.WriteResponse; import com.twitter.distributedlog.util.Sequencer; import com.twitter.util.Future; - -import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.StatsLogger; - import scala.runtime.AbstractFunction1; -import static com.google.common.base.Charsets.UTF_8; - +/** + * Heartbeat Operation. 
+ */ public class HeartbeatOp extends AbstractWriteOp { static final byte[] HEARTBEAT_DATA = "heartbeat".getBytes(UTF_8); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java index 25835f6..aa0f1a7 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java @@ -25,13 +25,14 @@ import com.twitter.distributedlog.service.ResponseUtils; import com.twitter.distributedlog.thrift.service.WriteResponse; import com.twitter.distributedlog.util.Sequencer; import com.twitter.util.Future; - -import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.StatsLogger; - import scala.runtime.AbstractFunction1; +/** + * Operation to release ownership of a log stream. 
+ */ public class ReleaseOp extends AbstractWriteOp { private final StreamManager streamManager; private final Counter deniedReleaseCounter; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java index a1e3e4f..e015e29 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java @@ -23,13 +23,14 @@ import com.twitter.util.Future; import java.io.IOException; /** - * Stream is the per stream request handler in the DL service layer. The collection of Streams in - * the proxy are managed by StreamManager. + * Stream is the per stream request handler in the DL service layer. + * + * <p>The collection of Streams in the proxy are managed by StreamManager. */ public interface Stream { /** - * Get the stream configuration for this stream + * Get the stream configuration for this stream. 
* * @return stream configuration */ http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java index 51d7ffd..0dfbd69 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java @@ -19,6 +19,9 @@ package com.twitter.distributedlog.service.stream; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; +/** + * Factory to create a stream with provided stream configuration {@code streamConf}. + */ public interface StreamFactory { /** http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java index cb28f1e..566ded6 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java @@ -29,6 +29,9 @@ import com.twitter.util.Timer; import org.apache.bookkeeper.feature.FeatureProvider; import org.jboss.netty.util.HashedWheelTimer; +/** + * The implementation of {@link StreamFactory}. 
+ */ public class StreamFactoryImpl implements StreamFactory { private final String clientId; private final StreamOpStats streamOpStats;