http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
new file mode 100644
index 0000000..3c53ccf
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.distributedlog.service.DistributedLogClient;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A balancer balances ownerships between two targets.
+ */
+public class SimpleBalancer implements Balancer {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(SimpleBalancer.class);
+
+    protected final String target1;
+    protected final String target2;
+    protected final DistributedLogClient targetClient1;
+    protected final DistributedLogClient targetClient2;
+    protected final MonitorServiceClient targetMonitor1;
+    protected final MonitorServiceClient targetMonitor2;
+
+    /**
+     * Create a balancer that balances stream ownerships between two targets.
+     *
+     * @param name1
+     *          name of the first target
+     * @param client1
+     *          client used to talk to the first target
+     * @param monitor1
+     *          monitor client of the first target
+     * @param name2
+     *          name of the second target
+     * @param client2
+     *          client used to talk to the second target
+     * @param monitor2
+     *          monitor client of the second target
+     */
+    public SimpleBalancer(String name1,
+                          DistributedLogClient client1,
+                          MonitorServiceClient monitor1,
+                          String name2,
+                          DistributedLogClient client2,
+                          MonitorServiceClient monitor2) {
+        // target names
+        this.target1 = name1;
+        this.target2 = name2;
+        // clients
+        this.targetClient1 = client1;
+        this.targetClient2 = client2;
+        // monitors
+        this.targetMonitor1 = monitor1;
+        this.targetMonitor2 = monitor2;
+    }
+
+    /**
+     * Count the streams in an ownership distribution.
+     *
+     * @param distribution
+     *          ownership distribution: the set of streams owned by each address
+     * @return the sum of the sizes of all the stream sets in <i>distribution</i>
+     */
+    protected static int countNumberStreams(Map<SocketAddress, Set<String>> distribution) {
+        int total = 0;
+        for (Map.Entry<SocketAddress, Set<String>> ownership : distribution.entrySet()) {
+            total += ownership.getValue().size();
+        }
+        return total;
+    }
+
+    /**
+     * Even out stream ownership between the two targets.
+     *
+     * <p>The target that currently owns strictly more streams is used as the source; when both
+     * own the same number of streams, <i>target2</i> is used as the source. The number of streams
+     * to move is computed by {@code BalancerUtils.calculateNumStreamsToRebalance}; nothing is
+     * moved when that number is not positive.
+     */
+    @Override
+    public void balance(int rebalanceWaterMark,
+                        double rebalanceTolerancePercentage,
+                        int rebalanceConcurrency,
+                        Optional<RateLimiter> rebalanceRateLimiter) {
+        // fetch the ownership distribution of each target
+        Map<SocketAddress, Set<String>> distribution1 =
+            targetMonitor1.getStreamOwnershipDistribution();
+        Map<SocketAddress, Set<String>> distribution2 =
+            targetMonitor2.getStreamOwnershipDistribution();
+
+        // proxy and stream counts per target
+        int proxyCount1 = distribution1.size();
+        int streamCount1 = countNumberStreams(distribution1);
+        int proxyCount2 = distribution2.size();
+        int streamCount2 = countNumberStreams(distribution2);
+
+        logger.info("'{}' has {} streams by {} proxies; while '{}' has {} streams by {} proxies.",
+                    new Object[] {target1, streamCount1, proxyCount1, target2, streamCount2, proxyCount2 });
+
+        // the more loaded target is the source; a tie falls to target2
+        final boolean firstIsSource = streamCount1 > streamCount2;
+
+        final String source = firstIsSource ? target1 : target2;
+        final int srcStreamCount = firstIsSource ? streamCount1 : streamCount2;
+        final DistributedLogClient srcClient = firstIsSource ? targetClient1 : targetClient2;
+        final MonitorServiceClient srcMonitor = firstIsSource ? targetMonitor1 : targetMonitor2;
+        final Map<SocketAddress, Set<String>> srcDistribution =
+            firstIsSource ? distribution1 : distribution2;
+
+        final String target = firstIsSource ? target2 : target1;
+        final int targetStreamCount = firstIsSource ? streamCount2 : streamCount1;
+        final DistributedLogClient targetClient = firstIsSource ? targetClient2 : targetClient1;
+        final MonitorServiceClient targetMonitor = firstIsSource ? targetMonitor2 : targetMonitor1;
+
+        Map<String, Integer> loadDistribution = new HashMap<String, Integer>();
+        loadDistribution.put(source, srcStreamCount);
+        loadDistribution.put(target, targetStreamCount);
+
+        // Calculate how many streams to be rebalanced from src region to target region
+        int numStreamsToRebalance = BalancerUtils.calculateNumStreamsToRebalance(
+            source, loadDistribution, rebalanceWaterMark, rebalanceTolerancePercentage);
+
+        if (numStreamsToRebalance <= 0) {
+            logger.info("No streams need to be rebalanced from '{}' to '{}'.", source, target);
+            return;
+        }
+
+        // move at most numStreamsToRebalance streams picked from the source distribution
+        StreamChooser limitedChooser =
+            LimitedStreamChooser.of(new CountBasedStreamChooser(srcDistribution), numStreamsToRebalance);
+        StreamMover mover =
+            new StreamMoverImpl(source, srcClient, srcMonitor, target, targetClient, targetMonitor);
+
+        moveStreams(limitedChooser, mover, rebalanceConcurrency, rebalanceRateLimiter);
+    }
+
+    /**
+     * Move the streams owned by <i>source</i> to the other target.
+     *
+     * <p>Returns without doing anything when the source's ownership distribution is empty.
+     *
+     * @throws IllegalArgumentException if <i>source</i> is neither of the two targets
+     */
+    @Override
+    public void balanceAll(String source,
+                           int rebalanceConcurrency,
+                           Optional<RateLimiter> rebalanceRateLimiter) {
+        final boolean fromFirst = target1.equals(source);
+        if (!fromFirst && !target2.equals(source)) {
+            throw new IllegalArgumentException("Unknown target " + source);
+        }
+
+        // the target that is not the source receives the streams
+        final String target = fromFirst ? target2 : target1;
+        final DistributedLogClient sourceClient = fromFirst ? targetClient1 : targetClient2;
+        final MonitorServiceClient sourceMonitor = fromFirst ? targetMonitor1 : targetMonitor2;
+        final DistributedLogClient targetClient = fromFirst ? targetClient2 : targetClient1;
+        final MonitorServiceClient targetMonitor = fromFirst ? targetMonitor2 : targetMonitor1;
+
+        // get the ownership distribution of the source target
+        Map<SocketAddress, Set<String>> sourceDistribution =
+            sourceMonitor.getStreamOwnershipDistribution();
+        if (sourceDistribution.isEmpty()) {
+            return;
+        }
+
+        StreamChooser chooser = new CountBasedStreamChooser(sourceDistribution);
+        StreamMover mover =
+            new StreamMoverImpl(source, sourceClient, sourceMonitor, target, targetClient, targetMonitor);
+
+        moveStreams(chooser, mover, rebalanceConcurrency, rebalanceRateLimiter);
+    }
+
+    /**
+     * Move the streams picked by <i>streamChooser</i> using <i>concurrency</i> workers, and
+     * block until all the workers are done.
+     *
+     * <p>If the calling thread is interrupted while waiting, the workers are asked to stop, the
+     * interrupt status of the calling thread is restored and the method returns without waiting
+     * for the workers to finish.
+     *
+     * @param streamChooser
+     *          chooser to pick the streams to move
+     * @param streamMover
+     *          mover to move the chosen streams
+     * @param concurrency
+     *          number of workers submitted to move streams
+     * @param rateLimiter
+     *          optional rate limiter that is handed to the workers
+     */
+    private void moveStreams(StreamChooser streamChooser,
+                             StreamMover streamMover,
+                             int concurrency,
+                             Optional<RateLimiter> rateLimiter) {
+        CountDownLatch doneLatch = new CountDownLatch(concurrency);
+        RegionMover regionMover = new RegionMover(streamChooser, streamMover, rateLimiter, doneLatch);
+        ExecutorService executorService = Executors.newFixedThreadPool(concurrency);
+        try {
+            // the same mover instance is shared by all the workers
+            for (int i = 0; i < concurrency; i++) {
+                executorService.submit(regionMover);
+            }
+
+            try {
+                doneLatch.await();
+            } catch (InterruptedException e) {
+                logger.info("{} is interrupted. Stopping it ...", streamMover);
+                regionMover.shutdown();
+                // restore the interrupt status so that callers can observe the interruption
+                Thread.currentThread().interrupt();
+            }
+        } finally {
+            executorService.shutdown();
+        }
+
+    }
+
+    /**
+     * Move streams from <i>src</i> region to <i>target</i> region.
+     *
+     * <p>Each run keeps choosing a stream and moving it until the chooser returns <i>null</i>
+     * or {@link #shutdown()} is called, and counts down <i>doneLatch</i> exactly once on exit.
+     */
+    static class RegionMover implements Runnable {
+
+        final StreamChooser streamChooser;
+        final StreamMover streamMover;
+        final Optional<RateLimiter> rateLimiter;
+        final CountDownLatch doneLatch;
+        // cleared by shutdown() to make the running workers stop after their current iteration
+        volatile boolean running = true;
+
+        RegionMover(StreamChooser streamChooser,
+                    StreamMover streamMover,
+                    Optional<RateLimiter> rateLimiter,
+                    CountDownLatch doneLatch) {
+            this.streamChooser = streamChooser;
+            this.streamMover = streamMover;
+            this.rateLimiter = rateLimiter;
+            this.doneLatch = doneLatch;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (running) {
+                    if (rateLimiter.isPresent()) {
+                        rateLimiter.get().acquire();
+                    }
+
+                    String stream = streamChooser.choose();
+                    if (null == stream) {
+                        // nothing left to move
+                        break;
+                    }
+
+                    streamMover.moveStream(stream);
+                }
+            } finally {
+                // always count down, even when an unexpected exception ends this run;
+                // otherwise whoever awaits the latch would be blocked forever
+                doneLatch.countDown();
+            }
+        }
+
+        /**
+         * Ask the running workers to stop. A worker observes the request the next time it
+         * evaluates its loop condition.
+         */
+        void shutdown() {
+            running = false;
+        }
+    }
+
+    /**
+     * Close the balancer. This implementation does nothing.
+     */
+    @Override
+    public void close() {
+        // no-op
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamChooser.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamChooser.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamChooser.java
new file mode 100644
index 0000000..1d7b6f7
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamChooser.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+/**
+ * Choose a stream to rebalance.
+ */
+public interface StreamChooser {
+    /**
+     * Choose a stream to rebalance.
+     *
+     * @return the chosen stream. {@code SimpleBalancer.RegionMover} stops moving streams when
+     *         this returns <i>null</i>, so implementations presumably return <i>null</i> once
+     *         there is nothing left to choose.
+     */
+    String choose();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamMover.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamMover.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamMover.java
new file mode 100644
index 0000000..4a04530
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamMover.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+/**
+ * A stream mover to move streams between proxies.
+ */
+public interface StreamMover {
+
+    /**
+     * Move given stream <i>streamName</i>.
+     *
+     * <p>The method declares no exception: a failure is reported through the return value.
+     *
+     * @param streamName
+     *          stream name to move
+     * @return <i>true</i> if successfully moved the stream, <i>false</i> when failure happens.
+     */
+    boolean moveStream(final String streamName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamMoverImpl.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamMoverImpl.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamMoverImpl.java
new file mode 100644
index 0000000..68d934b
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/StreamMoverImpl.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.distributedlog.service.DistributedLogClient;
+import com.twitter.util.Await;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Move Streams from <i>src</i> to <i>target</i>.
+ *
+ * <p>A stream is moved by releasing it through the source client and then issuing a check for
+ * it through the target monitor.
+ */
+public class StreamMoverImpl implements StreamMover {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamMoverImpl.class);
+
+    final String source, target;
+    final DistributedLogClient srcClient, targetClient;
+    final MonitorServiceClient srcMonitor, targetMonitor;
+
+    /**
+     * Create a mover that moves streams from <i>source</i> to <i>target</i>.
+     *
+     * @param source
+     *          name of the source
+     * @param srcClient
+     *          client of the source, used to release streams
+     * @param srcMonitor
+     *          monitor client of the source
+     * @param target
+     *          name of the target
+     * @param targetClient
+     *          client of the target
+     * @param targetMonitor
+     *          monitor client of the target, used to check streams after they are released
+     */
+    public StreamMoverImpl(String source, DistributedLogClient srcClient, MonitorServiceClient srcMonitor,
+                           String target, DistributedLogClient targetClient, MonitorServiceClient targetMonitor) {
+        this.source = source;
+        this.srcClient = srcClient;
+        this.srcMonitor = srcMonitor;
+        this.target = target;
+        this.targetClient = targetClient;
+        this.targetMonitor = targetMonitor;
+    }
+
+    /**
+     * Move given stream <i>streamName</i>.
+     *
+     * <p>This method never throws: any failure is caught and reported as <i>false</i>.
+     *
+     * @param streamName
+     *          stream name to move
+     * @return <i>true</i> if successfully moved the stream, <i>false</i> when failure happens.
+     */
+    @Override
+    public boolean moveStream(final String streamName) {
+        try {
+            doMoveStream(streamName);
+            return true;
+        } catch (Exception e) {
+            // failures of the release/check futures are already logged by the listener in
+            // doMoveStream; keep a trace for anything else (e.g. an interrupted wait)
+            logger.debug("Giving up moving stream {} : ", streamName, e);
+            return false;
+        }
+    }
+
+    /**
+     * Release the stream from the source, check it on the target and wait for the outcome.
+     *
+     * @param streamName
+     *          stream name to move
+     * @throws Exception when releasing, checking or waiting for the result fails
+     */
+    private void doMoveStream(final String streamName) throws Exception {
+        Future<Void> moveFuture = srcClient.release(streamName).flatMap(new Function<Void, Future<Void>>() {
+            @Override
+            public Future<Void> apply(Void result) {
+                return targetMonitor.check(streamName);
+            }
+        }).addEventListener(new FutureEventListener<Void>() {
+            // the listener observes the whole release-then-check chain, so that a failed
+            // release is logged as well as a failed check
+            @Override
+            public void onSuccess(Void value) {
+                logger.info("Moved stream {} from {} to {}.",
+                        new Object[]{streamName, source, target});
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                logger.info("Failed to move stream {} from region {} to {} : ",
+                        new Object[]{streamName, source, target, cause});
+            }
+        });
+        Await.result(moveFuture);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("StreamMover('").append(source).append("' -> '").append(target).append("')");
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/package-info.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/package-info.java
new file mode 100644
index 0000000..9eb8950
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Balancer to move streams around to balance the traffic.
+ */
+package org.apache.distributedlog.service.balancer;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
new file mode 100644
index 0000000..7d72093
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.config;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.config.ConcurrentConstConfiguration;
+import org.apache.distributedlog.config.ConfigurationSubscription;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.config.FileConfigurationBuilder;
+import org.apache.distributedlog.config.PropertiesConfigurationBuilder;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.configuration.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stream config provider that hands out one shared dynamic configuration, backed by a
+ * configuration file, for every stream.
+ */
+public class DefaultStreamConfigProvider implements StreamConfigProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamConfigProvider.class);
+
+    private final Optional<DynamicDistributedLogConfiguration> dynConf;
+    private final ConfigurationSubscription confSub;
+
+    /**
+     * Build the shared dynamic configuration and subscribe it to the given configuration file.
+     *
+     * @param configFilePath
+     *          path of the configuration file
+     * @param executorService
+     *          executor handed to the configuration subscription
+     * @param reloadPeriod
+     *          reload period handed to the configuration subscription
+     * @param reloadUnit
+     *          time unit of <i>reloadPeriod</i>
+     * @throws ConfigurationException if the file path can not be turned into a URL
+     */
+    public DefaultStreamConfigProvider(String configFilePath,
+                                       ScheduledExecutorService executorService,
+                                       int reloadPeriod,
+                                       TimeUnit reloadUnit)
+        throws ConfigurationException {
+        try {
+            FileConfigurationBuilder fileBuilder =
+                new PropertiesConfigurationBuilder(new File(configFilePath).toURI().toURL());
+            // the dynamic configuration falls back to a default DistributedLogConfiguration
+            DynamicDistributedLogConfiguration sharedConf =
+                new DynamicDistributedLogConfiguration(
+                    new ConcurrentConstConfiguration(new DistributedLogConfiguration()));
+            List<FileConfigurationBuilder> builders = Lists.newArrayList(fileBuilder);
+            this.confSub = new ConfigurationSubscription(
+                sharedConf, builders, executorService, reloadPeriod, reloadUnit);
+            this.dynConf = Optional.of(sharedConf);
+        } catch (MalformedURLException ex) {
+            throw new ConfigurationException(ex);
+        }
+    }
+
+    /**
+     * Return the shared dynamic configuration, whatever the stream name is.
+     */
+    @Override
+    public Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName) {
+        return this.dynConf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/NullStreamConfigProvider.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/NullStreamConfigProvider.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/NullStreamConfigProvider.java
new file mode 100644
index 0000000..195f29d
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/NullStreamConfigProvider.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.config;
+
+import com.google.common.base.Optional;
+
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stream config provider that has no dynamic configuration for any stream.
+ */
+public class NullStreamConfigProvider implements StreamConfigProvider {
+    static final Logger LOG = LoggerFactory.getLogger(NullStreamConfigProvider.class);
+
+    // the single absent value handed out for every stream
+    private static final Optional<DynamicDistributedLogConfiguration> ABSENT_CONF = Optional.absent();
+
+    /**
+     * Return an absent configuration, whatever the stream name is.
+     */
+    @Override
+    public Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName) {
+        return ABSENT_CONF;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
new file mode 100644
index 0000000..257b4be
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
@@ -0,0 +1,443 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.config;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogConstants;
+import 
org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
+import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.SystemConfiguration;
+
+/**
+ * Configuration for DistributedLog Server.
+ *
+ * <p>Settings are stored as string-keyed properties on the underlying
+ * {@link CompositeConfiguration}. The typed getters below supply a default
+ * value when the corresponding key is not set.
+ */
+public class ServerConfiguration extends CompositeConfiguration {
+
+    // Class loader passed to ReflectionUtils when resolving configured class names.
+    private static ClassLoader defaultLoader;
+
+    static {
+        // Prefer the thread context class loader; fall back to the loader of
+        // DistributedLogConfiguration when the current thread has none.
+        defaultLoader = Thread.currentThread().getContextClassLoader();
+        if (null == defaultLoader) {
+            defaultLoader = DistributedLogConfiguration.class.getClassLoader();
+        }
+    }
+
+    // Server DLSN version
+    protected static final String SERVER_DLSN_VERSION = "server_dlsn_version";
+    protected static final byte SERVER_DLSN_VERSION_DEFAULT = DLSN.VERSION1;
+
+    // Server Durable Write Enable/Disable Flag
+    protected static final String SERVER_DURABLE_WRITE_ENABLED = 
"server_durable_write_enabled";
+    protected static final boolean SERVER_DURABLE_WRITE_ENABLED_DEFAULT = true;
+
+    // Server Region Id
+    protected static final String SERVER_REGION_ID = "server_region_id";
+    protected static final int SERVER_REGION_ID_DEFAULT = 
DistributedLogConstants.LOCAL_REGION_ID;
+
+    // Server Port
+    protected static final String SERVER_PORT = "server_port";
+    protected static final int SERVER_PORT_DEFAULT = 0;
+
+    // Server Shard Id (the default of -1 is rejected by validate())
+    protected static final String SERVER_SHARD_ID = "server_shard";
+    protected static final int SERVER_SHARD_ID_DEFAULT = -1;
+
+    // Server Threads (defaults to the number of available processors)
+    protected static final String SERVER_NUM_THREADS = "server_threads";
+    protected static final int SERVER_NUM_THREADS_DEFAULT = 
Runtime.getRuntime().availableProcessors();
+
+    // Server enable per stream stat
+    protected static final String SERVER_ENABLE_PERSTREAM_STAT = 
"server_enable_perstream_stat";
+    protected static final boolean SERVER_ENABLE_PERSTREAM_STAT_DEFAULT = true;
+
+    // Server graceful shutdown period (in millis)
+    protected static final String SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS = 
"server_graceful_shutdown_period_ms";
+    protected static final long SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT = 
0L;
+
+    // Server service timeout (in millis); the _OLD key is consulted only when the new key is unset
+    public static final String SERVER_SERVICE_TIMEOUT_MS = 
"server_service_timeout_ms";
+    public static final String SERVER_SERVICE_TIMEOUT_MS_OLD = 
"serviceTimeoutMs";
+    public static final long SERVER_SERVICE_TIMEOUT_MS_DEFAULT = 0;
+
+    // Server close writer timeout (in millis)
+    public static final String SERVER_WRITER_CLOSE_TIMEOUT_MS = 
"server_writer_close_timeout_ms";
+    public static final long SERVER_WRITER_CLOSE_TIMEOUT_MS_DEFAULT = 1000;
+
+    // Server stream probation timeout (in millis, default 5 minutes);
+    // the _OLD key is consulted only when the new key is unset
+    public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS = 
"server_stream_probation_timeout_ms";
+    public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD = 
"streamProbationTimeoutMs";
+    public static final long SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT = 60 * 
1000 * 5;
+
+    // Server stream to partition converter
+    protected static final String SERVER_STREAM_PARTITION_CONVERTER_CLASS = 
"stream_partition_converter_class";
+
+    // Use hostname as the allocator pool name
+    protected static final String SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME =
+        "server_use_hostname_as_allocator_pool_name";
+    protected static final boolean 
SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT = false;
+    // Refresh interval (in seconds) for calculating resource placement
+    public static final String SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S =
+        "server_resource_placement_refresh_interval_sec";
+    public static final int  
SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT = 120;
+
+    /**
+     * Create a server configuration whose only initial source is a
+     * {@link SystemConfiguration}.
+     */
+    public ServerConfiguration() {
+        super();
+        addConfiguration(new SystemConfiguration());
+    }
+
+    /**
+     * Load configurations from {@link DistributedLogConfiguration}.
+     *
+     * <p>The given configuration is added as a further source of this composite configuration.
+     *
+     * @param dlConf
+     *          distributedlog configuration
+     */
+    public void loadConf(DistributedLogConfiguration dlConf) {
+        addConfiguration(dlConf);
+    }
+
+    /**
+     * Set the version to encode dlsn.
+     *
+     * @param version
+     *          dlsn version
+     * @return server configuration
+     */
+    public ServerConfiguration setDlsnVersion(byte version) {
+        setProperty(SERVER_DLSN_VERSION, version);
+        return this;
+    }
+
+    /**
+     * Get the version to encode dlsn.
+     *
+     * @see DLSN
+     * @return version to encode dlsn; {@link DLSN#VERSION1} when not set.
+     */
+    public byte getDlsnVersion() {
+        return getByte(SERVER_DLSN_VERSION, SERVER_DLSN_VERSION_DEFAULT);
+    }
+
+    /**
+     * Set the flag to enable/disable durable write.
+     *
+     * @param enabled
+     *          flag to enable/disable durable write
+     * @return server configuration
+     */
+    public ServerConfiguration enableDurableWrite(boolean enabled) {
+        setProperty(SERVER_DURABLE_WRITE_ENABLED, enabled);
+        return this;
+    }
+
+    /**
+     * Is durable write enabled.
+     *
+     * @return true if waiting for writes to be durable, otherwise false. Defaults to true.
+     */
+    public boolean isDurableWriteEnabled() {
+        return getBoolean(SERVER_DURABLE_WRITE_ENABLED, 
SERVER_DURABLE_WRITE_ENABLED_DEFAULT);
+    }
+
+    /**
+     * Set the region id used to instantiate DistributedLogNamespace.
+     *
+     * @param regionId
+     *          region id
+     * @return server configuration
+     */
+    public ServerConfiguration setRegionId(int regionId) {
+        setProperty(SERVER_REGION_ID, regionId);
+        return this;
+    }
+
+    /**
+     * Get the region id used to instantiate
+     * {@link org.apache.distributedlog.namespace.DistributedLogNamespace}.
+     *
+     * @return region id used to instantiate DistributedLogNamespace;
+     *          {@link DistributedLogConstants#LOCAL_REGION_ID} when not set.
+     */
+    public int getRegionId() {
+        return getInt(SERVER_REGION_ID, SERVER_REGION_ID_DEFAULT);
+    }
+
+    /**
+     * Set the server port running for this service.
+     *
+     * @param port
+     *          server port
+     * @return server configuration
+     */
+    public ServerConfiguration setServerPort(int port) {
+        setProperty(SERVER_PORT, port);
+        return this;
+    }
+
+    /**
+     * Get the server port running for this service.
+     *
+     * @return server port; 0 when not set.
+     */
+    public int getServerPort() {
+        return getInt(SERVER_PORT, SERVER_PORT_DEFAULT);
+    }
+
+    /**
+     * Set the shard id of this server.
+     *
+     * @param shardId
+     *          shard id
+     * @return server configuration
+     */
+    public ServerConfiguration setServerShardId(int shardId) {
+        setProperty(SERVER_SHARD_ID, shardId);
+        return this;
+    }
+
+    /**
+     * Get the shard id of this server.
+     *
+     * <p>It would be used to instantiate the client id used for DistributedLogNamespace.
+     *
+     * @return shard id of this server; -1 when not set.
+     */
+    public int getServerShardId() {
+        return getInt(SERVER_SHARD_ID, SERVER_SHARD_ID_DEFAULT);
+    }
+
+    /**
+     * Get the number of threads for the executor of this server.
+     *
+     * @return number of threads for the executor running in this server;
+     *          the number of available processors when not set.
+     */
+    public int getServerThreads() {
+        return getInt(SERVER_NUM_THREADS, SERVER_NUM_THREADS_DEFAULT);
+    }
+
+    /**
+     * Set the number of threads for the executor of this server.
+     *
+     * @param numThreads
+     *          number of threads for the executor running in this server.
+     * @return server configuration
+     */
+    public ServerConfiguration setServerThreads(int numThreads) {
+        setProperty(SERVER_NUM_THREADS, numThreads);
+        return this;
+    }
+
+    /**
+     * Enable/Disable per stream stat.
+     *
+     * @param enabled
+     *          flag to enable/disable per stream stat
+     * @return server configuration
+     */
+    public ServerConfiguration setPerStreamStatEnabled(boolean enabled) {
+        setProperty(SERVER_ENABLE_PERSTREAM_STAT, enabled);
+        return this;
+    }
+
+    /**
+     * Whether per stream stat is enabled or not in this server.
+     *
+     * @return true if per stream stat is enabled, otherwise false. Defaults to true.
+     */
+    public boolean isPerStreamStatEnabled() {
+        return getBoolean(SERVER_ENABLE_PERSTREAM_STAT, 
SERVER_ENABLE_PERSTREAM_STAT_DEFAULT);
+    }
+
+    /**
+     * Set the graceful shutdown period in millis.
+     *
+     * @param periodMs
+     *          graceful shutdown period in millis.
+     * @return server configuration
+     */
+    public ServerConfiguration setGracefulShutdownPeriodMs(long periodMs) {
+        setProperty(SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS, periodMs);
+        return this;
+    }
+
+    /**
+     * Get the graceful shutdown period in millis.
+     *
+     * @return graceful shutdown period in millis; 0 when not set.
+     */
+    public long getGracefulShutdownPeriodMs() {
+        return getLong(SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS, 
SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT);
+    }
+
+    /**
+     * Get timeout for stream op execution in proxy layer.
+     *
+     * <p>0 disables timeout.
+     *
+     * <p>Reads {@code server_service_timeout_ms}; when that key is unset, falls back to
+     * {@code serviceTimeoutMs}, and finally to the default of 0.
+     *
+     * @return timeout for stream operation in proxy layer.
+     */
+    public long getServiceTimeoutMs() {
+        return getLong(SERVER_SERVICE_TIMEOUT_MS,
+                getLong(SERVER_SERVICE_TIMEOUT_MS_OLD, 
SERVER_SERVICE_TIMEOUT_MS_DEFAULT));
+    }
+
+    /**
+     * Set timeout for stream op execution in proxy layer.
+     *
+     * <p>0 disables timeout.
+     *
+     * @param timeoutMs
+     *          timeout for stream operation in proxy layer.
+     * @return server configuration
+     */
+    public ServerConfiguration setServiceTimeoutMs(long timeoutMs) {
+        setProperty(SERVER_SERVICE_TIMEOUT_MS, timeoutMs);
+        return this;
+    }
+
+    /**
+     * Get timeout for closing writer in proxy layer.
+     *
+     * <p>0 disables timeout.
+     *
+     * @return timeout for closing writer in proxy layer; 1000 when not set.
+     */
+    public long getWriterCloseTimeoutMs() {
+        return getLong(SERVER_WRITER_CLOSE_TIMEOUT_MS, 
SERVER_WRITER_CLOSE_TIMEOUT_MS_DEFAULT);
+    }
+
+    /**
+     * Set timeout for closing writer in proxy layer.
+     *
+     * <p>0 disables timeout.
+     *
+     * @param timeoutMs
+     *          timeout for closing writer in proxy layer.
+     * @return server configuration
+     */
+    public ServerConfiguration setWriterCloseTimeoutMs(long timeoutMs) {
+        setProperty(SERVER_WRITER_CLOSE_TIMEOUT_MS, timeoutMs);
+        return this;
+    }
+
+    /**
+     * How long should stream be kept in cache in probationary state after service timeout.
+     *
+     * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds.
+     *
+     * <p>Reads {@code server_stream_probation_timeout_ms}; when that key is unset, falls back to
+     * {@code streamProbationTimeoutMs}, and finally to the default of 5 minutes.
+     *
+     * @return stream probation timeout in ms.
+     */
+    public long getStreamProbationTimeoutMs() {
+        return getLong(SERVER_STREAM_PROBATION_TIMEOUT_MS,
+                getLong(SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD, 
SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT));
+    }
+
+    /**
+     * How long should stream be kept in cache in probationary state after service timeout.
+     *
+     * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds.
+     *
+     * @param timeoutMs probation timeout in ms.
+     * @return server configuration
+     */
+    public ServerConfiguration setStreamProbationTimeoutMs(long timeoutMs) {
+        setProperty(SERVER_STREAM_PROBATION_TIMEOUT_MS, timeoutMs);
+        return this;
+    }
+
+    /**
+     * Set the stream partition converter class.
+     *
+     * <p>The class is stored by its fully qualified name.
+     *
+     * @param converterClass
+     *          stream partition converter class
+     * @return server configuration
+     */
+    public ServerConfiguration setStreamPartitionConverterClass(
+        Class<? extends StreamPartitionConverter> converterClass) {
+        setProperty(SERVER_STREAM_PARTITION_CONVERTER_CLASS, 
converterClass.getName());
+        return this;
+    }
+
+    /**
+     * Get the stream partition converter class.
+     *
+     * <p>{@link IdentityStreamPartitionConverter} is passed to the lookup as the default class.
+     *
+     * @return the stream partition converter class.
+     * @throws ConfigurationException propagated from {@code ReflectionUtils.getClass};
+     *          NOTE(review): presumably raised when the configured name cannot be loaded or
+     *          is not a {@link StreamPartitionConverter} - confirm against ReflectionUtils.
+     */
+    public Class<? extends StreamPartitionConverter> 
getStreamPartitionConverterClass()
+            throws ConfigurationException {
+        return ReflectionUtils.getClass(
+                this,
+                SERVER_STREAM_PARTITION_CONVERTER_CLASS,
+                IdentityStreamPartitionConverter.class,
+                StreamPartitionConverter.class,
+                defaultLoader);
+    }
+
+    /**
+     * Set if use hostname as the allocator pool name.
+     *
+     * @param useHostname whether to use hostname as the allocator pool name.
+     * @return server configuration
+     * @see #isUseHostnameAsAllocatorPoolName()
+     */
+    public ServerConfiguration setUseHostnameAsAllocatorPoolName(boolean 
useHostname) {
+        setProperty(SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME, useHostname);
+        return this;
+    }
+
+    /**
+     * Get if use hostname as the allocator pool name.
+     *
+     * @return true if use hostname as the allocator pool name. otherwise, use
+     * {@link #getServerShardId()} as the allocator pool name. Defaults to false.
+     * @see #getServerShardId()
+     */
+    public boolean isUseHostnameAsAllocatorPoolName() {
+        return getBoolean(SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME,
+            SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT);
+    }
+
+    /**
+     * Set the refresh interval for calculating resource placement.
+     *
+     * @param refreshIntervalSecs
+     *          refresh interval in seconds
+     * @return server configuration
+     */
+    public ServerConfiguration setResourcePlacementRefreshInterval(int 
refreshIntervalSecs) {
+        setProperty(SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S, 
refreshIntervalSecs);
+        return this;
+    }
+
+    /**
+     * Get the refresh interval for calculating resource placement.
+     *
+     * @return refresh interval in seconds; 120 when not set.
+     */
+    public int getResourcePlacementRefreshInterval() {
+        return getInt(SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S, 
SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT);
+    }
+
+    /**
+     * Validate the configuration.
+     *
+     * <p>Checks that the dlsn version lies between {@link DLSN#VERSION0} and
+     * {@link DLSN#VERSION1}, that the number of server threads is positive, and that the
+     * server shard id is non-negative. Because the shard id defaults to -1, validation
+     * fails unless a shard id has been configured.
+     *
+     * @throws IllegalArgumentException when there are any invalid settings.
+     */
+    public void validate() {
+        byte dlsnVersion = getDlsnVersion();
+        checkArgument(dlsnVersion >= DLSN.VERSION0 && dlsnVersion <= 
DLSN.VERSION1,
+                "Unknown dlsn version " + dlsnVersion);
+        checkArgument(getServerThreads() > 0,
+                "Invalid number of server threads : " + getServerThreads());
+        checkArgument(getServerShardId() >= 0,
+                "Invalid server shard id : " + getServerShardId());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java
new file mode 100644
index 0000000..29052f9
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.config;
+
+import com.google.common.base.Optional;
+import org.apache.distributedlog.config.DynamicConfigurationFactory;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
+import java.io.File;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.configuration.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provide per stream configuration to DistributedLog service layer.
+ *
+ * <p>The config for a stream is looked up at {@code <configBaseDir>/<name>.conf}, where
+ * {@code <name>} is the stream component the {@link StreamPartitionConverter} yields for the
+ * requested stream name. Every lookup is handed the default dynamic configuration loaded
+ * at construction time.
+ */
+public class ServiceStreamConfigProvider implements StreamConfigProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ServiceStreamConfigProvider.class);
+
+    private static final String CONFIG_EXTENSION = "conf";
+
+    private final File configBaseDir;
+    private final File defaultConfigFile;
+    private final StreamPartitionConverter partitionConverter;
+    private final DynamicConfigurationFactory configFactory;
+    private final DynamicDistributedLogConfiguration defaultDynConf;
+
+    /**
+     * Create a provider rooted at the given configuration directory.
+     *
+     * @param configPath base directory holding the per-stream config files
+     * @param defaultConfigPath path of the default config file
+     * @param partitionConverter converter used to derive the config file name from a stream name
+     * @param executorService executor handed to the {@link DynamicConfigurationFactory}
+     * @param reloadPeriod reload period handed to the factory
+     * @param reloadUnit time unit of {@code reloadPeriod}
+     * @throws ConfigurationException if {@code configPath} or {@code defaultConfigPath} does not
+     *          exist, or if loading the default configuration fails
+     */
+    public ServiceStreamConfigProvider(String configPath,
+                                       String defaultConfigPath,
+                                       StreamPartitionConverter partitionConverter,
+                                       ScheduledExecutorService executorService,
+                                       int reloadPeriod,
+                                       TimeUnit reloadUnit)
+                                       throws ConfigurationException {
+        this.configBaseDir = new File(configPath);
+        if (!configBaseDir.exists()) {
+            throw new ConfigurationException("Stream configuration base directory "
+                + configPath + " does not exist");
+        }
+        // Check the default config file itself; checking configPath again here would let a
+        // missing default file slip through to the unchecked get() below.
+        this.defaultConfigFile = new File(defaultConfigPath);
+        if (!defaultConfigFile.exists()) {
+            throw new ConfigurationException("Stream configuration default config "
+                + defaultConfigPath + " does not exist");
+        }
+
+        // Construct reloading default configuration
+        this.partitionConverter = partitionConverter;
+        this.configFactory = new DynamicConfigurationFactory(executorService, reloadPeriod, reloadUnit);
+        // We know it exists from the check above.
+        this.defaultDynConf = configFactory.getDynamicConfiguration(defaultConfigPath).get();
+    }
+
+    /**
+     * Look up the dynamic configuration for a stream.
+     *
+     * @param streamName stream name to return config for
+     * @return the configuration obtained from the factory for the stream's config path, or an
+     *          absent optional when the factory throws a {@link ConfigurationException}
+     *          (the failure is logged at warn level rather than propagated)
+     */
+    @Override
+    public Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName) {
+        String configName = partitionConverter.convert(streamName).getStream();
+        String configPath = getConfigPath(configName);
+        Optional<DynamicDistributedLogConfiguration> dynConf =
+            Optional.<DynamicDistributedLogConfiguration>absent();
+        try {
+            dynConf = configFactory.getDynamicConfiguration(configPath, defaultDynConf);
+        } catch (ConfigurationException ex) {
+            // Best effort: a broken per-stream config must not fail the caller.
+            LOG.warn("Configuration exception for stream {} ({}) at {}",
+                    new Object[] {streamName, configName, configPath, ex});
+        }
+        return dynConf;
+    }
+
+    /**
+     * Build the path of a config file under the base directory.
+     *
+     * @param configName config name without extension
+     * @return path of {@code <configName>.conf} inside the base directory
+     */
+    private String getConfigPath(String configName) {
+        return new File(configBaseDir, String.format("%s.%s", configName, CONFIG_EXTENSION)).getPath();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java
new file mode 100644
index 0000000..c704f70
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.config;
+
+import com.google.common.base.Optional;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+
+/**
+ * Expose per-stream configs to dl proxy.
+ */
+public interface StreamConfigProvider {
+    /**
+     * Get dynamic per stream config overrides for a given stream.
+     *
+     * @param streamName stream name to return config for
+     * @return optional dynamic configuration instance; may be absent when no
+     *          configuration is provided for the stream
+     */
+    Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String 
streamName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/package-info.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/package-info.java
new file mode 100644
index 0000000..b07605e
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * DistributedLog Server Configurations.
+ */
+package org.apache.distributedlog.service.config;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/package-info.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/package-info.java
new file mode 100644
index 0000000..3fcfeda
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * DistributedLog Proxy Service.
+ */
+package org.apache.distributedlog.service;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java
new file mode 100644
index 0000000..fa3dd49
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.placement;
+
+import com.twitter.util.Future;
+
+/**
+ * Equal Load Appraiser.
+ *
+ * <p>A {@link LoadAppraiser} that reports the same load of one for every stream and keeps
+ * no cache. Created for those who hold these truths to be self-evident, that all streams
+ * are created equal, that they are endowed by their creator with certain unalienable
+ * loads, that among these are Uno, Eins, and One.
+ */
+public class EqualLoadAppraiser implements LoadAppraiser {
+
+    /** Load reported for every stream, whatever its name. */
+    private static final int UNIFORM_LOAD = 1;
+
+    /**
+     * Report the load of a stream.
+     *
+     * @param stream name of the stream
+     * @return a future already holding a {@link StreamLoad} of one for {@code stream}
+     */
+    @Override
+    public Future<StreamLoad> getStreamLoad(String stream) {
+        StreamLoad uniform = new StreamLoad(stream, UNIFORM_LOAD);
+        return Future.value(uniform);
+    }
+
+    /**
+     * Nothing is cached, so there is nothing to refresh.
+     *
+     * @return a future already holding {@code null}
+     */
+    @Override
+    public Future<Void> refreshCache() {
+        return Future.value(null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
new file mode 100644
index 0000000..2e9dd6b
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.placement;
+
+import org.apache.distributedlog.client.routing.RoutingService;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.util.Duration;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.Futures;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+/**
+ * Least Load Placement Policy.
+ *
+ * <p>A PlacementPolicy that attempts to place streams in such a way that the load is
+ * balanced as evenly as possible across all servers. The LoadAppraiser remains responsible
+ * for determining what the load of each stream is. This placement policy then distributes
+ * these streams across the servers, always handing the next stream to the server that is
+ * first (least loaded) in the sorted set of {@link ServerLoad}s.
+ *
+ * <p>The mutable state ({@code serverLoads} and {@code streamToServer}) is read and written
+ * only while holding this object's monitor.
+ */
+public class LeastLoadPlacementPolicy extends PlacementPolicy {
+
+    private static final Logger logger = LoggerFactory.getLogger(LeastLoadPlacementPolicy.class);
+
+    // Servers ordered by ServerLoad's natural ordering; first() is the placement target.
+    private TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
+    // Stream name -> owning server, for streams that have already been placed.
+    private Map<String, String> streamToServer = new HashMap<String, String>();
+
+    public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
+                                    DistributedLogNamespace namespace,
+                                    PlacementStateManager placementStateManager,
+                                    Duration refreshInterval, StatsLogger statsLogger) {
+        super(loadAppraiser, routingService, namespace, placementStateManager,
+            refreshInterval, statsLogger);
+        statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                // Sampled from a stats thread: go through the synchronized accessor so the
+                // set cannot be emptied or swapped between the size check and first()/last().
+                return getLoadDifference();
+            }
+        });
+    }
+
+    /**
+     * Returns the spread between the most and the least loaded server, or 0 when no
+     * servers are known.
+     */
+    private synchronized Number getLoadDifference() {
+        if (serverLoads.isEmpty()) {
+            return 0;
+        }
+        return serverLoads.last().getLoad() - serverLoads.first().getLoad();
+    }
+
+    /**
+     * Looks up the server a stream has already been assigned to.
+     *
+     * @return the owning server, or {@code null} if the stream has not been placed yet
+     */
+    private synchronized String getStreamOwner(String stream) {
+        return streamToServer.get(stream);
+    }
+
+    /**
+     * Returns the owner of {@code stream}, assigning it to the least loaded server if it has
+     * no owner yet.
+     *
+     * @param stream name of the stream
+     * @return a future holding the server that owns the stream; the future fails if the load
+     *         cannot be appraised or no server is available
+     */
+    @Override
+    public Future<String> placeStream(final String stream) {
+        String streamOwner = getStreamOwner(stream);
+        if (null != streamOwner) {
+            return Future.value(streamOwner);
+        }
+        Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream);
+        return streamLoadFuture.map(new Function<StreamLoad, String>() {
+            @Override
+            public String apply(StreamLoad streamLoad) {
+                return placeStreamSynchronized(stream, streamLoad);
+            }
+        });
+    }
+
+    /**
+     * Assigns the stream to the least loaded server and remembers the assignment.
+     *
+     * @param stream the stream name the caller asked to place (used as the lookup key)
+     * @param streamLoad the appraised load of that stream
+     * @return the server that owns the stream
+     * @throws IllegalStateException if there is no server to place the stream on
+     */
+    private synchronized String placeStreamSynchronized(String stream, StreamLoad streamLoad) {
+        // The unsynchronized gap in placeStream() lets two callers race for the same stream;
+        // re-check under the lock so the stream is only ever placed (and counted) once.
+        String existingOwner = streamToServer.get(stream);
+        if (null != existingOwner) {
+            return existingOwner;
+        }
+        ServerLoad serverLoad = serverLoads.pollFirst();
+        if (null == serverLoad) {
+            throw new IllegalStateException("No servers available to place stream " + stream);
+        }
+        // The element is taken out of the sorted set before its load changes and re-added
+        // afterwards, so the set is never asked to hold an element whose ordering key moved.
+        serverLoad.addStream(streamLoad);
+        serverLoads.add(serverLoad);
+        // Record the owner so later placeStream() calls return the same server.
+        streamToServer.put(stream, serverLoad.getServer());
+        return serverLoad.getServer();
+    }
+
+    /**
+     * Refreshes the appraiser's cache, recalculates the placement of all streams over all
+     * servers and, if the result can be persisted, adopts it as the current mapping.
+     *
+     * <p>The work is asynchronous; this method returns before it completes.
+     */
+    @Override
+    public void refresh() {
+        logger.info("Refreshing server loads.");
+        Future<Void> refresh = loadAppraiser.refreshCache().onFailure(
+            new Function<Throwable, BoxedUnit>() {
+            @Override
+            public BoxedUnit apply(Throwable t) {
+                logger.error("Failure refreshing the load appraiser cache;"
+                    + " server loads will not be recalculated.", t);
+                return BoxedUnit.UNIT;
+            }
+        });
+        final Set<String> servers = getServers();
+        final Set<String> allStreams = getStreams();
+        Future<TreeSet<ServerLoad>> serverLoadsFuture = refresh.flatMap(
+            new Function<Void, Future<TreeSet<ServerLoad>>>() {
+            @Override
+            public Future<TreeSet<ServerLoad>> apply(Void v1) {
+                return calculate(servers, allStreams);
+            }
+        });
+        serverLoadsFuture.map(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
+            @Override
+            public BoxedUnit apply(TreeSet<ServerLoad> newServerLoads) {
+                try {
+                    updateServerLoads(newServerLoads);
+                } catch (PlacementStateManager.StateManagerSaveException e) {
+                    logger.error("The refreshed mapping could not be persisted and will not be used.", e);
+                }
+                return BoxedUnit.UNIT;
+            }
+        });
+    }
+
+    /**
+     * Persists the new mapping and then adopts it in memory.
+     *
+     * <p>Saving comes first: if it throws, the in-memory state is left untouched.
+     *
+     * @throws PlacementStateManager.StateManagerSaveException if the mapping cannot be saved
+     */
+    private synchronized void updateServerLoads(TreeSet<ServerLoad> serverLoads)
+        throws PlacementStateManager.StateManagerSaveException {
+        this.placementStateManager.saveOwnership(serverLoads);
+        this.streamToServer = serverLoadsToMap(serverLoads);
+        this.serverLoads = serverLoads;
+    }
+
+    /**
+     * Adopts the given mapping in memory without persisting it.
+     */
+    @Override
+    public synchronized void load(TreeSet<ServerLoad> serverLoads) {
+        this.serverLoads = serverLoads;
+        this.streamToServer = serverLoadsToMap(serverLoads);
+    }
+
+    /**
+     * Appraises every stream and greedily distributes the streams over the servers.
+     *
+     * <p>Each server first receives one stream (while streams remain); the rest are handed,
+     * one at a time, to whichever server is currently first in the sorted set. The outcome is
+     * recorded in {@code placementCalcStats} together with the elapsed time.
+     *
+     * @param servers the servers to distribute over
+     * @param streams the streams to distribute
+     * @return a future holding the calculated server loads
+     */
+    public Future<TreeSet<ServerLoad>> calculate(final Set<String> servers, Set<String> streams) {
+        logger.info("Calculating server loads");
+        final long startTime = System.currentTimeMillis();
+        ArrayList<Future<StreamLoad>> futures = new ArrayList<Future<StreamLoad>>(streams.size());
+
+        for (String stream : streams) {
+            Future<StreamLoad> streamLoad = loadAppraiser.getStreamLoad(stream);
+            futures.add(streamLoad);
+        }
+
+        return Futures.collect(futures).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() {
+            @Override
+            public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) {
+                /* Sort streamLoads so largest streams are placed first for better balance */
+                // NOTE(review): pollFirst() yields the smallest element under StreamLoad's
+                // natural ordering - confirm that ordering really puts the largest load first.
+                TreeSet<StreamLoad> streamQueue = new TreeSet<StreamLoad>();
+                for (StreamLoad streamLoad : streamLoads) {
+                    streamQueue.add(streamLoad);
+                }
+
+                TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
+                for (String server : servers) {
+                    ServerLoad serverLoad = new ServerLoad(server);
+                    if (!streamQueue.isEmpty()) {
+                        serverLoad.addStream(streamQueue.pollFirst());
+                    }
+                    serverLoads.add(serverLoad);
+                }
+
+                while (!streamQueue.isEmpty()) {
+                    ServerLoad serverLoad = serverLoads.pollFirst();
+                    serverLoad.addStream(streamQueue.pollFirst());
+                    serverLoads.add(serverLoad);
+                }
+                return serverLoads;
+            }
+        }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
+            @Override
+            public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
+                placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime);
+                return BoxedUnit.UNIT;
+            }
+        }).onFailure(new Function<Throwable, BoxedUnit>() {
+            @Override
+            public BoxedUnit apply(Throwable t) {
+                logger.error("Failure calculating loads", t);
+                placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime);
+                return BoxedUnit.UNIT;
+            }
+        });
+    }
+
+    /**
+     * Flattens server loads into a stream-name to server-name lookup table.
+     */
+    private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> serverLoads) {
+        HashMap<String, String> streamToServer = new HashMap<String, String>(serverLoads.size());
+        for (ServerLoad serverLoad : serverLoads) {
+            for (StreamLoad streamLoad : serverLoad.getStreamLoads()) {
+                streamToServer.put(streamLoad.getStream(), serverLoad.getServer());
+            }
+        }
+        return streamToServer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java
new file mode 100644
index 0000000..5cd8980
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.placement;
+
+import com.twitter.util.Future;
+
+/**
+ * Interface for load appraiser.
+ *
+ * <p>A load appraiser reports the load of individual streams; results are delivered
+ * asynchronously as futures.
+ */
+public interface LoadAppraiser {
+    /**
+     * Retrieve the stream load for a given {@code stream}.
+     *
+     * @param stream name of the stream
+     * @return a future holding the stream load of the stream.
+     */
+    Future<StreamLoad> getStreamLoad(String stream);
+
+    /**
+     * Refresh the cache. What (if anything) is cached is up to the implementation.
+     *
+     * @return a future signalling the outcome of the refresh; it carries no value.
+     */
+    Future<Void> refreshCache();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
new file mode 100644
index 0000000..ac952aa
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.placement;
+
+import org.apache.distributedlog.client.routing.RoutingService;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.service.DLSocketAddress;
+import com.twitter.util.Duration;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.ScheduledThreadPoolTimer;
+import com.twitter.util.Time;
+import com.twitter.util.Timer;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A PlacementPolicy assigns streams to servers given an appraisal of the load 
that the stream contains.
+ *
+ * <p>The load of a stream is determined by the LoadAppraiser used. The 
PlacementPolicy will
+ * then distributed these StreamLoads to the available servers in a manner 
defined by the
+ * implementation creating ServerLoad objects. It then saves this assignment 
via the
+ * PlacementStateManager.
+ */
+public abstract class PlacementPolicy {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(PlacementPolicy.class);
+
+    protected final LoadAppraiser loadAppraiser;
+    protected final RoutingService routingService;
+    protected final DistributedLogNamespace namespace;
+    protected final PlacementStateManager placementStateManager;
+    private final Duration refreshInterval;
+    protected final OpStatsLogger placementCalcStats;
+    private Timer placementRefreshTimer;
+
+    public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService 
routingService,
+                           DistributedLogNamespace namespace, 
PlacementStateManager placementStateManager,
+                           Duration refreshInterval, StatsLogger statsLogger) {
+        this.loadAppraiser = loadAppraiser;
+        this.routingService = routingService;
+        this.namespace = namespace;
+        this.placementStateManager = placementStateManager;
+        this.refreshInterval = refreshInterval;
+        placementCalcStats = statsLogger.getOpStatsLogger("placement");
+    }
+
+    public Set<String> getServers() {
+        Set<SocketAddress> hosts = routingService.getHosts();
+        Set<String> servers = new HashSet<String>(hosts.size());
+        for (SocketAddress address : hosts) {
+            servers.add(DLSocketAddress.toString((InetSocketAddress) address));
+        }
+        return servers;
+    }
+
+    public Set<String> getStreams() {
+        Set<String> streams = new HashSet<String>();
+        try {
+            Iterator<String> logs = namespace.getLogs();
+            while (logs.hasNext()) {
+                streams.add(logs.next());
+            }
+        } catch (IOException e) {
+            logger.error("Could not get streams for placement policy.", e);
+        }
+        return streams;
+    }
+
+    public void start(boolean leader) {
+        logger.info("Starting placement policy");
+
+        TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>();
+        for (String server : getServers()) {
+            emptyServerLoads.add(new ServerLoad(server));
+        }
+        load(emptyServerLoads); //Pre-Load so streams don't NPE
+        if (leader) { //this is the leader shard
+            logger.info("Shard is leader. Scheduling timed refresh.");
+            placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", 
true);
+            placementRefreshTimer.schedule(Time.now(), refreshInterval, new 
Function0<BoxedUnit>() {
+                @Override
+                public BoxedUnit apply() {
+                    refresh();
+                    return BoxedUnit.UNIT;
+                }
+            });
+        } else {
+            logger.info("Shard is not leader. Watching for server load 
changes.");
+            placementStateManager.watch(new 
PlacementStateManager.PlacementCallback() {
+                @Override
+                public void callback(TreeSet<ServerLoad> serverLoads) {
+                    if (!serverLoads.isEmpty()) {
+                        load(serverLoads);
+                    }
+                }
+            });
+        }
+    }
+
+    public void close() {
+        if (placementRefreshTimer != null) {
+            placementRefreshTimer.stop();
+        }
+    }
+
+    /**
+     * Places the stream on a server according to the policy.
+     *
+     * <p>It returns a future containing the host that owns the stream upon 
completion
+     */
+    public abstract Future<String> placeStream(String stream);
+
+    /**
+     * Recalculates the entire placement mapping and updates stores it using 
the PlacementStateManager.
+     */
+    public abstract void refresh();
+
+    /**
+     * Loads the placement mapping into the node from a TreeSet of ServerLoads.
+     */
+    public abstract void load(TreeSet<ServerLoad> serverLoads);
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java
new file mode 100644
index 0000000..0187bed
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.placement;
+
+import java.util.TreeSet;
+
+/**
+ * The PlacementStateManager handles persistence of calculated resource placements.
+ *
+ * <p>A placement is represented as a TreeSet of ServerLoads. It can be saved, loaded back,
+ * and watched for changes.
+ */
+public interface PlacementStateManager {
+
+    /**
+     * Saves the ownership mapping as a TreeSet of ServerLoads to persistent storage.
+     *
+     * @param serverLoads the mapping to save
+     * @throws StateManagerSaveException if the mapping could not be saved
+     */
+    void saveOwnership(TreeSet<ServerLoad> serverLoads) throws 
StateManagerSaveException;
+
+    /**
+     * Loads the ownership mapping as TreeSet of ServerLoads from persistent storage.
+     *
+     * @return the stored mapping
+     * @throws StateManagerLoadException if the mapping could not be loaded
+     */
+    TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException;
+
+    /**
+     * Watch the persistent storage for changes to the ownership mapping.
+     *
+     * <p>The placementCallback will be triggered with the new mapping when a change occurs.
+     *
+     * @param placementCallback the callback to notify of changes
+     */
+    void watch(PlacementCallback placementCallback);
+
+    /**
+     * Placement Callback.
+     *
+     * <p>The callback is triggered when server loads are updated; it receives the updated
+     * server loads.
+     */
+    interface PlacementCallback {
+        void callback(TreeSet<ServerLoad> serverLoads);
+    }
+
+    /**
+     * The base exception thrown when state manager encounters errors.
+     *
+     * <p>Carries a message and the exception that caused the failure.
+     */
+    abstract class StateManagerException extends Exception {
+        public StateManagerException(String message, Exception e) {
+            super(message, e);
+        }
+    }
+
+    /**
+     * Exception thrown when failed to load the ownership mapping.
+     */
+    class StateManagerLoadException extends StateManagerException {
+        public StateManagerLoadException(Exception e) {
+            super("Load of Ownership failed", e);
+        }
+    }
+
+    /**
+     * Exception thrown when failed to save the ownership mapping.
+     */
+    class StateManagerSaveException extends StateManagerException {
+        public StateManagerSaveException(Exception e) {
+            super("Save of Ownership failed", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java
new file mode 100644
index 0000000..d65c401
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.placement;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TMemoryInputTransport;
+
+/**
+ * An object represents the server load.
+ *
+ * <p>A comparable data object containing the identifier of the server, total appraised load
+ * on the server, and all streams assigned to the server by the resource placement mapping.
+ * This is comparable first by load and then by server so that a sorted data structure of
+ * these will be consistent across multiple calculations.
+ *
+ * <p>Because ordering, equality and hash code all depend on the mutable load, an instance
+ * should be taken out of any sorted or hashed collection before streams are added or
+ * removed, and re-inserted afterwards.
+ */
+public class ServerLoad implements Comparable {
+    // Initial size handed to the in-memory Thrift buffer used by serialize().
+    private static final int BUFFER_SIZE = 4096000;
+    private final String server;
+    private final HashSet<StreamLoad> streamLoads = new HashSet<StreamLoad>();
+    // Invariant: sum of getLoad() over every element of streamLoads.
+    private long load = 0L;
+
+    public ServerLoad(String server) {
+        this.server = server;
+    }
+
+    /**
+     * Assigns a stream to this server.
+     *
+     * <p>The stream's load is only counted when the stream is actually added; offering a
+     * stream the set already holds leaves the total unchanged, so the total always matches
+     * what {@link #removeStream(String)} can later subtract.
+     *
+     * @param stream the stream load to add
+     * @return the total load of the server after the call
+     */
+    public synchronized long addStream(StreamLoad stream) {
+        if (streamLoads.add(stream)) {
+            this.load += stream.getLoad();
+        }
+        return this.load;
+    }
+
+    /**
+     * Removes the stream with the given name from this server, if present.
+     *
+     * @param stream name of the stream to remove
+     * @return the total load of the server after the call
+     */
+    public synchronized long removeStream(String stream) {
+        for (StreamLoad streamLoad : streamLoads) {
+            if (streamLoad.getStream().equals(stream)) {
+                this.load -= streamLoad.getLoad();
+                // Removing inside the for-each is safe only because we return immediately
+                // and never advance the iterator again.
+                streamLoads.remove(streamLoad);
+                return this.load;
+            }
+        }
+        return this.load; //Throwing an exception wouldn't help us as our logic should never reach here
+    }
+
+    public synchronized long getLoad() {
+        return load;
+    }
+
+    /**
+     * Returns the live, internal set of stream loads (not a copy).
+     */
+    public synchronized Set<StreamLoad> getStreamLoads() {
+        return streamLoads;
+    }
+
+    public synchronized String getServer() {
+        return server;
+    }
+
+    /**
+     * Converts this object into its Thrift counterpart (server, load and all streams).
+     */
+    protected synchronized org.apache.distributedlog.service.placement.thrift.ServerLoad toThrift() {
+        org.apache.distributedlog.service.placement.thrift.ServerLoad tServerLoad =
+            new org.apache.distributedlog.service.placement.thrift.ServerLoad();
+        tServerLoad.setServer(server);
+        tServerLoad.setLoad(load);
+        ArrayList<org.apache.distributedlog.service.placement.thrift.StreamLoad> tStreamLoads =
+            new ArrayList<org.apache.distributedlog.service.placement.thrift.StreamLoad>();
+        for (StreamLoad streamLoad : streamLoads) {
+            tStreamLoads.add(streamLoad.toThrift());
+        }
+        tServerLoad.setStreams(tStreamLoads);
+        return tServerLoad;
+    }
+
+    /**
+     * Serializes this server load as UTF-8 encoded Thrift JSON.
+     *
+     * @return the serialized bytes
+     * @throws IOException if Thrift fails to write the object
+     */
+    public byte[] serialize() throws IOException {
+        TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            toThrift().write(protocol);
+            transport.flush();
+            return transport.toString(UTF_8.name()).getBytes(UTF_8);
+        } catch (TException e) {
+            throw new IOException("Failed to serialize server load : ", e);
+        } catch (UnsupportedEncodingException uee) {
+            throw new IOException("Failed to serialize server load : ", uee);
+        }
+    }
+
+    /**
+     * Rebuilds a server load from the bytes produced by {@link #serialize()}.
+     *
+     * <p>The total load is recomputed from the deserialized streams rather than taken from
+     * the serialized load field.
+     *
+     * @param data Thrift JSON bytes
+     * @return the deserialized server load
+     * @throws IOException if Thrift fails to read the data
+     */
+    public static ServerLoad deserialize(byte[] data) throws IOException {
+        org.apache.distributedlog.service.placement.thrift.ServerLoad tServerLoad =
+            new org.apache.distributedlog.service.placement.thrift.ServerLoad();
+        TMemoryInputTransport transport = new TMemoryInputTransport(data);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            tServerLoad.read(protocol);
+            ServerLoad serverLoad = new ServerLoad(tServerLoad.getServer());
+            if (tServerLoad.isSetStreams()) {
+                for (org.apache.distributedlog.service.placement.thrift.StreamLoad tStreamLoad :
+                    tServerLoad.getStreams()) {
+                    serverLoad.addStream(new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()));
+                }
+            }
+            return serverLoad;
+        } catch (TException e) {
+            throw new IOException("Failed to deserialize server load : ", e);
+        }
+    }
+
+    /**
+     * Orders by load, breaking ties by server name.
+     *
+     * <p>NOTE(review): this holds this object's monitor while taking the other's; two threads
+     * comparing the same pair in opposite directions could block each other - confirm whether
+     * instances are ever compared concurrently.
+     *
+     * @throws ClassCastException if {@code o} is not a ServerLoad
+     */
+    @Override
+    public synchronized int compareTo(Object o) {
+        ServerLoad other = (ServerLoad) o;
+        if (load == other.getLoad()) {
+            return server.compareTo(other.getServer());
+        } else {
+            return Long.compare(load, other.getLoad());
+        }
+    }
+
+    /**
+     * Two server loads are equal when server, total load and the set of streams all match.
+     */
+    @Override
+    public synchronized boolean equals(Object o) {
+        if (!(o instanceof ServerLoad)) {
+            return false;
+        }
+        ServerLoad other = (ServerLoad) o;
+        return server.equals(other.getServer())
+            && load == other.getLoad()
+            && streamLoads.equals(other.getStreamLoads());
+    }
+
+    @Override
+    public synchronized String toString() {
+        return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads);
+    }
+
+    @Override
+    public synchronized int hashCode() {
+        return new HashCodeBuilder().append(server).append(load).append(streamLoads).build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java
 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java
new file mode 100644
index 0000000..f271222
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.placement;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TMemoryInputTransport;
+
+/**
+ * An object represent the load of a stream.
+ *
+ * <p>A comparable data object containing the identifier of the stream and the 
appraised load produced
+ * by the stream.
+ */
+public class StreamLoad implements Comparable {
+    private static final int BUFFER_SIZE = 4096;
+    public final String stream;
+    private final int load;
+
+    public StreamLoad(String stream, int load) {
+        this.stream = stream;
+        this.load = load;
+    }
+
+    public int getLoad() {
+        return load;
+    }
+
+    public String getStream() {
+        return stream;
+    }
+
+    protected org.apache.distributedlog.service.placement.thrift.StreamLoad 
toThrift() {
+        org.apache.distributedlog.service.placement.thrift.StreamLoad 
tStreamLoad =
+            new 
org.apache.distributedlog.service.placement.thrift.StreamLoad();
+        return tStreamLoad.setStream(stream).setLoad(load);
+    }
+
+    public byte[] serialize() throws IOException {
+        TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            toThrift().write(protocol);
+            transport.flush();
+            return transport.toString(UTF_8.name()).getBytes(UTF_8);
+        } catch (TException e) {
+            throw new IOException("Failed to serialize stream load : ", e);
+        } catch (UnsupportedEncodingException uee) {
+            throw new IOException("Failed to serialize stream load : ", uee);
+        }
+    }
+
+    public static StreamLoad deserialize(byte[] data) throws IOException {
+        org.apache.distributedlog.service.placement.thrift.StreamLoad 
tStreamLoad =
+            new 
org.apache.distributedlog.service.placement.thrift.StreamLoad();
+        TMemoryInputTransport transport = new TMemoryInputTransport(data);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            tStreamLoad.read(protocol);
+            return new StreamLoad(tStreamLoad.getStream(), 
tStreamLoad.getLoad());
+        } catch (TException e) {
+            throw new IOException("Failed to deserialize stream load : ", e);
+        }
+    }
+
+    @Override
+    public int compareTo(Object o) {
+        StreamLoad other = (StreamLoad) o;
+        if (load == other.getLoad()) {
+            return stream.compareTo(other.getStream());
+        } else {
+            return Long.compare(load, other.getLoad());
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof StreamLoad)) {
+            return false;
+        }
+        StreamLoad other = (StreamLoad) o;
+        return stream.equals(other.getStream()) && load == other.getLoad();
+    }
+
+    @Override
+    public String toString() {
+        return String.format("StreamLoad<Stream: %s, Load: %d>", stream, load);
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder().append(stream).append(load).build();
+    }
+}


Reply via email to