// Source: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.service.balancer;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.distributedlog.client.monitor.MonitorServiceClient;
import org.apache.distributedlog.service.DistributedLogClient;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A balancer balances ownerships between two targets.
 *
 * <p>Each target is described by a name, a {@link DistributedLogClient} and a
 * {@link MonitorServiceClient}. Streams are moved from one target to the other by a pool of
 * worker threads that all run the same {@link RegionMover} instance.
 */
public class SimpleBalancer implements Balancer {

    private static final Logger logger = LoggerFactory.getLogger(SimpleBalancer.class);

    protected final String target1;
    protected final String target2;
    protected final DistributedLogClient targetClient1;
    protected final DistributedLogClient targetClient2;
    protected final MonitorServiceClient targetMonitor1;
    protected final MonitorServiceClient targetMonitor2;

    /**
     * Create a balancer between two named targets.
     *
     * @param name1 name of the first target
     * @param client1 client of the first target
     * @param monitor1 monitor client of the first target
     * @param name2 name of the second target
     * @param client2 client of the second target
     * @param monitor2 monitor client of the second target
     */
    public SimpleBalancer(String name1,
                          DistributedLogClient client1,
                          MonitorServiceClient monitor1,
                          String name2,
                          DistributedLogClient client2,
                          MonitorServiceClient monitor2) {
        this.target1 = name1;
        this.targetClient1 = client1;
        this.targetMonitor1 = monitor1;
        this.target2 = name2;
        this.targetClient2 = client2;
        this.targetMonitor2 = monitor2;
    }

    /**
     * Sum the number of streams over all owners of a distribution.
     *
     * @param distribution stream names keyed by the address that owns them
     * @return total number of streams in the distribution
     */
    protected static int countNumberStreams(Map<SocketAddress, Set<String>> distribution) {
        int count = 0;
        for (Set<String> streams : distribution.values()) {
            count += streams.size();
        }
        return count;
    }

    /**
     * Move streams from the target that currently owns more streams to the other one.
     *
     * <p>The number of streams to move is computed by
     * {@code BalancerUtils.calculateNumStreamsToRebalance}; nothing is moved when that
     * number is not positive. When both targets own the same number of streams the second
     * target is treated as the source.
     *
     * @param rebalanceWaterMark forwarded to {@code BalancerUtils.calculateNumStreamsToRebalance}
     * @param rebalanceTolerancePercentage forwarded to
     *          {@code BalancerUtils.calculateNumStreamsToRebalance}
     * @param rebalanceConcurrency number of worker threads used to move streams
     * @param rebalanceRateLimiter optional rate limiter acquired before each stream is chosen
     */
    @Override
    public void balance(int rebalanceWaterMark,
                        double rebalanceTolerancePercentage,
                        int rebalanceConcurrency,
                        Optional<RateLimiter> rebalanceRateLimiter) {
        // get the ownership distributions from individual targets
        Map<SocketAddress, Set<String>> distribution1 = targetMonitor1.getStreamOwnershipDistribution();
        Map<SocketAddress, Set<String>> distribution2 = targetMonitor2.getStreamOwnershipDistribution();

        // get stream counts
        int proxyCount1 = distribution1.size();
        int streamCount1 = countNumberStreams(distribution1);
        int proxyCount2 = distribution2.size();
        int streamCount2 = countNumberStreams(distribution2);

        logger.info("'{}' has {} streams by {} proxies; while '{}' has {} streams by {} proxies.",
                    new Object[] {target1, streamCount1, proxyCount1, target2, streamCount2, proxyCount2 });

        String source, target;
        Map<SocketAddress, Set<String>> srcDistribution;
        DistributedLogClient srcClient, targetClient;
        MonitorServiceClient srcMonitor, targetMonitor;
        int srcStreamCount, targetStreamCount;
        if (streamCount1 > streamCount2) {
            source = target1;
            srcStreamCount = streamCount1;
            srcClient = targetClient1;
            srcMonitor = targetMonitor1;
            srcDistribution = distribution1;

            target = target2;
            targetStreamCount = streamCount2;
            targetClient = targetClient2;
            targetMonitor = targetMonitor2;
        } else {
            source = target2;
            srcStreamCount = streamCount2;
            srcClient = targetClient2;
            srcMonitor = targetMonitor2;
            srcDistribution = distribution2;

            target = target1;
            targetStreamCount = streamCount1;
            targetClient = targetClient1;
            targetMonitor = targetMonitor1;
        }

        Map<String, Integer> loadDistribution = new HashMap<String, Integer>();
        loadDistribution.put(source, srcStreamCount);
        loadDistribution.put(target, targetStreamCount);

        // Calculate how many streams to be rebalanced from src region to target region
        int numStreamsToRebalance = BalancerUtils.calculateNumStreamsToRebalance(
            source, loadDistribution, rebalanceWaterMark, rebalanceTolerancePercentage);

        if (numStreamsToRebalance <= 0) {
            logger.info("No streams need to be rebalanced from '{}' to '{}'.", source, target);
            return;
        }

        StreamChooser streamChooser =
                LimitedStreamChooser.of(new CountBasedStreamChooser(srcDistribution), numStreamsToRebalance);
        StreamMover streamMover =
            new StreamMoverImpl(source, srcClient, srcMonitor, target, targetClient, targetMonitor);

        moveStreams(streamChooser, streamMover, rebalanceConcurrency, rebalanceRateLimiter);
    }

    /**
     * Move every stream owned by <i>source</i> to the other target.
     *
     * <p>Returns without doing anything when the source's ownership distribution is empty.
     *
     * @param source name of the target to drain; must equal one of the two configured names
     * @param rebalanceConcurrency number of worker threads used to move streams
     * @param rebalanceRateLimiter optional rate limiter acquired before each stream is chosen
     * @throws IllegalArgumentException if <i>source</i> matches neither configured target
     */
    @Override
    public void balanceAll(String source,
                           int rebalanceConcurrency,
                           Optional<RateLimiter> rebalanceRateLimiter) {
        String target;
        DistributedLogClient sourceClient, targetClient;
        MonitorServiceClient sourceMonitor, targetMonitor;
        if (target1.equals(source)) {
            sourceClient = targetClient1;
            sourceMonitor = targetMonitor1;
            target = target2;
            targetClient = targetClient2;
            targetMonitor = targetMonitor2;
        } else if (target2.equals(source)) {
            sourceClient = targetClient2;
            sourceMonitor = targetMonitor2;
            target = target1;
            targetClient = targetClient1;
            targetMonitor = targetMonitor1;
        } else {
            throw new IllegalArgumentException("Unknown target " + source);
        }

        // get the ownership distributions from individual targets
        Map<SocketAddress, Set<String>> distribution = sourceMonitor.getStreamOwnershipDistribution();

        if (distribution.isEmpty()) {
            return;
        }

        StreamChooser streamChooser = new CountBasedStreamChooser(distribution);
        StreamMover streamMover =
            new StreamMoverImpl(source, sourceClient, sourceMonitor, target, targetClient, targetMonitor);

        moveStreams(streamChooser, streamMover, rebalanceConcurrency, rebalanceRateLimiter);
    }

    /**
     * Run <i>concurrency</i> workers that move streams until the chooser is exhausted,
     * and block until all of them have finished.
     *
     * <p>If the calling thread is interrupted while waiting, the workers are asked to stop
     * and the interrupt status is restored so that callers can observe it.
     */
    private void moveStreams(StreamChooser streamChooser,
                             StreamMover streamMover,
                             int concurrency,
                             Optional<RateLimiter> rateLimiter) {
        CountDownLatch doneLatch = new CountDownLatch(concurrency);
        RegionMover regionMover = new RegionMover(streamChooser, streamMover, rateLimiter, doneLatch);
        ExecutorService executorService = Executors.newFixedThreadPool(concurrency);
        try {
            // the same RegionMover instance is submitted once per worker thread
            for (int i = 0; i < concurrency; i++) {
                executorService.submit(regionMover);
            }

            try {
                doneLatch.await();
            } catch (InterruptedException e) {
                logger.info("{} is interrupted. Stopping it ...", streamMover);
                regionMover.shutdown();
                // do not swallow the interruption: let the caller see it
                Thread.currentThread().interrupt();
            }
        } finally {
            executorService.shutdown();
        }

    }

    /**
     * Move streams from <i>src</i> region to <i>target</i> region.
     *
     * <p>A single instance is executed by several threads at once, so the chooser, mover
     * and rate limiter are shared between them.
     */
    static class RegionMover implements Runnable {

        final StreamChooser streamChooser;
        final StreamMover streamMover;
        final Optional<RateLimiter> rateLimiter;
        final CountDownLatch doneLatch;
        volatile boolean running = true;

        RegionMover(StreamChooser streamChooser,
                    StreamMover streamMover,
                    Optional<RateLimiter> rateLimiter,
                    CountDownLatch doneLatch) {
            this.streamChooser = streamChooser;
            this.streamMover = streamMover;
            this.rateLimiter = rateLimiter;
            this.doneLatch = doneLatch;
        }

        /**
         * Choose and move streams until the chooser returns {@code null} or
         * {@link #shutdown()} is called, then count down the latch.
         */
        @Override
        public void run() {
            try {
                while (running) {
                    if (rateLimiter.isPresent()) {
                        rateLimiter.get().acquire();
                    }

                    String stream = streamChooser.choose();
                    if (null == stream) {
                        break;
                    }

                    streamMover.moveStream(stream);
                }
            } finally {
                // always release the waiter, even when choosing or moving throws;
                // otherwise moveStreams would block on the latch forever
                doneLatch.countDown();
            }
        }

        /**
         * Ask the workers to stop after the stream they are currently moving.
         */
        void shutdown() {
            running = false;
        }
    }

    @Override
    public void close() {
        // no-op
    }
}
// Source: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamChooser.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.service.balancer;

/**
 * Choose a stream to rebalance.
 */
public interface StreamChooser {
    /**
     * Choose a stream to rebalance.
     *
     * @return the chosen stream. NOTE(review): callers such as
     *         {@code SimpleBalancer.RegionMover} stop when this returns {@code null};
     *         confirm that every implementation uses {@code null} to mean "nothing left".
     */
    String choose();
}

// Source: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMover.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.service.balancer;

/**
 * A stream mover to move streams between proxies.
 */
public interface StreamMover {

    /**
     * Move given stream <i>streamName</i>.
     *
     * <p>Failures are reported through the return value; the method declares no
     * checked exception.
     *
     * @param streamName
     *          stream name to move
     * @return <i>true</i> if successfully moved the stream, <i>false</i> when failure happens.
     */
    boolean moveStream(final String streamName);
}

// Source: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMoverImpl.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.service.balancer;

import org.apache.distributedlog.client.monitor.MonitorServiceClient;
import org.apache.distributedlog.service.DistributedLogClient;
import com.twitter.util.Await;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Move Streams from <i>src</i> to <i>target</i>.
 *
 * <p>A move consists of releasing the stream through the source client and then calling
 * {@code check} for it on the target monitor.
 */
public class StreamMoverImpl implements StreamMover {

    private static final Logger logger = LoggerFactory.getLogger(StreamMoverImpl.class);

    final String source, target;
    final DistributedLogClient srcClient, targetClient;
    final MonitorServiceClient srcMonitor, targetMonitor;

    public StreamMoverImpl(String source, DistributedLogClient srcClient, MonitorServiceClient srcMonitor,
                           String target, DistributedLogClient targetClient, MonitorServiceClient targetMonitor) {
        this.source = source;
        this.srcClient = srcClient;
        this.srcMonitor = srcMonitor;
        this.target = target;
        this.targetClient = targetClient;
        this.targetMonitor = targetMonitor;
    }

    /**
     * Move given stream <i>streamName</i>, blocking until the move completes.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is
     * restored before returning.
     *
     * @param streamName
     *          stream name to move
     * @return <i>true</i> if successfully moved the stream, <i>false</i> when failure happens.
     */
    @Override
    public boolean moveStream(final String streamName) {
        try {
            doMoveStream(streamName);
            return true;
        } catch (InterruptedException ie) {
            // keep the interruption visible to the worker that called us
            Thread.currentThread().interrupt();
            logger.info("Interrupted while moving stream {} from {} to {}.",
                    new Object[]{streamName, source, target});
            return false;
        } catch (Exception e) {
            // a failed release or check has already been logged by the listener
            // attached in doMoveStream; report the failure through the return value
            return false;
        }
    }

    /**
     * Release the stream on the source, then check it on the target, and wait for the result.
     *
     * <p>The logging listener is attached to the composed future so that a failure of
     * either step (release or check) is logged exactly once.
     */
    private void doMoveStream(final String streamName) throws Exception {
        Future<Void> moved = srcClient.release(streamName).flatMap(new Function<Void, Future<Void>>() {
            @Override
            public Future<Void> apply(Void result) {
                return targetMonitor.check(streamName);
            }
        });
        Await.result(moved.addEventListener(new FutureEventListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                logger.info("Moved stream {} from {} to {}.",
                        new Object[]{streamName, source, target});
            }

            @Override
            public void onFailure(Throwable cause) {
                // the trailing Throwable is logged as the exception by slf4j
                logger.warn("Failed to move stream {} from region {} to {} : ",
                        new Object[]{streamName, source, target, cause});
            }
        }));
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("StreamMover('").append(source).append("' -> '").append(target).append("')");
        return sb.toString();
    }
}
// Source: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/package-info.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
 * Balancer to move streams around to balance the traffic.
 */
package org.apache.distributedlog.service.balancer;

// Source: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.service.config;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.config.ConcurrentConstConfiguration;
import org.apache.distributedlog.config.ConfigurationSubscription;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.config.FileConfigurationBuilder;
import org.apache.distributedlog.config.PropertiesConfigurationBuilder;
import java.io.File;
import java.net.MalformedURLException;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream config provider that hands out one shared dynamic configuration, built from
 * a single config file, regardless of the stream name.
 */
public class DefaultStreamConfigProvider implements StreamConfigProvider {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamConfigProvider.class);

    private final Optional<DynamicDistributedLogConfiguration> dynConf;
    private final ConfigurationSubscription confSub;

    /**
     * Build the shared dynamic configuration and subscribe it to the given config file.
     *
     * @param configFilePath path of the properties file backing the dynamic configuration
     * @param executorService executor handed to the {@link ConfigurationSubscription}
     * @param reloadPeriod period handed to the {@link ConfigurationSubscription}
     *          (presumably the reload interval; confirm against ConfigurationSubscription)
     * @param reloadUnit time unit of <i>reloadPeriod</i>
     * @throws ConfigurationException if the file path cannot be turned into a URL, or if
     *          setting up the configuration fails
     */
    public DefaultStreamConfigProvider(String configFilePath,
                                       ScheduledExecutorService executorService,
                                       int reloadPeriod,
                                       TimeUnit reloadUnit)
        throws ConfigurationException {
        try {
            FileConfigurationBuilder propertiesBuilder =
                new PropertiesConfigurationBuilder(new File(configFilePath).toURI().toURL());
            // defaults come from a fresh DistributedLogConfiguration
            DynamicDistributedLogConfiguration dynamicConf =
                new DynamicDistributedLogConfiguration(
                    new ConcurrentConstConfiguration(new DistributedLogConfiguration()));
            List<FileConfigurationBuilder> builders = Lists.newArrayList(propertiesBuilder);
            this.confSub = new ConfigurationSubscription(
                dynamicConf, builders, executorService, reloadPeriod, reloadUnit);
            this.dynConf = Optional.of(dynamicConf);
        } catch (MalformedURLException e) {
            throw new ConfigurationException(e);
        }
    }

    /**
     * Return the shared dynamic configuration.
     *
     * @param streamName ignored; every stream gets the same configuration
     * @return the shared dynamic configuration, always present
     */
    @Override
    public Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName) {
        return dynConf;
    }
}

// Source: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/NullStreamConfigProvider.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.service.config;

import com.google.common.base.Optional;

import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream config provider that never supplies a dynamic configuration.
 */
public class NullStreamConfigProvider implements StreamConfigProvider {
    static final Logger LOG = LoggerFactory.getLogger(NullStreamConfigProvider.class);

    // single shared "absent" value returned for every stream
    private static final Optional<DynamicDistributedLogConfiguration> ABSENT_CONF =
            Optional.<DynamicDistributedLogConfiguration>absent();

    /**
     * Return an absent configuration.
     *
     * @param streamName ignored
     * @return an absent {@link Optional}
     */
    @Override
    public Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName) {
        return ABSENT_CONF;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java deleted file mode 100644 index 257b4be..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java +++ /dev/null @@ -1,443 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.config; - -import static com.google.common.base.Preconditions.checkArgument; - -import org.apache.distributedlog.DLSN; -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.DistributedLogConstants; -import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter; -import org.apache.distributedlog.service.streamset.StreamPartitionConverter; -import org.apache.bookkeeper.util.ReflectionUtils; -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.SystemConfiguration; - -/** - * Configuration for DistributedLog Server. 
- */ -public class ServerConfiguration extends CompositeConfiguration { - - private static ClassLoader defaultLoader; - - static { - defaultLoader = Thread.currentThread().getContextClassLoader(); - if (null == defaultLoader) { - defaultLoader = DistributedLogConfiguration.class.getClassLoader(); - } - } - - // Server DLSN version - protected static final String SERVER_DLSN_VERSION = "server_dlsn_version"; - protected static final byte SERVER_DLSN_VERSION_DEFAULT = DLSN.VERSION1; - - // Server Durable Write Enable/Disable Flag - protected static final String SERVER_DURABLE_WRITE_ENABLED = "server_durable_write_enabled"; - protected static final boolean SERVER_DURABLE_WRITE_ENABLED_DEFAULT = true; - - // Server Region Id - protected static final String SERVER_REGION_ID = "server_region_id"; - protected static final int SERVER_REGION_ID_DEFAULT = DistributedLogConstants.LOCAL_REGION_ID; - - // Server Port - protected static final String SERVER_PORT = "server_port"; - protected static final int SERVER_PORT_DEFAULT = 0; - - // Server Shard Id - protected static final String SERVER_SHARD_ID = "server_shard"; - protected static final int SERVER_SHARD_ID_DEFAULT = -1; - - // Server Threads - protected static final String SERVER_NUM_THREADS = "server_threads"; - protected static final int SERVER_NUM_THREADS_DEFAULT = Runtime.getRuntime().availableProcessors(); - - // Server enable per stream stat - protected static final String SERVER_ENABLE_PERSTREAM_STAT = "server_enable_perstream_stat"; - protected static final boolean SERVER_ENABLE_PERSTREAM_STAT_DEFAULT = true; - - // Server graceful shutdown period (in millis) - protected static final String SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS = "server_graceful_shutdown_period_ms"; - protected static final long SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT = 0L; - - // Server service timeout - public static final String SERVER_SERVICE_TIMEOUT_MS = "server_service_timeout_ms"; - public static final String SERVER_SERVICE_TIMEOUT_MS_OLD = 
"serviceTimeoutMs"; - public static final long SERVER_SERVICE_TIMEOUT_MS_DEFAULT = 0; - - // Server close writer timeout - public static final String SERVER_WRITER_CLOSE_TIMEOUT_MS = "server_writer_close_timeout_ms"; - public static final long SERVER_WRITER_CLOSE_TIMEOUT_MS_DEFAULT = 1000; - - // Server stream probation timeout - public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS = "server_stream_probation_timeout_ms"; - public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD = "streamProbationTimeoutMs"; - public static final long SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT = 60 * 1000 * 5; - - // Server stream to partition converter - protected static final String SERVER_STREAM_PARTITION_CONVERTER_CLASS = "stream_partition_converter_class"; - - // Use hostname as the allocator pool name - protected static final String SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME = - "server_use_hostname_as_allocator_pool_name"; - protected static final boolean SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT = false; - //Configure refresh interval for calculating resource placement in seconds - public static final String SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S = - "server_resource_placement_refresh_interval_sec"; - public static final int SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT = 120; - - public ServerConfiguration() { - super(); - addConfiguration(new SystemConfiguration()); - } - - /** - * Load configurations from {@link DistributedLogConfiguration}. - * - * @param dlConf - * distributedlog configuration - */ - public void loadConf(DistributedLogConfiguration dlConf) { - addConfiguration(dlConf); - } - - /** - * Set the version to encode dlsn. - * - * @param version - * dlsn version - * @return server configuration - */ - public ServerConfiguration setDlsnVersion(byte version) { - setProperty(SERVER_DLSN_VERSION, version); - return this; - } - - /** - * Get the version to encode dlsn. - * - * @see DLSN - * @return version to encode dlsn. 
- */ - public byte getDlsnVersion() { - return getByte(SERVER_DLSN_VERSION, SERVER_DLSN_VERSION_DEFAULT); - } - - /** - * Set the flag to enable/disable durable write. - * - * @param enabled - * flag to enable/disable durable write - * @return server configuration - */ - public ServerConfiguration enableDurableWrite(boolean enabled) { - setProperty(SERVER_DURABLE_WRITE_ENABLED, enabled); - return this; - } - - /** - * Is durable write enabled. - * - * @return true if waiting writes to be durable. otherwise false. - */ - public boolean isDurableWriteEnabled() { - return getBoolean(SERVER_DURABLE_WRITE_ENABLED, SERVER_DURABLE_WRITE_ENABLED_DEFAULT); - } - - /** - * Set the region id used to instantiate DistributedLogNamespace. - * - * @param regionId - * region id - * @return server configuration - */ - public ServerConfiguration setRegionId(int regionId) { - setProperty(SERVER_REGION_ID, regionId); - return this; - } - - /** - * Get the region id used to instantiate {@link org.apache.distributedlog.namespace.DistributedLogNamespace}. - * - * @return region id used to instantiate DistributedLogNamespace - */ - public int getRegionId() { - return getInt(SERVER_REGION_ID, SERVER_REGION_ID_DEFAULT); - } - - /** - * Set the server port running for this service. - * - * @param port - * server port - * @return server configuration - */ - public ServerConfiguration setServerPort(int port) { - setProperty(SERVER_PORT, port); - return this; - } - - /** - * Get the server port running for this service. - * - * @return server port - */ - public int getServerPort() { - return getInt(SERVER_PORT, SERVER_PORT_DEFAULT); - } - - /** - * Set the shard id of this server. - * - * @param shardId - * shard id - * @return server configuration - */ - public ServerConfiguration setServerShardId(int shardId) { - setProperty(SERVER_SHARD_ID, shardId); - return this; - } - - /** - * Get the shard id of this server. 
- * - * <p>It would be used to instantiate the client id used for DistributedLogNamespace. - * - * @return shard id of this server. - */ - public int getServerShardId() { - return getInt(SERVER_SHARD_ID, SERVER_SHARD_ID_DEFAULT); - } - - /** - * Get the number of threads for the executor of this server. - * - * @return number of threads for the executor running in this server. - */ - public int getServerThreads() { - return getInt(SERVER_NUM_THREADS, SERVER_NUM_THREADS_DEFAULT); - } - - /** - * Set the number of threads for the executor of this server. - * - * @param numThreads - * number of threads for the executor running in this server. - * @return server configuration - */ - public ServerConfiguration setServerThreads(int numThreads) { - setProperty(SERVER_NUM_THREADS, numThreads); - return this; - } - - /** - * Enable/Disable per stream stat. - * - * @param enabled - * flag to enable/disable per stream stat - * @return server configuration - */ - public ServerConfiguration setPerStreamStatEnabled(boolean enabled) { - setProperty(SERVER_ENABLE_PERSTREAM_STAT, enabled); - return this; - } - - /** - * Whether per stream stat is enabled or not in this server. - * - * @return true if per stream stat is enabled, otherwise false. - */ - public boolean isPerStreamStatEnabled() { - return getBoolean(SERVER_ENABLE_PERSTREAM_STAT, SERVER_ENABLE_PERSTREAM_STAT_DEFAULT); - } - - /** - * Set the graceful shutdown period in millis. - * - * @param periodMs - * graceful shutdown period in millis. - * @return server configuration - */ - public ServerConfiguration setGracefulShutdownPeriodMs(long periodMs) { - setProperty(SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS, periodMs); - return this; - } - - /** - * Get the graceful shutdown period in millis. - * - * @return graceful shutdown period in millis. 
- */ - public long getGracefulShutdownPeriodMs() { - return getLong(SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS, SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT); - } - - /** - * Get timeout for stream op execution in proxy layer. - * - * <p>0 disables timeout. - * - * @return timeout for stream operation in proxy layer. - */ - public long getServiceTimeoutMs() { - return getLong(SERVER_SERVICE_TIMEOUT_MS, - getLong(SERVER_SERVICE_TIMEOUT_MS_OLD, SERVER_SERVICE_TIMEOUT_MS_DEFAULT)); - } - - /** - * Set timeout for stream op execution in proxy layer. - * - * <p>0 disables timeout. - * - * @param timeoutMs - * timeout for stream operation in proxy layer. - * @return dl configuration. - */ - public ServerConfiguration setServiceTimeoutMs(long timeoutMs) { - setProperty(SERVER_SERVICE_TIMEOUT_MS, timeoutMs); - return this; - } - - /** - * Get timeout for closing writer in proxy layer. - * - * <p>0 disables timeout. - * - * @return timeout for closing writer in proxy layer. - */ - public long getWriterCloseTimeoutMs() { - return getLong(SERVER_WRITER_CLOSE_TIMEOUT_MS, SERVER_WRITER_CLOSE_TIMEOUT_MS_DEFAULT); - } - - /** - * Set timeout for closing writer in proxy layer. - * - * <p>0 disables timeout. - * - * @param timeoutMs - * timeout for closing writer in proxy layer. - * @return dl configuration. - */ - public ServerConfiguration setWriterCloseTimeoutMs(long timeoutMs) { - setProperty(SERVER_WRITER_CLOSE_TIMEOUT_MS, timeoutMs); - return this; - } - - /** - * How long should stream be kept in cache in probationary state after service timeout. - * - * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds. - * - * @return stream probation timeout in ms. - */ - public long getStreamProbationTimeoutMs() { - return getLong(SERVER_STREAM_PROBATION_TIMEOUT_MS, - getLong(SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD, SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT)); - } - - /** - * How long should stream be kept in cache in probationary state after service timeout. 
- * - * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds. - * - * @param timeoutMs probation timeout in ms. - * @return server configuration - */ - public ServerConfiguration setStreamProbationTimeoutMs(long timeoutMs) { - setProperty(SERVER_STREAM_PROBATION_TIMEOUT_MS, timeoutMs); - return this; - } - - /** - * Set the stream partition converter class. - * - * @param converterClass - * stream partition converter class - * @return server configuration - */ - public ServerConfiguration setStreamPartitionConverterClass( - Class<? extends StreamPartitionConverter> converterClass) { - setProperty(SERVER_STREAM_PARTITION_CONVERTER_CLASS, converterClass.getName()); - return this; - } - - /** - * Get the stream partition converter class. - * - * @return the stream partition converter class. - * @throws ConfigurationException - */ - public Class<? extends StreamPartitionConverter> getStreamPartitionConverterClass() - throws ConfigurationException { - return ReflectionUtils.getClass( - this, - SERVER_STREAM_PARTITION_CONVERTER_CLASS, - IdentityStreamPartitionConverter.class, - StreamPartitionConverter.class, - defaultLoader); - } - - /** - * Set if use hostname as the allocator pool name. - * - * @param useHostname whether to use hostname as the allocator pool name. - * @return server configuration - * @see #isUseHostnameAsAllocatorPoolName() - */ - public ServerConfiguration setUseHostnameAsAllocatorPoolName(boolean useHostname) { - setProperty(SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME, useHostname); - return this; - } - - /** - * Get if use hostname as the allocator pool name. - * - * @return true if use hostname as the allocator pool name. otherwise, use - * {@link #getServerShardId()} as the allocator pool name. 
- * @see #getServerShardId() - */ - public boolean isUseHostnameAsAllocatorPoolName() { - return getBoolean(SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME, - SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT); - } - - public ServerConfiguration setResourcePlacementRefreshInterval(int refreshIntervalSecs) { - setProperty(SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S, refreshIntervalSecs); - return this; - } - - public int getResourcePlacementRefreshInterval() { - return getInt(SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S, SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT); - } - - /** - * Validate the configuration. - * - * @throws IllegalArgumentException when there are any invalid settings. - */ - public void validate() { - byte dlsnVersion = getDlsnVersion(); - checkArgument(dlsnVersion >= DLSN.VERSION0 && dlsnVersion <= DLSN.VERSION1, - "Unknown dlsn version " + dlsnVersion); - checkArgument(getServerThreads() > 0, - "Invalid number of server threads : " + getServerThreads()); - checkArgument(getServerShardId() >= 0, - "Invalid server shard id : " + getServerShardId()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java deleted file mode 100644 index 29052f9..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.config; - -import com.google.common.base.Optional; -import org.apache.distributedlog.config.DynamicConfigurationFactory; -import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.service.streamset.StreamPartitionConverter; -import java.io.File; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.apache.commons.configuration.ConfigurationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Provide per stream configuration to DistributedLog service layer. 
- */ -public class ServiceStreamConfigProvider implements StreamConfigProvider { - - private static final Logger LOG = LoggerFactory.getLogger(ServiceStreamConfigProvider.class); - - private static final String CONFIG_EXTENSION = "conf"; - - private final File configBaseDir; - private final File defaultConfigFile; - private final StreamPartitionConverter partitionConverter; - private final DynamicConfigurationFactory configFactory; - private final DynamicDistributedLogConfiguration defaultDynConf; - - public ServiceStreamConfigProvider(String configPath, - String defaultConfigPath, - StreamPartitionConverter partitionConverter, - ScheduledExecutorService executorService, - int reloadPeriod, - TimeUnit reloadUnit) - throws ConfigurationException { - this.configBaseDir = new File(configPath); - if (!configBaseDir.exists()) { - throw new ConfigurationException("Stream configuration base directory " - + configPath + " does not exist"); - } - // The default config is checked at its own path, not at the base directory. - this.defaultConfigFile = new File(defaultConfigPath); - if (!defaultConfigFile.exists()) { - throw new ConfigurationException("Stream configuration default config " - + defaultConfigPath + " does not exist"); - } - - // Construct reloading default configuration - this.partitionConverter = partitionConverter; - this.configFactory = new DynamicConfigurationFactory(executorService, reloadPeriod, reloadUnit); - // We know it exists from the check above.
- this.defaultDynConf = configFactory.getDynamicConfiguration(defaultConfigPath).get(); - } - - @Override - public Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName) { - String configName = partitionConverter.convert(streamName).getStream(); - String configPath = getConfigPath(configName); - Optional<DynamicDistributedLogConfiguration> dynConf = Optional.<DynamicDistributedLogConfiguration>absent(); - try { - dynConf = configFactory.getDynamicConfiguration(configPath, defaultDynConf); - } catch (ConfigurationException ex) { - LOG.warn("Configuration exception for stream {} ({}) at {}", - new Object[] {streamName, configName, configPath, ex}); - } - return dynConf; - } - - private String getConfigPath(String configName) { - return new File(configBaseDir, String.format("%s.%s", configName, CONFIG_EXTENSION)).getPath(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java deleted file mode 100644 index c704f70..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.config; - -import com.google.common.base.Optional; -import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; - -/** - * Expose per-stream configs to dl proxy. - */ -public interface StreamConfigProvider { - /** - * Get dynamic per stream config overrides for a given stream. - * - * @param streamName stream name to return config for - * @return Optional dynamic configuration instance - */ - Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/package-info.java deleted file mode 100644 index b07605e..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * DistributedLog Server Configurations. - */ -package org.apache.distributedlog.service.config; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/package-info.java deleted file mode 100644 index 3fcfeda..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * DistributedLog Proxy Service. 
- */ -package org.apache.distributedlog.service; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java deleted file mode 100644 index fa3dd49..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.placement; - -import com.twitter.util.Future; - -/** - * Equal Load Appraiser. - * - * <p>Created for those who hold these truths to be self-evident, that all streams are created equal, - * that they are endowed by their creator with certain unalienable loads, that among these are - * Uno, Eins, and One. 
- */ -public class EqualLoadAppraiser implements LoadAppraiser { - @Override - public Future<StreamLoad> getStreamLoad(String stream) { - return Future.value(new StreamLoad(stream, 1)); - } - - @Override - public Future<Void> refreshCache() { - return Future.value(null); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java deleted file mode 100644 index 2e9dd6b..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java +++ /dev/null @@ -1,200 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.service.placement; - -import org.apache.distributedlog.client.routing.RoutingService; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import com.twitter.util.Duration; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.Futures; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; -import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.BoxedUnit; - -/** - * Least Load Placement Policy. - * - * <p>A LoadPlacementPolicy that attempts to place streams in such a way that the load is balanced as - * evenly as possible across all shards. The LoadAppraiser remains responsible for determining what - * the load of a server would be. This placement policy then distributes these streams across the - * servers. 
- */ -public class LeastLoadPlacementPolicy extends PlacementPolicy { - - private static final Logger logger = LoggerFactory.getLogger(LeastLoadPlacementPolicy.class); - - private TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>(); - private Map<String, String> streamToServer = new HashMap<String, String>(); - - public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService, - DistributedLogNamespace namespace, PlacementStateManager placementStateManager, - Duration refreshInterval, StatsLogger statsLogger) { - super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger); - statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - if (serverLoads.size() > 0) { - return serverLoads.last().getLoad() - serverLoads.first().getLoad(); - } else { - return getDefaultValue(); - } - } - }); - } - - private synchronized String getStreamOwner(String stream) { - return streamToServer.get(stream); - } - - @Override - public Future<String> placeStream(String stream) { - String streamOwner = getStreamOwner(stream); - if (null != streamOwner) { - return Future.value(streamOwner); - } - Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream); - return streamLoadFuture.map(new Function<StreamLoad, String>() { - @Override - public String apply(StreamLoad streamLoad) { - return placeStreamSynchronized(streamLoad); - } - }); - } - - private synchronized String placeStreamSynchronized(StreamLoad streamLoad) { - ServerLoad serverLoad = serverLoads.pollFirst(); - serverLoad.addStream(streamLoad); - serverLoads.add(serverLoad); - return serverLoad.getServer(); - } - - @Override - public void refresh() { - logger.info("Refreshing server loads."); - Future<Void> refresh = loadAppraiser.refreshCache(); - final Set<String> servers = getServers(); - final Set<String> allStreams = 
getStreams(); - Future<TreeSet<ServerLoad>> serverLoadsFuture = refresh.flatMap( - new Function<Void, Future<TreeSet<ServerLoad>>>() { - @Override - public Future<TreeSet<ServerLoad>> apply(Void v1) { - return calculate(servers, allStreams); - } - }); - serverLoadsFuture.map(new Function<TreeSet<ServerLoad>, BoxedUnit>() { - @Override - public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) { - try { - updateServerLoads(serverLoads); - } catch (PlacementStateManager.StateManagerSaveException e) { - logger.error("The refreshed mapping could not be persisted and will not be used.", e); - } - return BoxedUnit.UNIT; - } - }); - } - - private synchronized void updateServerLoads(TreeSet<ServerLoad> serverLoads) - throws PlacementStateManager.StateManagerSaveException { - this.placementStateManager.saveOwnership(serverLoads); - this.streamToServer = serverLoadsToMap(serverLoads); - this.serverLoads = serverLoads; - } - - @Override - public synchronized void load(TreeSet<ServerLoad> serverLoads) { - this.serverLoads = serverLoads; - this.streamToServer = serverLoadsToMap(serverLoads); - } - - public Future<TreeSet<ServerLoad>> calculate(final Set<String> servers, Set<String> streams) { - logger.info("Calculating server loads"); - final long startTime = System.currentTimeMillis(); - ArrayList<Future<StreamLoad>> futures = new ArrayList<Future<StreamLoad>>(streams.size()); - - for (String stream : streams) { - Future<StreamLoad> streamLoad = loadAppraiser.getStreamLoad(stream); - futures.add(streamLoad); - } - - return Futures.collect(futures).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() { - @Override - public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) { - /* Sort streamLoads so largest streams are placed first for better balance */ - TreeSet<StreamLoad> streamQueue = new TreeSet<StreamLoad>(); - for (StreamLoad streamLoad : streamLoads) { - streamQueue.add(streamLoad); - } - - TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>(); - for 
(String server : servers) { - ServerLoad serverLoad = new ServerLoad(server); - if (!streamQueue.isEmpty()) { - serverLoad.addStream(streamQueue.pollFirst()); - } - serverLoads.add(serverLoad); - } - - while (!streamQueue.isEmpty()) { - ServerLoad serverLoad = serverLoads.pollFirst(); - serverLoad.addStream(streamQueue.pollFirst()); - serverLoads.add(serverLoad); - } - return serverLoads; - } - }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() { - @Override - public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) { - placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime); - return BoxedUnit.UNIT; - } - }).onFailure(new Function<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable t) { - logger.error("Failure calculating loads", t); - placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime); - return BoxedUnit.UNIT; - } - }); - } - - private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> serverLoads) { - HashMap<String, String> streamToServer = new HashMap<String, String>(serverLoads.size()); - for (ServerLoad serverLoad : serverLoads) { - for (StreamLoad streamLoad : serverLoad.getStreamLoads()) { - streamToServer.put(streamLoad.getStream(), serverLoad.getServer()); - } - } - return streamToServer; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java deleted file mode 100644 index 5cd8980..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.placement; - -import com.twitter.util.Future; - -/** - * Interface for load appraiser. - */ -public interface LoadAppraiser { - /** - * Retrieve the stream load for a given {@code stream}. - * - * @param stream name of the stream - * @return the stream load of the stream. - */ - Future<StreamLoad> getStreamLoad(String stream); - - /** - * Refresh the cache.
- * @return - */ - Future<Void> refreshCache(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java deleted file mode 100644 index ac952aa..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.service.placement; - -import org.apache.distributedlog.client.routing.RoutingService; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.service.DLSocketAddress; -import com.twitter.util.Duration; -import com.twitter.util.Function0; -import com.twitter.util.Future; -import com.twitter.util.ScheduledThreadPoolTimer; -import com.twitter.util.Time; -import com.twitter.util.Timer; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; -import java.util.TreeSet; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.BoxedUnit; - -/** - * A PlacementPolicy assigns streams to servers given an appraisal of the load that the stream contains. - * - * <p>The load of a stream is determined by the LoadAppraiser used. The PlacementPolicy will - * then distribute these StreamLoads to the available servers in a manner defined by the - * implementation creating ServerLoad objects. It then saves this assignment via the - * PlacementStateManager.
- */ -public abstract class PlacementPolicy { - - private static final Logger logger = LoggerFactory.getLogger(PlacementPolicy.class); - - protected final LoadAppraiser loadAppraiser; - protected final RoutingService routingService; - protected final DistributedLogNamespace namespace; - protected final PlacementStateManager placementStateManager; - private final Duration refreshInterval; - protected final OpStatsLogger placementCalcStats; - private Timer placementRefreshTimer; - - public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService, - DistributedLogNamespace namespace, PlacementStateManager placementStateManager, - Duration refreshInterval, StatsLogger statsLogger) { - this.loadAppraiser = loadAppraiser; - this.routingService = routingService; - this.namespace = namespace; - this.placementStateManager = placementStateManager; - this.refreshInterval = refreshInterval; - placementCalcStats = statsLogger.getOpStatsLogger("placement"); - } - - public Set<String> getServers() { - Set<SocketAddress> hosts = routingService.getHosts(); - Set<String> servers = new HashSet<String>(hosts.size()); - for (SocketAddress address : hosts) { - servers.add(DLSocketAddress.toString((InetSocketAddress) address)); - } - return servers; - } - - public Set<String> getStreams() { - Set<String> streams = new HashSet<String>(); - try { - Iterator<String> logs = namespace.getLogs(); - while (logs.hasNext()) { - streams.add(logs.next()); - } - } catch (IOException e) { - logger.error("Could not get streams for placement policy.", e); - } - return streams; - } - - public void start(boolean leader) { - logger.info("Starting placement policy"); - - TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>(); - for (String server : getServers()) { - emptyServerLoads.add(new ServerLoad(server)); - } - load(emptyServerLoads); //Pre-Load so streams don't NPE - if (leader) { //this is the leader shard - logger.info("Shard is leader. 
Scheduling timed refresh."); - placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", true); - placementRefreshTimer.schedule(Time.now(), refreshInterval, new Function0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - refresh(); - return BoxedUnit.UNIT; - } - }); - } else { - logger.info("Shard is not leader. Watching for server load changes."); - placementStateManager.watch(new PlacementStateManager.PlacementCallback() { - @Override - public void callback(TreeSet<ServerLoad> serverLoads) { - if (!serverLoads.isEmpty()) { - load(serverLoads); - } - } - }); - } - } - - public void close() { - if (placementRefreshTimer != null) { - placementRefreshTimer.stop(); - } - } - - /** - * Places the stream on a server according to the policy. - * - * <p>It returns a future containing the host that owns the stream upon completion - */ - public abstract Future<String> placeStream(String stream); - - /** - * Recalculates the entire placement mapping and stores it using the PlacementStateManager. - */ - public abstract void refresh(); - - /** - * Loads the placement mapping into the node from a TreeSet of ServerLoads.
- */ - public abstract void load(TreeSet<ServerLoad> serverLoads); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java deleted file mode 100644 index 0187bed..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.placement; - -import java.util.TreeSet; - -/** - * The PlacementStateManager handles persistence of calculated resource placements. - */ -public interface PlacementStateManager { - - /** - * Saves the ownership mapping as a TreeSet of ServerLoads to persistent storage. 
- */ - void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException; - - /** - * Loads the ownership mapping as TreeSet of ServerLoads from persistent storage. - */ - TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException; - - /** - * Watch the persistent storage for changes to the ownership mapping. - * - * <p>The placementCallback callbacks will be triggered with the new mapping when a change occurs. - */ - void watch(PlacementCallback placementCallback); - - /** - * Placement Callback. - * - * <p>The callback is triggered when server loads are updated. - */ - interface PlacementCallback { - void callback(TreeSet<ServerLoad> serverLoads); - } - - /** - * The base exception thrown when state manager encounters errors. - */ - abstract class StateManagerException extends Exception { - public StateManagerException(String message, Exception e) { - super(message, e); - } - } - - /** - * Exception thrown when failed to load the ownership mapping. - */ - class StateManagerLoadException extends StateManagerException { - public StateManagerLoadException(Exception e) { - super("Load of Ownership failed", e); - } - } - - /** - * Exception thrown when failed to save the ownership mapping. 
- */ - class StateManagerSaveException extends StateManagerException { - public StateManagerSaveException(Exception e) { - super("Save of Ownership failed", e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java deleted file mode 100644 index d65c401..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.service.placement; - -import static com.google.common.base.Charsets.UTF_8; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Set; -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TJSONProtocol; -import org.apache.thrift.transport.TMemoryBuffer; -import org.apache.thrift.transport.TMemoryInputTransport; - -/** - * An object represents the server load. - * - * <p>A comparable data object containing the identifier of the server, total appraised load on the - * server, and all streams assigned to the server by the resource placement mapping. This is - * comparable first by load and then by server so that a sorted data structure of these will be - * consistent across multiple calculations. - */ -public class ServerLoad implements Comparable { - private static final int BUFFER_SIZE = 4096000; - private final String server; - private final HashSet<StreamLoad> streamLoads = new HashSet<StreamLoad>(); - private long load = 0L; - - public ServerLoad(String server) { - this.server = server; - } - - public synchronized long addStream(StreamLoad stream) { - this.load += stream.getLoad(); - streamLoads.add(stream); - return this.load; - } - - public synchronized long removeStream(String stream) { - for (StreamLoad streamLoad : streamLoads) { - if (streamLoad.stream.equals(stream)) { - this.load -= streamLoad.getLoad(); - streamLoads.remove(streamLoad); - return this.load; - } - } - return this.load; //Throwing an exception wouldn't help us as our logic should never reach here - } - - public synchronized long getLoad() { - return load; - } - - public synchronized Set<StreamLoad> getStreamLoads() { - return streamLoads; - } - - public synchronized String getServer() { - return server; - } - - protected synchronized 
org.apache.distributedlog.service.placement.thrift.ServerLoad toThrift() { - org.apache.distributedlog.service.placement.thrift.ServerLoad tServerLoad = - new org.apache.distributedlog.service.placement.thrift.ServerLoad(); - tServerLoad.setServer(server); - tServerLoad.setLoad(load); - ArrayList<org.apache.distributedlog.service.placement.thrift.StreamLoad> tStreamLoads = - new ArrayList<org.apache.distributedlog.service.placement.thrift.StreamLoad>(); - for (StreamLoad streamLoad : streamLoads) { - tStreamLoads.add(streamLoad.toThrift()); - } - tServerLoad.setStreams(tStreamLoads); - return tServerLoad; - } - - public byte[] serialize() throws IOException { - TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE); - TJSONProtocol protocol = new TJSONProtocol(transport); - try { - toThrift().write(protocol); - transport.flush(); - return transport.toString(UTF_8.name()).getBytes(UTF_8); - } catch (TException e) { - throw new IOException("Failed to serialize server load : ", e); - } catch (UnsupportedEncodingException uee) { - throw new IOException("Failed to serialize server load : ", uee); - } - } - - public static ServerLoad deserialize(byte[] data) throws IOException { - org.apache.distributedlog.service.placement.thrift.ServerLoad tServerLoad = - new org.apache.distributedlog.service.placement.thrift.ServerLoad(); - TMemoryInputTransport transport = new TMemoryInputTransport(data); - TJSONProtocol protocol = new TJSONProtocol(transport); - try { - tServerLoad.read(protocol); - ServerLoad serverLoad = new ServerLoad(tServerLoad.getServer()); - if (tServerLoad.isSetStreams()) { - for (org.apache.distributedlog.service.placement.thrift.StreamLoad tStreamLoad : - tServerLoad.getStreams()) { - serverLoad.addStream(new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad())); - } - } - return serverLoad; - } catch (TException e) { - throw new IOException("Failed to deserialize server load : ", e); - } - } - - @Override - public synchronized int compareTo(Object 
o) { - ServerLoad other = (ServerLoad) o; - if (load == other.getLoad()) { - return server.compareTo(other.getServer()); - } else { - return Long.compare(load, other.getLoad()); - } - } - - @Override - public synchronized boolean equals(Object o) { - if (!(o instanceof ServerLoad)) { - return false; - } - ServerLoad other = (ServerLoad) o; - return server.equals(other.getServer()) - && load == other.getLoad() - && streamLoads.equals(other.getStreamLoads()); - } - - @Override - public synchronized String toString() { - return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads); - } - - @Override - public synchronized int hashCode() { - return new HashCodeBuilder().append(server).append(load).append(streamLoads).build(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java deleted file mode 100644 index f271222..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.placement; - -import static com.google.common.base.Charsets.UTF_8; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TJSONProtocol; -import org.apache.thrift.transport.TMemoryBuffer; -import org.apache.thrift.transport.TMemoryInputTransport; - -/** - * An object represent the load of a stream. - * - * <p>A comparable data object containing the identifier of the stream and the appraised load produced - * by the stream. 
- */ -public class StreamLoad implements Comparable { - private static final int BUFFER_SIZE = 4096; - public final String stream; - private final int load; - - public StreamLoad(String stream, int load) { - this.stream = stream; - this.load = load; - } - - public int getLoad() { - return load; - } - - public String getStream() { - return stream; - } - - protected org.apache.distributedlog.service.placement.thrift.StreamLoad toThrift() { - org.apache.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = - new org.apache.distributedlog.service.placement.thrift.StreamLoad(); - return tStreamLoad.setStream(stream).setLoad(load); - } - - public byte[] serialize() throws IOException { - TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE); - TJSONProtocol protocol = new TJSONProtocol(transport); - try { - toThrift().write(protocol); - transport.flush(); - return transport.toString(UTF_8.name()).getBytes(UTF_8); - } catch (TException e) { - throw new IOException("Failed to serialize stream load : ", e); - } catch (UnsupportedEncodingException uee) { - throw new IOException("Failed to serialize stream load : ", uee); - } - } - - public static StreamLoad deserialize(byte[] data) throws IOException { - org.apache.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = - new org.apache.distributedlog.service.placement.thrift.StreamLoad(); - TMemoryInputTransport transport = new TMemoryInputTransport(data); - TJSONProtocol protocol = new TJSONProtocol(transport); - try { - tStreamLoad.read(protocol); - return new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()); - } catch (TException e) { - throw new IOException("Failed to deserialize stream load : ", e); - } - } - - @Override - public int compareTo(Object o) { - StreamLoad other = (StreamLoad) o; - if (load == other.getLoad()) { - return stream.compareTo(other.getStream()); - } else { - return Long.compare(load, other.getLoad()); - } - } - - @Override - public boolean equals(Object o) { - if 
(!(o instanceof StreamLoad)) { - return false; - } - StreamLoad other = (StreamLoad) o; - return stream.equals(other.getStream()) && load == other.getLoad(); - } - - @Override - public String toString() { - return String.format("StreamLoad<Stream: %s, Load: %d>", stream, load); - } - - @Override - public int hashCode() { - return new HashCodeBuilder().append(stream).append(load).build(); - } -}