http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java new file mode 100644 index 0000000..3c53ccf --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.service.balancer; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.RateLimiter; +import org.apache.distributedlog.client.monitor.MonitorServiceClient; +import org.apache.distributedlog.service.DistributedLogClient; +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A balancer balances ownerships between two targets. + */ +public class SimpleBalancer implements Balancer { + + private static final Logger logger = LoggerFactory.getLogger(SimpleBalancer.class); + + protected final String target1; + protected final String target2; + protected final DistributedLogClient targetClient1; + protected final DistributedLogClient targetClient2; + protected final MonitorServiceClient targetMonitor1; + protected final MonitorServiceClient targetMonitor2; + + public SimpleBalancer(String name1, + DistributedLogClient client1, + MonitorServiceClient monitor1, + String name2, + DistributedLogClient client2, + MonitorServiceClient monitor2) { + this.target1 = name1; + this.targetClient1 = client1; + this.targetMonitor1 = monitor1; + this.target2 = name2; + this.targetClient2 = client2; + this.targetMonitor2 = monitor2; + } + + protected static int countNumberStreams(Map<SocketAddress, Set<String>> distribution) { + int count = 0; + for (Set<String> streams : distribution.values()) { + count += streams.size(); + } + return count; + } + + @Override + public void balance(int rebalanceWaterMark, + double rebalanceTolerancePercentage, + int rebalanceConcurrency, + Optional<RateLimiter> rebalanceRateLimiter) { + // get the ownership distributions from individual targets + Map<SocketAddress, Set<String>> distribution1 = 
targetMonitor1.getStreamOwnershipDistribution(); + Map<SocketAddress, Set<String>> distribution2 = targetMonitor2.getStreamOwnershipDistribution(); + + // get stream counts + int proxyCount1 = distribution1.size(); + int streamCount1 = countNumberStreams(distribution1); + int proxyCount2 = distribution2.size(); + int streamCount2 = countNumberStreams(distribution2); + + logger.info("'{}' has {} streams by {} proxies; while '{}' has {} streams by {} proxies.", + new Object[] {target1, streamCount1, proxyCount1, target2, streamCount2, proxyCount2 }); + + String source, target; + Map<SocketAddress, Set<String>> srcDistribution; + DistributedLogClient srcClient, targetClient; + MonitorServiceClient srcMonitor, targetMonitor; + int srcStreamCount, targetStreamCount; + if (streamCount1 > streamCount2) { + source = target1; + srcStreamCount = streamCount1; + srcClient = targetClient1; + srcMonitor = targetMonitor1; + srcDistribution = distribution1; + + target = target2; + targetStreamCount = streamCount2; + targetClient = targetClient2; + targetMonitor = targetMonitor2; + } else { + source = target2; + srcStreamCount = streamCount2; + srcClient = targetClient2; + srcMonitor = targetMonitor2; + srcDistribution = distribution2; + + target = target1; + targetStreamCount = streamCount1; + targetClient = targetClient1; + targetMonitor = targetMonitor1; + } + + Map<String, Integer> loadDistribution = new HashMap<String, Integer>(); + loadDistribution.put(source, srcStreamCount); + loadDistribution.put(target, targetStreamCount); + + // Calculate how many streams to be rebalanced from src region to target region + int numStreamsToRebalance = BalancerUtils.calculateNumStreamsToRebalance( + source, loadDistribution, rebalanceWaterMark, rebalanceTolerancePercentage); + + if (numStreamsToRebalance <= 0) { + logger.info("No streams need to be rebalanced from '{}' to '{}'.", source, target); + return; + } + + StreamChooser streamChooser = + LimitedStreamChooser.of(new 
CountBasedStreamChooser(srcDistribution), numStreamsToRebalance); + StreamMover streamMover = + new StreamMoverImpl(source, srcClient, srcMonitor, target, targetClient, targetMonitor); + + moveStreams(streamChooser, streamMover, rebalanceConcurrency, rebalanceRateLimiter); + } + + @Override + public void balanceAll(String source, + int rebalanceConcurrency, + Optional<RateLimiter> rebalanceRateLimiter) { + String target; + DistributedLogClient sourceClient, targetClient; + MonitorServiceClient sourceMonitor, targetMonitor; + if (target1.equals(source)) { + sourceClient = targetClient1; + sourceMonitor = targetMonitor1; + target = target2; + targetClient = targetClient2; + targetMonitor = targetMonitor2; + } else if (target2.equals(source)) { + sourceClient = targetClient2; + sourceMonitor = targetMonitor2; + target = target1; + targetClient = targetClient1; + targetMonitor = targetMonitor1; + } else { + throw new IllegalArgumentException("Unknown target " + source); + } + + // get the ownership distributions from individual targets + Map<SocketAddress, Set<String>> distribution = sourceMonitor.getStreamOwnershipDistribution(); + + if (distribution.isEmpty()) { + return; + } + + StreamChooser streamChooser = new CountBasedStreamChooser(distribution); + StreamMover streamMover = + new StreamMoverImpl(source, sourceClient, sourceMonitor, target, targetClient, targetMonitor); + + moveStreams(streamChooser, streamMover, rebalanceConcurrency, rebalanceRateLimiter); + } + + private void moveStreams(StreamChooser streamChooser, + StreamMover streamMover, + int concurrency, + Optional<RateLimiter> rateLimiter) { + CountDownLatch doneLatch = new CountDownLatch(concurrency); + RegionMover regionMover = new RegionMover(streamChooser, streamMover, rateLimiter, doneLatch); + ExecutorService executorService = Executors.newFixedThreadPool(concurrency); + try { + for (int i = 0; i < concurrency; i++) { + executorService.submit(regionMover); + } + + try { + doneLatch.await(); + } 
catch (InterruptedException e) { + logger.info("{} is interrupted. Stopping it ...", streamMover); + regionMover.shutdown(); + } + } finally { + executorService.shutdown(); + } + + } + + /** + * Move streams from <i>src</i> region to <i>target</i> region. + */ + static class RegionMover implements Runnable { + + final StreamChooser streamChooser; + final StreamMover streamMover; + final Optional<RateLimiter> rateLimiter; + final CountDownLatch doneLatch; + volatile boolean running = true; + + RegionMover(StreamChooser streamChooser, + StreamMover streamMover, + Optional<RateLimiter> rateLimiter, + CountDownLatch doneLatch) { + this.streamChooser = streamChooser; + this.streamMover = streamMover; + this.rateLimiter = rateLimiter; + this.doneLatch = doneLatch; + } + + @Override + public void run() { + while (running) { + if (rateLimiter.isPresent()) { + rateLimiter.get().acquire(); + } + + String stream = streamChooser.choose(); + if (null == stream) { + break; + } + + streamMover.moveStream(stream); + } + doneLatch.countDown(); + } + + void shutdown() { + running = false; + } + } + + @Override + public void close() { + // no-op + } +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamChooser.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamChooser.java
new file mode 100644
index 0000000..1d7b6f7
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamChooser.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+/**
+ * Choose a stream to rebalance.
+ */
+public interface StreamChooser {
+    /**
+     * Choose a stream to rebalance.
+     *
+     * @return name of the chosen stream. The worker loop in SimpleBalancer stops
+     *         choosing as soon as this returns <i>null</i>.
+     */
+    String choose();
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamMover.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamMover.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamMover.java
new file mode 100644
index 0000000..4a04530
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamMover.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+/**
+ * A stream mover to move streams between proxies.
+ */
+public interface StreamMover {
+
+    /**
+     * Move given stream <i>streamName</i>.
+     *
+     * @param streamName
+     *          stream name to move
+     * @return <i>true</i> if successfully moved the stream, <i>false</i> when failure happens.
+ * @throws Exception + */ + boolean moveStream(final String streamName); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamMoverImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamMoverImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamMoverImpl.java new file mode 100644 index 0000000..68d934b --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamMoverImpl.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.service.balancer; + +import org.apache.distributedlog.client.monitor.MonitorServiceClient; +import org.apache.distributedlog.service.DistributedLogClient; +import com.twitter.util.Await; +import com.twitter.util.Function; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Move Streams from <i>src</i> to <i>target</i>. + */ +public class StreamMoverImpl implements StreamMover { + + private static final Logger logger = LoggerFactory.getLogger(StreamMoverImpl.class); + + final String source, target; + final DistributedLogClient srcClient, targetClient; + final MonitorServiceClient srcMonitor, targetMonitor; + + public StreamMoverImpl(String source, DistributedLogClient srcClient, MonitorServiceClient srcMonitor, + String target, DistributedLogClient targetClient, MonitorServiceClient targetMonitor) { + this.source = source; + this.srcClient = srcClient; + this.srcMonitor = srcMonitor; + this.target = target; + this.targetClient = targetClient; + this.targetMonitor = targetMonitor; + } + + /** + * Move given stream <i>streamName</i>. + * + * @param streamName + * stream name to move + * @return <i>true</i> if successfully moved the stream, <i>false</i> when failure happens. 
+ * @throws Exception + */ + public boolean moveStream(final String streamName) { + try { + doMoveStream(streamName); + return true; + } catch (Exception e) { + return false; + } + } + + private void doMoveStream(final String streamName) throws Exception { + Await.result(srcClient.release(streamName).flatMap(new Function<Void, Future<Void>>() { + @Override + public Future<Void> apply(Void result) { + return targetMonitor.check(streamName).addEventListener(new FutureEventListener<Void>() { + @Override + public void onSuccess(Void value) { + logger.info("Moved stream {} from {} to {}.", + new Object[]{streamName, source, target}); + } + + @Override + public void onFailure(Throwable cause) { + logger.info("Failed to move stream {} from region {} to {} : ", + new Object[]{streamName, source, target, cause}); + } + }); + } + })); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("StreamMover('").append(source).append("' -> '").append(target).append("')"); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/package-info.java new file mode 100644 index 0000000..9eb8950 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Balancer to move streams around to balance the traffic. + */ +package org.apache.distributedlog.service.balancer; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java new file mode 100644 index 0000000..7d72093 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.config; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.config.ConcurrentConstConfiguration; +import org.apache.distributedlog.config.ConfigurationSubscription; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.config.FileConfigurationBuilder; +import org.apache.distributedlog.config.PropertiesConfigurationBuilder; +import java.io.File; +import java.net.MalformedURLException; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.commons.configuration.ConfigurationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * For all streams return the same dynamic config based on configFile. 
+ */ +public class DefaultStreamConfigProvider implements StreamConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamConfigProvider.class); + + private final Optional<DynamicDistributedLogConfiguration> dynConf; + private final ConfigurationSubscription confSub; + + public DefaultStreamConfigProvider(String configFilePath, + ScheduledExecutorService executorService, + int reloadPeriod, + TimeUnit reloadUnit) + throws ConfigurationException { + try { + File configFile = new File(configFilePath); + FileConfigurationBuilder properties = + new PropertiesConfigurationBuilder(configFile.toURI().toURL()); + ConcurrentConstConfiguration defaultConf = + new ConcurrentConstConfiguration(new DistributedLogConfiguration()); + DynamicDistributedLogConfiguration conf = + new DynamicDistributedLogConfiguration(defaultConf); + List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(properties); + confSub = new ConfigurationSubscription( + conf, fileConfigBuilders, executorService, reloadPeriod, reloadUnit); + this.dynConf = Optional.of(conf); + } catch (MalformedURLException ex) { + throw new ConfigurationException(ex); + } + } + + @Override + public Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName) { + return dynConf; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/NullStreamConfigProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/NullStreamConfigProvider.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/NullStreamConfigProvider.java new file mode 100644 index 0000000..195f29d --- /dev/null +++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/NullStreamConfigProvider.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.config; + +import com.google.common.base.Optional; + +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * For all streams return an absent configuration. 
+ */ +public class NullStreamConfigProvider implements StreamConfigProvider { + static final Logger LOG = LoggerFactory.getLogger(NullStreamConfigProvider.class); + + private static final Optional<DynamicDistributedLogConfiguration> nullConf = + Optional.<DynamicDistributedLogConfiguration>absent(); + + @Override + public Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName) { + return nullConf; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java new file mode 100644 index 0000000..257b4be --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java @@ -0,0 +1,443 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.service.config; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.DistributedLogConstants; +import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter; +import org.apache.distributedlog.service.streamset.StreamPartitionConverter; +import org.apache.bookkeeper.util.ReflectionUtils; +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.SystemConfiguration; + +/** + * Configuration for DistributedLog Server. + */ +public class ServerConfiguration extends CompositeConfiguration { + + private static ClassLoader defaultLoader; + + static { + defaultLoader = Thread.currentThread().getContextClassLoader(); + if (null == defaultLoader) { + defaultLoader = DistributedLogConfiguration.class.getClassLoader(); + } + } + + // Server DLSN version + protected static final String SERVER_DLSN_VERSION = "server_dlsn_version"; + protected static final byte SERVER_DLSN_VERSION_DEFAULT = DLSN.VERSION1; + + // Server Durable Write Enable/Disable Flag + protected static final String SERVER_DURABLE_WRITE_ENABLED = "server_durable_write_enabled"; + protected static final boolean SERVER_DURABLE_WRITE_ENABLED_DEFAULT = true; + + // Server Region Id + protected static final String SERVER_REGION_ID = "server_region_id"; + protected static final int SERVER_REGION_ID_DEFAULT = DistributedLogConstants.LOCAL_REGION_ID; + + // Server Port + protected static final String SERVER_PORT = "server_port"; + protected static final int SERVER_PORT_DEFAULT = 0; + + // Server Shard Id + protected static final String SERVER_SHARD_ID = "server_shard"; + protected static final int SERVER_SHARD_ID_DEFAULT = -1; + + // Server Threads + protected static final String 
SERVER_NUM_THREADS = "server_threads"; + protected static final int SERVER_NUM_THREADS_DEFAULT = Runtime.getRuntime().availableProcessors(); + + // Server enable per stream stat + protected static final String SERVER_ENABLE_PERSTREAM_STAT = "server_enable_perstream_stat"; + protected static final boolean SERVER_ENABLE_PERSTREAM_STAT_DEFAULT = true; + + // Server graceful shutdown period (in millis) + protected static final String SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS = "server_graceful_shutdown_period_ms"; + protected static final long SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT = 0L; + + // Server service timeout + public static final String SERVER_SERVICE_TIMEOUT_MS = "server_service_timeout_ms"; + public static final String SERVER_SERVICE_TIMEOUT_MS_OLD = "serviceTimeoutMs"; + public static final long SERVER_SERVICE_TIMEOUT_MS_DEFAULT = 0; + + // Server close writer timeout + public static final String SERVER_WRITER_CLOSE_TIMEOUT_MS = "server_writer_close_timeout_ms"; + public static final long SERVER_WRITER_CLOSE_TIMEOUT_MS_DEFAULT = 1000; + + // Server stream probation timeout + public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS = "server_stream_probation_timeout_ms"; + public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD = "streamProbationTimeoutMs"; + public static final long SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT = 60 * 1000 * 5; + + // Server stream to partition converter + protected static final String SERVER_STREAM_PARTITION_CONVERTER_CLASS = "stream_partition_converter_class"; + + // Use hostname as the allocator pool name + protected static final String SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME = + "server_use_hostname_as_allocator_pool_name"; + protected static final boolean SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT = false; + //Configure refresh interval for calculating resource placement in seconds + public static final String SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S = + 
"server_resource_placement_refresh_interval_sec"; + public static final int SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT = 120; + + public ServerConfiguration() { + super(); + addConfiguration(new SystemConfiguration()); + } + + /** + * Load configurations from {@link DistributedLogConfiguration}. + * + * @param dlConf + * distributedlog configuration + */ + public void loadConf(DistributedLogConfiguration dlConf) { + addConfiguration(dlConf); + } + + /** + * Set the version to encode dlsn. + * + * @param version + * dlsn version + * @return server configuration + */ + public ServerConfiguration setDlsnVersion(byte version) { + setProperty(SERVER_DLSN_VERSION, version); + return this; + } + + /** + * Get the version to encode dlsn. + * + * @see DLSN + * @return version to encode dlsn. + */ + public byte getDlsnVersion() { + return getByte(SERVER_DLSN_VERSION, SERVER_DLSN_VERSION_DEFAULT); + } + + /** + * Set the flag to enable/disable durable write. + * + * @param enabled + * flag to enable/disable durable write + * @return server configuration + */ + public ServerConfiguration enableDurableWrite(boolean enabled) { + setProperty(SERVER_DURABLE_WRITE_ENABLED, enabled); + return this; + } + + /** + * Is durable write enabled. + * + * @return true if waiting writes to be durable. otherwise false. + */ + public boolean isDurableWriteEnabled() { + return getBoolean(SERVER_DURABLE_WRITE_ENABLED, SERVER_DURABLE_WRITE_ENABLED_DEFAULT); + } + + /** + * Set the region id used to instantiate DistributedLogNamespace. + * + * @param regionId + * region id + * @return server configuration + */ + public ServerConfiguration setRegionId(int regionId) { + setProperty(SERVER_REGION_ID, regionId); + return this; + } + + /** + * Get the region id used to instantiate {@link org.apache.distributedlog.namespace.DistributedLogNamespace}. 
+ * + * @return region id used to instantiate DistributedLogNamespace + */ + public int getRegionId() { + return getInt(SERVER_REGION_ID, SERVER_REGION_ID_DEFAULT); + } + + /** + * Set the server port running for this service. + * + * @param port + * server port + * @return server configuration + */ + public ServerConfiguration setServerPort(int port) { + setProperty(SERVER_PORT, port); + return this; + } + + /** + * Get the server port running for this service. + * + * @return server port + */ + public int getServerPort() { + return getInt(SERVER_PORT, SERVER_PORT_DEFAULT); + } + + /** + * Set the shard id of this server. + * + * @param shardId + * shard id + * @return shard id of this server + */ + public ServerConfiguration setServerShardId(int shardId) { + setProperty(SERVER_SHARD_ID, shardId); + return this; + } + + /** + * Get the shard id of this server. + * + * <p>It would be used to instantiate the client id used for DistributedLogNamespace. + * + * @return shard id of this server. + */ + public int getServerShardId() { + return getInt(SERVER_SHARD_ID, SERVER_SHARD_ID_DEFAULT); + } + + /** + * Get the number of threads for the executor of this server. + * + * @return number of threads for the executor running in this server. + */ + public int getServerThreads() { + return getInt(SERVER_NUM_THREADS, SERVER_NUM_THREADS_DEFAULT); + } + + /** + * Set the number of threads for the executor of this server. + * + * @param numThreads + * number of threads for the executor running in this server. + * @return server configuration + */ + public ServerConfiguration setServerThreads(int numThreads) { + setProperty(SERVER_NUM_THREADS, numThreads); + return this; + } + + /** + * Enable/Disable per stream stat. 
+ * + * @param enabled + * flag to enable/disable per stream stat + * @return server configuration + */ + public ServerConfiguration setPerStreamStatEnabled(boolean enabled) { + setProperty(SERVER_ENABLE_PERSTREAM_STAT, enabled); + return this; + } + + /** + * Whether per stream stat is enabled or not in this server. + * + * @return true if per stream stat is enabled, otherwise false. + */ + public boolean isPerStreamStatEnabled() { + return getBoolean(SERVER_ENABLE_PERSTREAM_STAT, SERVER_ENABLE_PERSTREAM_STAT_DEFAULT); + } + + /** + * Set the graceful shutdown period in millis. + * + * @param periodMs + * graceful shutdown period in millis. + * @return server configuration + */ + public ServerConfiguration setGracefulShutdownPeriodMs(long periodMs) { + setProperty(SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS, periodMs); + return this; + } + + /** + * Get the graceful shutdown period in millis. + * + * @return graceful shutdown period in millis. + */ + public long getGracefulShutdownPeriodMs() { + return getLong(SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS, SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT); + } + + /** + * Get timeout for stream op execution in proxy layer. + * + * <p>0 disables timeout. + * + * <p>The value stored under {@code SERVER_SERVICE_TIMEOUT_MS_OLD} is passed as the fallback for + * {@code SERVER_SERVICE_TIMEOUT_MS}, with {@code SERVER_SERVICE_TIMEOUT_MS_DEFAULT} as the final fallback. + * + * @return timeout for stream operation in proxy layer. + */ + public long getServiceTimeoutMs() { + return getLong(SERVER_SERVICE_TIMEOUT_MS, + getLong(SERVER_SERVICE_TIMEOUT_MS_OLD, SERVER_SERVICE_TIMEOUT_MS_DEFAULT)); + } + + /** + * Set timeout for stream op execution in proxy layer. + * + * <p>0 disables timeout. + * + * @param timeoutMs + * timeout for stream operation in proxy layer. + * @return server configuration. + */ + public ServerConfiguration setServiceTimeoutMs(long timeoutMs) { + setProperty(SERVER_SERVICE_TIMEOUT_MS, timeoutMs); + return this; + } + + /** + * Get timeout for closing writer in proxy layer. + * + * <p>0 disables timeout. + * + * @return timeout for closing writer in proxy layer. 
+ */ + public long getWriterCloseTimeoutMs() { + return getLong(SERVER_WRITER_CLOSE_TIMEOUT_MS, SERVER_WRITER_CLOSE_TIMEOUT_MS_DEFAULT); + } + + /** + * Set timeout for closing writer in proxy layer. + * + * <p>0 disables timeout. + * + * @param timeoutMs + * timeout for closing writer in proxy layer. + * @return server configuration. + */ + public ServerConfiguration setWriterCloseTimeoutMs(long timeoutMs) { + setProperty(SERVER_WRITER_CLOSE_TIMEOUT_MS, timeoutMs); + return this; + } + + /** + * How long should stream be kept in cache in probationary state after service timeout. + * + * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds. + * + * <p>The value stored under {@code SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD} is passed as the fallback for + * {@code SERVER_STREAM_PROBATION_TIMEOUT_MS}, with {@code SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT} as the + * final fallback. + * + * @return stream probation timeout in ms. + */ + public long getStreamProbationTimeoutMs() { + return getLong(SERVER_STREAM_PROBATION_TIMEOUT_MS, + getLong(SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD, SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT)); + } + + /** + * Set how long a stream should be kept in cache in probationary state after service timeout. + * + * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds. + * + * @param timeoutMs probation timeout in ms. + * @return server configuration + */ + public ServerConfiguration setStreamProbationTimeoutMs(long timeoutMs) { + setProperty(SERVER_STREAM_PROBATION_TIMEOUT_MS, timeoutMs); + return this; + } + + /** + * Set the stream partition converter class. + * + * @param converterClass + * stream partition converter class + * @return server configuration + */ + public ServerConfiguration setStreamPartitionConverterClass( + Class<? extends StreamPartitionConverter> converterClass) { + setProperty(SERVER_STREAM_PARTITION_CONVERTER_CLASS, converterClass.getName()); + return this; + } + + /** + * Get the stream partition converter class. + * + * @return the stream partition converter class. + * @throws ConfigurationException + */ + public Class<? 
extends StreamPartitionConverter> getStreamPartitionConverterClass() + throws ConfigurationException { + return ReflectionUtils.getClass( + this, + SERVER_STREAM_PARTITION_CONVERTER_CLASS, + IdentityStreamPartitionConverter.class, + StreamPartitionConverter.class, + defaultLoader); + } + + /** + * Set if use hostname as the allocator pool name. + * + * @param useHostname whether to use hostname as the allocator pool name. + * @return server configuration + * @see #isUseHostnameAsAllocatorPoolName() + */ + public ServerConfiguration setUseHostnameAsAllocatorPoolName(boolean useHostname) { + setProperty(SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME, useHostname); + return this; + } + + /** + * Get if use hostname as the allocator pool name. + * + * @return true if use hostname as the allocator pool name. otherwise, use + * {@link #getServerShardId()} as the allocator pool name. + * @see #getServerShardId() + */ + public boolean isUseHostnameAsAllocatorPoolName() { + return getBoolean(SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME, + SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT); + } + + /** + * Set the resource placement refresh interval, in seconds. + * + * @param refreshIntervalSecs + * refresh interval in seconds + * @return server configuration + */ + public ServerConfiguration setResourcePlacementRefreshInterval(int refreshIntervalSecs) { + setProperty(SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S, refreshIntervalSecs); + return this; + } + + /** + * Get the resource placement refresh interval, in seconds. + * + * @return refresh interval in seconds. + */ + public int getResourcePlacementRefreshInterval() { + return getInt(SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S, SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT); + } + + /** + * Validate the configuration. + * + * @throws IllegalArgumentException when there are any invalid settings + * (NOTE(review): the checks go through {@code checkArgument}; this assumes it is Guava's + * {@code Preconditions.checkArgument} - confirm against the imports). 
+ */ + public void validate() { + byte dlsnVersion = getDlsnVersion(); + checkArgument(dlsnVersion >= DLSN.VERSION0 && dlsnVersion <= DLSN.VERSION1, + "Unknown dlsn version " + dlsnVersion); + checkArgument(getServerThreads() > 0, + "Invalid number of server threads : " + getServerThreads()); + checkArgument(getServerShardId() >= 0, + "Invalid server shard id : " + getServerShardId()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java new file mode 100644 index 0000000..29052f9 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.service.config; + +import com.google.common.base.Optional; +import org.apache.distributedlog.config.DynamicConfigurationFactory; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.service.streamset.StreamPartitionConverter; +import java.io.File; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.commons.configuration.ConfigurationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provide per stream configuration to DistributedLog service layer. + */ +public class ServiceStreamConfigProvider implements StreamConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceStreamConfigProvider.class); + + private static final String CONFIG_EXTENSION = "conf"; + + private final File configBaseDir; + private final File defaultConfigFile; + private final StreamPartitionConverter partitionConverter; + private final DynamicConfigurationFactory configFactory; + private final DynamicDistributedLogConfiguration defaultDynConf; + + /** + * Create a provider that serves per-stream configuration files from a base directory. + * + * @param configPath + * base directory holding the per-stream configuration files; must exist + * @param defaultConfigPath + * path of the default configuration file; must exist + * @param partitionConverter + * converter used to derive the configuration name from a stream name + * @param executorService + * executor handed to the {@link DynamicConfigurationFactory} + * @param reloadPeriod + * reload period handed to the {@link DynamicConfigurationFactory} + * @param reloadUnit + * time unit of {@code reloadPeriod} + * @throws ConfigurationException + * if {@code configPath} or {@code defaultConfigPath} does not exist + */ + public ServiceStreamConfigProvider(String configPath, + String defaultConfigPath, + StreamPartitionConverter partitionConverter, + ScheduledExecutorService executorService, + int reloadPeriod, + TimeUnit reloadUnit) + throws ConfigurationException { + this.configBaseDir = new File(configPath); + if (!configBaseDir.exists()) { + throw new ConfigurationException("Stream configuration base directory " + + configPath + " does not exist"); + } + // The default config file lives at defaultConfigPath, not at the base directory. + this.defaultConfigFile = new File(defaultConfigPath); + if (!defaultConfigFile.exists()) { + throw new ConfigurationException("Stream configuration default config " + + defaultConfigPath + " does not exist"); + } + + // Construct reloading default configuration + this.partitionConverter = partitionConverter; + this.configFactory = new DynamicConfigurationFactory(executorService, reloadPeriod, reloadUnit); + // 
We know it exists from the check above. + this.defaultDynConf = configFactory.getDynamicConfiguration(defaultConfigPath).get(); + } + + @Override + public Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName) { + String configName = partitionConverter.convert(streamName).getStream(); + String configPath = getConfigPath(configName); + Optional<DynamicDistributedLogConfiguration> dynConf = Optional.<DynamicDistributedLogConfiguration>absent(); + try { + dynConf = configFactory.getDynamicConfiguration(configPath, defaultDynConf); + } catch (ConfigurationException ex) { + LOG.warn("Configuration exception for stream {} ({}) at {}", + new Object[] {streamName, configName, configPath, ex}); + } + return dynConf; + } + + private String getConfigPath(String configName) { + return new File(configBaseDir, String.format("%s.%s", configName, CONFIG_EXTENSION)).getPath(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java new file mode 100644 index 0000000..c704f70 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.config; + +import com.google.common.base.Optional; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; + +/** + * Expose per-stream configs to dl proxy. + */ +public interface StreamConfigProvider { + /** + * Get dynamic per stream config overrides for a given stream. + * + * @param streamName stream name to return config for + * @return Optional dynamic configuration instance + */ + Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/package-info.java new file mode 100644 index 0000000..b07605e --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * DistributedLog Server Configurations. + */ +package org.apache.distributedlog.service.config; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/package-info.java new file mode 100644 index 0000000..3fcfeda --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * DistributedLog Proxy Service. + */ +package org.apache.distributedlog.service; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java new file mode 100644 index 0000000..fa3dd49 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.placement; + +import com.twitter.util.Future; + +/** + * Equal Load Appraiser. + * + * <p>Created for those who hold these truths to be self-evident, that all streams are created equal, + * that they are endowed by their creator with certain unalienable loads, that among these are + * Uno, Eins, and One. + */ +public class EqualLoadAppraiser implements LoadAppraiser { + @Override + public Future<StreamLoad> getStreamLoad(String stream) { + return Future.value(new StreamLoad(stream, 1)); + } + + @Override + public Future<Void> refreshCache() { + return Future.value(null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java new file mode 100644 index 0000000..2e9dd6b --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.placement; + +import org.apache.distributedlog.client.routing.RoutingService; +import org.apache.distributedlog.namespace.DistributedLogNamespace; +import com.twitter.util.Duration; +import com.twitter.util.Function; +import com.twitter.util.Future; +import com.twitter.util.Futures; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.StatsLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.BoxedUnit; + +/** + * Least Load Placement Policy. + * + * <p>A LoadPlacementPolicy that attempts to place streams in such a way that the load is balanced as + * evenly as possible across all shards. The LoadAppraiser remains responsible for determining what + * the load of a server would be. This placement policy then distributes these streams across the + * servers. 
+ */ +public class LeastLoadPlacementPolicy extends PlacementPolicy { + + private static final Logger logger = LoggerFactory.getLogger(LeastLoadPlacementPolicy.class); + + private TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>(); + private Map<String, String> streamToServer = new HashMap<String, String>(); + + public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService, + DistributedLogNamespace namespace, PlacementStateManager placementStateManager, + Duration refreshInterval, StatsLogger statsLogger) { + super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger); + statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + // Read under the policy lock: every other access to serverLoads holds it, and + // placeStreamSynchronized briefly removes the first element before re-adding it. + synchronized (LeastLoadPlacementPolicy.this) { + if (serverLoads.size() > 0) { + return serverLoads.last().getLoad() - serverLoads.first().getLoad(); + } else { + return getDefaultValue(); + } + } + } + }); + } + + private synchronized String getStreamOwner(String stream) { + return streamToServer.get(stream); + } + + @Override + public Future<String> placeStream(String stream) { + String streamOwner = getStreamOwner(stream); + if (null != streamOwner) { + return Future.value(streamOwner); + } + Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream); + return streamLoadFuture.map(new Function<StreamLoad, String>() { + @Override + public String apply(StreamLoad streamLoad) { + return placeStreamSynchronized(streamLoad); + } + }); + } + + /** + * Adds the stream to the first server in the {@code serverLoads} ordering and returns that server. + * + * @param streamLoad + * appraised load of the stream to place + * @return the server the stream was added to + * @throws IllegalStateException + * if no server is known to this policy + */ + private synchronized String placeStreamSynchronized(StreamLoad streamLoad) { + ServerLoad serverLoad = serverLoads.pollFirst(); + // pollFirst() returns null on an empty set; fail with a clear message instead of an NPE. + if (null == serverLoad) { + throw new IllegalStateException("No server available to place stream " + streamLoad.getStream()); + } + serverLoad.addStream(streamLoad); + serverLoads.add(serverLoad); + return serverLoad.getServer(); + } + + @Override + public void refresh() { + logger.info("Refreshing server loads."); + Future<Void> refresh = loadAppraiser.refreshCache(); + final Set<String> servers = getServers(); + final Set<String> allStreams = 
getStreams(); + Future<TreeSet<ServerLoad>> serverLoadsFuture = refresh.flatMap( + new Function<Void, Future<TreeSet<ServerLoad>>>() { + @Override + public Future<TreeSet<ServerLoad>> apply(Void v1) { + return calculate(servers, allStreams); + } + }); + serverLoadsFuture.map(new Function<TreeSet<ServerLoad>, BoxedUnit>() { + @Override + public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) { + try { + updateServerLoads(serverLoads); + } catch (PlacementStateManager.StateManagerSaveException e) { + logger.error("The refreshed mapping could not be persisted and will not be used.", e); + } + return BoxedUnit.UNIT; + } + }); + } + + private synchronized void updateServerLoads(TreeSet<ServerLoad> serverLoads) + throws PlacementStateManager.StateManagerSaveException { + this.placementStateManager.saveOwnership(serverLoads); + this.streamToServer = serverLoadsToMap(serverLoads); + this.serverLoads = serverLoads; + } + + @Override + public synchronized void load(TreeSet<ServerLoad> serverLoads) { + this.serverLoads = serverLoads; + this.streamToServer = serverLoadsToMap(serverLoads); + } + + public Future<TreeSet<ServerLoad>> calculate(final Set<String> servers, Set<String> streams) { + logger.info("Calculating server loads"); + final long startTime = System.currentTimeMillis(); + ArrayList<Future<StreamLoad>> futures = new ArrayList<Future<StreamLoad>>(streams.size()); + + for (String stream : streams) { + Future<StreamLoad> streamLoad = loadAppraiser.getStreamLoad(stream); + futures.add(streamLoad); + } + + return Futures.collect(futures).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() { + @Override + public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) { + /* Sort streamLoads so largest streams are placed first for better balance */ + TreeSet<StreamLoad> streamQueue = new TreeSet<StreamLoad>(); + for (StreamLoad streamLoad : streamLoads) { + streamQueue.add(streamLoad); + } + + TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>(); + for 
(String server : servers) { + ServerLoad serverLoad = new ServerLoad(server); + if (!streamQueue.isEmpty()) { + serverLoad.addStream(streamQueue.pollFirst()); + } + serverLoads.add(serverLoad); + } + + while (!streamQueue.isEmpty()) { + ServerLoad serverLoad = serverLoads.pollFirst(); + serverLoad.addStream(streamQueue.pollFirst()); + serverLoads.add(serverLoad); + } + return serverLoads; + } + }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() { + @Override + public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) { + placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime); + return BoxedUnit.UNIT; + } + }).onFailure(new Function<Throwable, BoxedUnit>() { + @Override + public BoxedUnit apply(Throwable t) { + logger.error("Failure calculating loads", t); + placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime); + return BoxedUnit.UNIT; + } + }); + } + + private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> serverLoads) { + HashMap<String, String> streamToServer = new HashMap<String, String>(serverLoads.size()); + for (ServerLoad serverLoad : serverLoads) { + for (StreamLoad streamLoad : serverLoad.getStreamLoads()) { + streamToServer.put(streamLoad.getStream(), serverLoad.getServer()); + } + } + return streamToServer; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java new file mode 100644 index 0000000..5cd8980 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java @@ -0,0 +1,39 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.placement; + +import com.twitter.util.Future; + +/** + * Interface for load appraiser. + */ +public interface LoadAppraiser { + /** + * Retrieve the stream load for a given {@code stream}. + * + * @param stream name of the stream + * @return the stream load of the stream. + */ + Future<StreamLoad> getStreamLoad(String stream); + + /** + * Refresh the cache. 
+ * + * @return a future for the refresh operation; it carries no value. + */ + Future<Void> refreshCache(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java new file mode 100644 index 0000000..ac952aa --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.service.placement; + +import org.apache.distributedlog.client.routing.RoutingService; +import org.apache.distributedlog.namespace.DistributedLogNamespace; +import org.apache.distributedlog.service.DLSocketAddress; +import com.twitter.util.Duration; +import com.twitter.util.Function0; +import com.twitter.util.Future; +import com.twitter.util.ScheduledThreadPoolTimer; +import com.twitter.util.Time; +import com.twitter.util.Timer; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.TreeSet; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.BoxedUnit; + +/** + * A PlacementPolicy assigns streams to servers given an appraisal of the load that the stream contains. + * + * <p>The load of a stream is determined by the LoadAppraiser used. The PlacementPolicy will + * then distribute these StreamLoads to the available servers in a manner defined by the + * implementation creating ServerLoad objects. It then saves this assignment via the + * PlacementStateManager. 
+ */ +public abstract class PlacementPolicy { + + private static final Logger logger = LoggerFactory.getLogger(PlacementPolicy.class); + + protected final LoadAppraiser loadAppraiser; + protected final RoutingService routingService; + protected final DistributedLogNamespace namespace; + protected final PlacementStateManager placementStateManager; + private final Duration refreshInterval; + protected final OpStatsLogger placementCalcStats; + private Timer placementRefreshTimer; + + public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService, + DistributedLogNamespace namespace, PlacementStateManager placementStateManager, + Duration refreshInterval, StatsLogger statsLogger) { + this.loadAppraiser = loadAppraiser; + this.routingService = routingService; + this.namespace = namespace; + this.placementStateManager = placementStateManager; + this.refreshInterval = refreshInterval; + placementCalcStats = statsLogger.getOpStatsLogger("placement"); + } + + public Set<String> getServers() { + Set<SocketAddress> hosts = routingService.getHosts(); + Set<String> servers = new HashSet<String>(hosts.size()); + for (SocketAddress address : hosts) { + servers.add(DLSocketAddress.toString((InetSocketAddress) address)); + } + return servers; + } + + public Set<String> getStreams() { + Set<String> streams = new HashSet<String>(); + try { + Iterator<String> logs = namespace.getLogs(); + while (logs.hasNext()) { + streams.add(logs.next()); + } + } catch (IOException e) { + logger.error("Could not get streams for placement policy.", e); + } + return streams; + } + + public void start(boolean leader) { + logger.info("Starting placement policy"); + + TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>(); + for (String server : getServers()) { + emptyServerLoads.add(new ServerLoad(server)); + } + load(emptyServerLoads); //Pre-Load so streams don't NPE + if (leader) { //this is the leader shard + logger.info("Shard is leader. 
Scheduling timed refresh."); + placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", true); + placementRefreshTimer.schedule(Time.now(), refreshInterval, new Function0<BoxedUnit>() { + @Override + public BoxedUnit apply() { + refresh(); + return BoxedUnit.UNIT; + } + }); + } else { + logger.info("Shard is not leader. Watching for server load changes."); + placementStateManager.watch(new PlacementStateManager.PlacementCallback() { + @Override + public void callback(TreeSet<ServerLoad> serverLoads) { + if (!serverLoads.isEmpty()) { + load(serverLoads); + } + } + }); + } + } + + public void close() { + if (placementRefreshTimer != null) { + placementRefreshTimer.stop(); + } + } + + /** + * Places the stream on a server according to the policy. + * + * <p>It returns a future containing the host that owns the stream upon completion + */ + public abstract Future<String> placeStream(String stream); + + /** + * Recalculates the entire placement mapping and updates stores it using the PlacementStateManager. + */ + public abstract void refresh(); + + /** + * Loads the placement mapping into the node from a TreeSet of ServerLoads. 
+ */ + public abstract void load(TreeSet<ServerLoad> serverLoads); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java new file mode 100644 index 0000000..0187bed --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.placement; + +import java.util.TreeSet; + +/** + * The PlacementStateManager handles persistence of calculated resource placements. + */ +public interface PlacementStateManager { + + /** + * Saves the ownership mapping as a TreeSet of ServerLoads to persistent storage. 
+ */ + void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException; + + /** + * Loads the ownership mapping as TreeSet of ServerLoads from persistent storage. + */ + TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException; + + /** + * Watch the persistent storage for changes to the ownership mapping. + * + * <p>The placementCallback callbacks will be triggered with the new mapping when a change occurs. + */ + void watch(PlacementCallback placementCallback); + + /** + * Placement Callback. + * + * <p>The callback is triggered when server loads are updated. + */ + interface PlacementCallback { + void callback(TreeSet<ServerLoad> serverLoads); + } + + /** + * The base exception thrown when state manager encounters errors. + */ + abstract class StateManagerException extends Exception { + public StateManagerException(String message, Exception e) { + super(message, e); + } + } + + /** + * Exception thrown when failed to load the ownership mapping. + */ + class StateManagerLoadException extends StateManagerException { + public StateManagerLoadException(Exception e) { + super("Load of Ownership failed", e); + } + } + + /** + * Exception thrown when failed to save the ownership mapping. 
+ */ + class StateManagerSaveException extends StateManagerException { + public StateManagerSaveException(Exception e) { + super("Save of Ownership failed", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java new file mode 100644 index 0000000..d65c401 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.distributedlog.service.placement;

import static com.google.common.base.Charsets.UTF_8;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.transport.TMemoryBuffer;
import org.apache.thrift.transport.TMemoryInputTransport;

/**
 * An object representing the load on a server.
 *
 * <p>A comparable data object containing the identifier of the server, the total appraised load
 * on the server, and all streams assigned to the server by the resource placement mapping. It is
 * ordered first by load and then by server, so that a sorted data structure of these is
 * consistent across multiple calculations.
 *
 * <p>Mutable state ({@code load} and the stream set) is guarded by the instance monitor.
 * {@link #compareTo(Object)} and {@link #equals(Object)} never hold the monitors of two
 * instances at the same time. Note that {@link #getStreamLoads()} hands out the live internal
 * set, so callers iterating it are not protected by that monitor.
 */
public class ServerLoad implements Comparable {
    // Size handed to the in-memory Thrift transport that serialize() writes into.
    private static final int BUFFER_SIZE = 4096000;
    private final String server;
    private final HashSet<StreamLoad> streamLoads = new HashSet<StreamLoad>();
    private long load = 0L;

    public ServerLoad(String server) {
        this.server = server;
    }

    /**
     * Assigns a stream to this server.
     *
     * <p>The stream's load is added to the total only when the stream was not already present,
     * so the total always equals the sum of the loads of the stored streams.
     *
     * @param stream the stream load to add
     * @return the total load after the call
     */
    public synchronized long addStream(StreamLoad stream) {
        // Read the load before touching the set so a null argument fails without side effects.
        long addedLoad = stream.getLoad();
        if (streamLoads.add(stream)) {
            this.load += addedLoad;
        }
        return this.load;
    }

    /**
     * Removes the first stored stream whose name equals {@code stream} and subtracts its load.
     *
     * @param stream name of the stream to remove
     * @return the total load after the call; unchanged when no stream with that name is stored
     */
    public synchronized long removeStream(String stream) {
        for (Iterator<StreamLoad> it = streamLoads.iterator(); it.hasNext(); ) {
            StreamLoad candidate = it.next();
            if (candidate.getStream().equals(stream)) {
                this.load -= candidate.getLoad();
                it.remove();
                return this.load;
            }
        }
        return this.load; //Throwing an exception wouldn't help us as our logic should never reach here
    }

    public synchronized long getLoad() {
        return load;
    }

    /**
     * Returns the internal (live, mutable) set of stream loads, not a copy.
     */
    public synchronized Set<StreamLoad> getStreamLoads() {
        return streamLoads;
    }

    public synchronized String getServer() {
        return server;
    }

    /**
     * Converts this object, including all of its stream loads, to its Thrift representation.
     */
    protected synchronized org.apache.distributedlog.service.placement.thrift.ServerLoad toThrift() {
        org.apache.distributedlog.service.placement.thrift.ServerLoad tServerLoad =
            new org.apache.distributedlog.service.placement.thrift.ServerLoad();
        tServerLoad.setServer(server);
        tServerLoad.setLoad(load);
        ArrayList<org.apache.distributedlog.service.placement.thrift.StreamLoad> tStreamLoads =
            new ArrayList<org.apache.distributedlog.service.placement.thrift.StreamLoad>();
        for (StreamLoad streamLoad : streamLoads) {
            tStreamLoads.add(streamLoad.toThrift());
        }
        tServerLoad.setStreams(tStreamLoads);
        return tServerLoad;
    }

    /**
     * Serializes this object with the Thrift JSON protocol and returns the UTF-8 bytes.
     *
     * @throws IOException if Thrift fails to write the object or the encoding is unsupported
     */
    public byte[] serialize() throws IOException {
        TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
        TJSONProtocol protocol = new TJSONProtocol(transport);
        try {
            toThrift().write(protocol);
            transport.flush();
            return transport.toString(UTF_8.name()).getBytes(UTF_8);
        } catch (TException e) {
            throw new IOException("Failed to serialize server load : ", e);
        } catch (UnsupportedEncodingException uee) {
            throw new IOException("Failed to serialize server load : ", uee);
        }
    }

    /**
     * Rebuilds a ServerLoad from bytes produced by {@link #serialize()}.
     *
     * <p>The total load is recomputed by re-adding each deserialized stream; the load value
     * stored in the serialized form is not read back.
     *
     * @throws IOException if Thrift fails to read the data
     */
    public static ServerLoad deserialize(byte[] data) throws IOException {
        org.apache.distributedlog.service.placement.thrift.ServerLoad tServerLoad =
            new org.apache.distributedlog.service.placement.thrift.ServerLoad();
        TMemoryInputTransport transport = new TMemoryInputTransport(data);
        TJSONProtocol protocol = new TJSONProtocol(transport);
        try {
            tServerLoad.read(protocol);
            ServerLoad serverLoad = new ServerLoad(tServerLoad.getServer());
            if (tServerLoad.isSetStreams()) {
                for (org.apache.distributedlog.service.placement.thrift.StreamLoad tStreamLoad :
                    tServerLoad.getStreams()) {
                    serverLoad.addStream(new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()));
                }
            }
            return serverLoad;
        } catch (TException e) {
            throw new IOException("Failed to deserialize server load : ", e);
        }
    }

    /**
     * Orders by load, then by server name.
     *
     * <p>Each side's load is read once, under that instance's own monitor, and the two monitors
     * are never held together. Holding this monitor while calling a synchronized method on the
     * other instance could deadlock when two threads compare the same pair in opposite order.
     *
     * @throws ClassCastException if {@code o} is not a ServerLoad
     * @throws NullPointerException if {@code o} is null
     */
    @Override
    public int compareTo(Object o) {
        ServerLoad other = (ServerLoad) o;
        long otherLoad = other.getLoad();
        long thisLoad = getLoad();
        if (thisLoad == otherLoad) {
            // server is final, so no lock is needed to read it on either side.
            return server.compareTo(other.server);
        }
        return Long.compare(thisLoad, otherLoad);
    }

    /**
     * Two ServerLoads are equal when server, total load and the set of stream loads all match.
     *
     * <p>The other instance's state is snapshotted under its own monitor before this
     * instance's monitor is taken, so the two locks are never nested.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ServerLoad)) {
            return false;
        }
        ServerLoad other = (ServerLoad) o;
        if (!server.equals(other.server)) {
            return false;
        }
        long otherLoad;
        Set<StreamLoad> otherStreams;
        synchronized (other) {
            otherLoad = other.load;
            otherStreams = new HashSet<StreamLoad>(other.streamLoads);
        }
        synchronized (this) {
            return load == otherLoad && streamLoads.equals(otherStreams);
        }
    }

    @Override
    public synchronized String toString() {
        return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads);
    }

    @Override
    public synchronized int hashCode() {
        // toHashCode() returns the primitive directly; build() would box it first.
        return new HashCodeBuilder().append(server).append(load).append(streamLoads).toHashCode();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java new file mode 100644 index 0000000..f271222 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.distributedlog.service.placement;

import static com.google.common.base.Charsets.UTF_8;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.transport.TMemoryBuffer;
import org.apache.thrift.transport.TMemoryInputTransport;

/**
 * An object representing the load of a stream.
 *
 * <p>An immutable, comparable data object holding the identifier of the stream and the
 * appraised load produced by that stream. Ordering is by load first, then by stream name.
 */
public class StreamLoad implements Comparable {
    private static final int BUFFER_SIZE = 4096;
    public final String stream;
    private final int load;

    public StreamLoad(String stream, int load) {
        this.load = load;
        this.stream = stream;
    }

    public int getLoad() {
        return load;
    }

    public String getStream() {
        return stream;
    }

    /**
     * Converts this object to its Thrift representation.
     */
    protected org.apache.distributedlog.service.placement.thrift.StreamLoad toThrift() {
        return new org.apache.distributedlog.service.placement.thrift.StreamLoad()
            .setStream(stream)
            .setLoad(load);
    }

    /**
     * Serializes this object with the Thrift JSON protocol and returns the UTF-8 bytes.
     *
     * @throws IOException if Thrift fails to write the object or the encoding is unsupported
     */
    public byte[] serialize() throws IOException {
        TMemoryBuffer buffer = new TMemoryBuffer(BUFFER_SIZE);
        try {
            toThrift().write(new TJSONProtocol(buffer));
            buffer.flush();
            String json = buffer.toString(UTF_8.name());
            return json.getBytes(UTF_8);
        } catch (TException e) {
            throw new IOException("Failed to serialize stream load : ", e);
        } catch (UnsupportedEncodingException uee) {
            throw new IOException("Failed to serialize stream load : ", uee);
        }
    }

    /**
     * Rebuilds a StreamLoad from bytes produced by {@link #serialize()}.
     *
     * @throws IOException if Thrift fails to read the data
     */
    public static StreamLoad deserialize(byte[] data) throws IOException {
        org.apache.distributedlog.service.placement.thrift.StreamLoad parsed =
            new org.apache.distributedlog.service.placement.thrift.StreamLoad();
        TJSONProtocol protocol = new TJSONProtocol(new TMemoryInputTransport(data));
        try {
            parsed.read(protocol);
        } catch (TException e) {
            throw new IOException("Failed to deserialize stream load : ", e);
        }
        return new StreamLoad(parsed.getStream(), parsed.getLoad());
    }

    /**
     * Orders by load; stream loads with the same load are ordered by stream name.
     *
     * @throws ClassCastException if {@code o} is not a StreamLoad
     */
    @Override
    public int compareTo(Object o) {
        StreamLoad that = (StreamLoad) o;
        int byLoad = Long.compare(load, that.getLoad());
        if (byLoad != 0) {
            return byLoad;
        }
        return stream.compareTo(that.getStream());
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof StreamLoad) {
            StreamLoad that = (StreamLoad) o;
            // Name is checked before load, matching the evaluation order callers already rely on.
            return stream.equals(that.getStream()) && load == that.getLoad();
        }
        return false;
    }

    @Override
    public String toString() {
        return String.format("StreamLoad<Stream: %s, Load: %d>", stream, load);
    }

    @Override
    public int hashCode() {
        HashCodeBuilder builder = new HashCodeBuilder();
        builder.append(stream);
        builder.append(load);
        return builder.toHashCode();
    }
}