http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
new file mode 100644
index 0000000..d0a2f88
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.feature.DefaultFeatureProvider;
+import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.feature.SettableFeature;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link 
org.apache.distributedlog.exceptions.RegionUnavailableException}.
+ */
+public class TestRegionUnavailable extends DistributedLogServerTestCase {
+
+    /**
+     * A feature provider for testing.
+     */
+    public static class TestFeatureProvider extends DefaultFeatureProvider {
+
+        public TestFeatureProvider(String rootScope,
+                                   DistributedLogConfiguration conf,
+                                   StatsLogger statsLogger) {
+            super(rootScope, conf, statsLogger);
+        }
+
+        @Override
+        protected Feature makeFeature(String featureName) {
+            if 
(featureName.contains(ServerFeatureKeys.REGION_STOP_ACCEPT_NEW_STREAM.name().toLowerCase()))
 {
+                return new SettableFeature(featureName, 10000);
+            }
+            return super.makeFeature(featureName);
+        }
+
+        @Override
+        protected FeatureProvider makeProvider(String fullScopeName) {
+            return super.makeProvider(fullScopeName);
+        }
+    }
+
+    private final int numServersPerDC = 3;
+    private final List<DLServer> localCluster;
+    private final List<DLServer> remoteCluster;
+    private TwoRegionDLClient client;
+
+    public TestRegionUnavailable() {
+        super(true);
+        this.localCluster = new ArrayList<DLServer>();
+        this.remoteCluster = new ArrayList<DLServer>();
+    }
+
+    @Before
+    @Override
+    public void setup() throws Exception {
+        DistributedLogConfiguration localConf = new 
DistributedLogConfiguration();
+        localConf.addConfiguration(conf);
+        localConf.setFeatureProviderClass(TestFeatureProvider.class);
+        DistributedLogConfiguration remoteConf = new 
DistributedLogConfiguration();
+        remoteConf.addConfiguration(conf);
+        super.setup();
+        int localPort = 9010;
+        int remotePort = 9020;
+        for (int i = 0; i < numServersPerDC; i++) {
+            localCluster.add(createDistributedLogServer(localConf, localPort + 
i));
+            remoteCluster.add(createDistributedLogServer(remoteConf, 
remotePort + i));
+        }
+        Map<SocketAddress, String> regionMap = new HashMap<SocketAddress, 
String>();
+        for (DLServer server : localCluster) {
+            regionMap.put(server.getAddress(), "local");
+        }
+        for (DLServer server : remoteCluster) {
+            regionMap.put(server.getAddress(), "remote");
+        }
+        client = createTwoRegionDLClient("two_regions_client", regionMap);
+
+    }
+
+    private void registerStream(String streamName) {
+        for (DLServer server : localCluster) {
+            client.localRoutingService.addHost(streamName, 
server.getAddress());
+        }
+        client.remoteRoutingService.addHost(streamName, 
remoteCluster.get(0).getAddress());
+    }
+
+    @After
+    @Override
+    public void teardown() throws Exception {
+        super.teardown();
+        if (null != client) {
+            client.shutdown();
+        }
+        for (DLServer server : localCluster) {
+            server.shutdown();
+        }
+        for (DLServer server : remoteCluster) {
+            server.shutdown();
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testRegionUnavailable() throws Exception {
+        String name = "dlserver-region-unavailable";
+        registerStream(name);
+
+        for (long i = 1; i <= 10; i++) {
+            client.dlClient.write(name, ByteBuffer.wrap(("" + 
i).getBytes())).get();
+        }
+
+        // check local region
+        for (DLServer server : localCluster) {
+            checkStreams(0, server);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java
new file mode 100644
index 0000000..c8b8bdf
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static org.junit.Assert.assertEquals;
+
+import com.twitter.finagle.Service;
+import com.twitter.finagle.service.ConstantService;
+import com.twitter.util.Await;
+import com.twitter.util.Future;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link StatsFilter}.
+ */
+public class TestStatsFilter {
+
+    class RuntimeExService<Req, Rep> extends Service<Req, Rep> {
+        public Future<Rep> apply(Req request) {
+            throw new RuntimeException("test");
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testServiceSuccess() throws Exception {
+        StatsLogger stats = new NullStatsLogger();
+        StatsFilter<String, String> filter = new StatsFilter<String, 
String>(stats);
+        Future<String> result = filter.apply("", new ConstantService<String, 
String>(Future.value("result")));
+        assertEquals("result", Await.result(result));
+    }
+
+    @Test(timeout = 60000)
+    public void testServiceFailure() throws Exception {
+        StatsLogger stats = new NullStatsLogger();
+        StatsFilter<String, String> filter = new StatsFilter<String, 
String>(stats);
+        try {
+            filter.apply("", new RuntimeExService<String, String>());
+        } catch (RuntimeException ex) {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java
new file mode 100644
index 0000000..21bebb5
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link BalancerUtils}.
+ */
+public class TestBalancerUtils {
+
+    @Test(timeout = 60000)
+    public void testCalculateNumStreamsToRebalance() {
+        String myNode = "mynode";
+
+        // empty load distribution
+        assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance(
+                myNode, new HashMap<String, Integer>(), 0, 10));
+        // my node doesn't exist in load distribution
+        Map<String, Integer> loadDistribution = new HashMap<String, Integer>();
+        loadDistribution.put("node1", 10);
+        assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance(
+                myNode, loadDistribution, 0, 10));
+        // my node doesn't reach rebalance water mark
+        loadDistribution.clear();
+        loadDistribution.put("node1", 1);
+        loadDistribution.put(myNode, 100);
+        assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance(
+                myNode, loadDistribution, 200, 10));
+        // my node is below average in the cluster.
+        loadDistribution.clear();
+        loadDistribution.put(myNode, 1);
+        loadDistribution.put("node1", 99);
+        assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance(
+                myNode, loadDistribution, 0, 10));
+        // my node is above average in the cluster
+        assertEquals(49, BalancerUtils.calculateNumStreamsToRebalance(
+                "node1", loadDistribution, 0, 10));
+        // my node is at the tolerance range
+        loadDistribution.clear();
+        loadDistribution.put(myNode, 55);
+        loadDistribution.put("node1", 45);
+        assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance(
+                myNode, loadDistribution, 0, 10));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java
new file mode 100644
index 0000000..fb3fb6e
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.fail;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.distributedlog.service.DLSocketAddress;
+import org.apache.distributedlog.service.DistributedLogClient;
+import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
+import org.apache.distributedlog.service.DistributedLogServerTestCase;
+import com.twitter.util.Await;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Case for {@link ClusterBalancer}.
+ */
+public class TestClusterBalancer extends DistributedLogServerTestCase {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(TestClusterBalancer.class);
+
+    private final int numServers = 5;
+    private final List<DLServer> cluster;
+    private DLClient client;
+
+    public TestClusterBalancer() {
+        super(true);
+        this.cluster = new ArrayList<DLServer>();
+    }
+
+    @Before
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        int initPort = 9001;
+        for (int i = 0; i < numServers; i++) {
+            cluster.add(createDistributedLogServer(initPort + i));
+        }
+        client = createDistributedLogClient("cluster_client", 
Optional.<String>absent());
+    }
+
+    @After
+    @Override
+    public void teardown() throws Exception {
+        super.teardown();
+        if (null != client) {
+            client.shutdown();
+        }
+        for (DLServer server: cluster) {
+            server.shutdown();
+        }
+    }
+
+    private void initStreams(String namePrefix) {
+        logger.info("Init streams with prefix {}", namePrefix);
+        // Stream Distribution: 5, 4, 3, 2, 1
+        initStreams(namePrefix, 5, 1, 0);
+        initStreams(namePrefix, 4, 6, 1);
+        initStreams(namePrefix, 3, 10, 2);
+        initStreams(namePrefix, 2, 13, 3);
+        initStreams(namePrefix, 1, 15, 4);
+    }
+
+    private void initStreams(String namePrefix, int numStreams, int streamId, 
int proxyId) {
+        for (int i = 0; i < numStreams; i++) {
+            String name = namePrefix + (streamId++);
+            client.routingService.addHost(name, 
cluster.get(proxyId).getAddress());
+        }
+    }
+
+    private void writeStreams(String namePrefix) throws Exception {
+        logger.info("Write streams with prefix {}", namePrefix);
+        writeStreams(namePrefix, 5, 1);
+        writeStreams(namePrefix, 4, 6);
+        writeStreams(namePrefix, 3, 10);
+        writeStreams(namePrefix, 2, 13);
+        writeStreams(namePrefix, 1, 15);
+    }
+
+    private void writeStreams(String namePrefix, int numStreams, int streamId) 
throws Exception {
+        for (int i = 0; i < numStreams; i++) {
+            String name = namePrefix + (streamId++);
+            try {
+                Await.result(client.dlClient.write(name, 
ByteBuffer.wrap(name.getBytes(UTF_8))));
+            } catch (Exception e) {
+                logger.error("Error writing stream {} : ", name, e);
+                throw e;
+            }
+        }
+    }
+
+    private void validateStreams(String namePrefix) throws Exception {
+        logger.info("Validate streams with prefix {}", namePrefix);
+        validateStreams(namePrefix, 5, 1, 0);
+        validateStreams(namePrefix, 4, 6, 1);
+        validateStreams(namePrefix, 3, 10, 2);
+        validateStreams(namePrefix, 2, 13, 3);
+        validateStreams(namePrefix, 1, 15, 4);
+    }
+
+    private void validateStreams(String namePrefix, int numStreams, int 
streamId, int proxyIdx) {
+        Set<String> expectedStreams = new HashSet<String>();
+        for (int i = 0; i < numStreams; i++) {
+            expectedStreams.add(namePrefix + (streamId++));
+        }
+        checkStreams(expectedStreams, cluster.get(proxyIdx));
+    }
+
+    @Ignore
+    @Test(timeout = 60000)
+    public void testBalanceAll() throws Exception {
+        String namePrefix = "clusterbalancer-balance-all-";
+
+        initStreams(namePrefix);
+        writeStreams(namePrefix);
+        validateStreams(namePrefix);
+
+        Optional<RateLimiter> rateLimiter = Optional.absent();
+
+        Balancer balancer = new ClusterBalancer(client.dlClientBuilder,
+                Pair.of((DistributedLogClient) client.dlClient, 
(MonitorServiceClient) client.dlClient));
+        logger.info("Rebalancing from 'unknown' target");
+        try {
+            balancer.balanceAll("unknown", 10, rateLimiter);
+            fail("Should fail on balanceAll from 'unknown' target.");
+        } catch (IllegalArgumentException iae) {
+            // expected
+        }
+        validateStreams(namePrefix);
+
+        logger.info("Rebalancing from 'unexisted' host");
+        String addr = 
DLSocketAddress.toString(DLSocketAddress.getSocketAddress(9999));
+        balancer.balanceAll(addr, 10, rateLimiter);
+        validateStreams(namePrefix);
+
+        addr = DLSocketAddress.toString(cluster.get(0).getAddress());
+        logger.info("Rebalancing from host {}.", addr);
+        balancer.balanceAll(addr, 10, rateLimiter);
+        checkStreams(0, cluster.get(0));
+        checkStreams(4, cluster.get(1));
+        checkStreams(3, cluster.get(2));
+        checkStreams(4, cluster.get(3));
+        checkStreams(4, cluster.get(4));
+
+        addr = DLSocketAddress.toString(cluster.get(2).getAddress());
+        logger.info("Rebalancing from host {}.", addr);
+        balancer.balanceAll(addr, 10, rateLimiter);
+        checkStreams(3, cluster.get(0));
+        checkStreams(4, cluster.get(1));
+        checkStreams(0, cluster.get(2));
+        checkStreams(4, cluster.get(3));
+        checkStreams(4, cluster.get(4));
+
+        logger.info("Rebalancing the cluster");
+        balancer.balance(0, 0.0f, 10, rateLimiter);
+        for (int i = 0; i < 5; i++) {
+            checkStreams(3, cluster.get(i));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java
new file mode 100644
index 0000000..6734083
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Sets;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link CountBasedStreamChooser}.
+ */
+public class TestCountBasedStreamChooser {
+
+    @Test(timeout = 60000)
+    public void testEmptyStreamDistribution() {
+        try {
+            new CountBasedStreamChooser(new HashMap<SocketAddress, 
Set<String>>());
+            fail("Should fail constructing stream chooser if the stream 
distribution is empty");
+        } catch (IllegalArgumentException iae) {
+            // expected
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testMultipleHostsWithEmptyStreams() {
+        for (int i = 1; i <= 3; i++) {
+            Map<SocketAddress, Set<String>> streamDistribution = new 
HashMap<SocketAddress, Set<String>>();
+            int port = 1000;
+            for (int j = 0; j < i; j++) {
+                SocketAddress address = new InetSocketAddress("127.0.0.1", 
port + j);
+                streamDistribution.put(address, new HashSet<String>());
+            }
+
+            CountBasedStreamChooser chooser = new 
CountBasedStreamChooser(streamDistribution);
+            for (int k = 0; k < i + 1; k++) {
+                assertNull(chooser.choose());
+            }
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testSingleHostWithStreams() {
+        for (int i = 0; i < 3; i++) {
+            Map<SocketAddress, Set<String>> streamDistribution = new 
HashMap<SocketAddress, Set<String>>();
+
+            Set<String> streams = new HashSet<String>();
+            for (int j = 0; j < 3; j++) {
+                streams.add("SingleHostStream-" + j);
+            }
+
+            int port = 1000;
+            SocketAddress address = new InetSocketAddress("127.0.0.1", port);
+            streamDistribution.put(address, streams);
+
+            for (int k = 1; k <= i; k++) {
+                address = new InetSocketAddress("127.0.0.1", port + k);
+                streamDistribution.put(address, new HashSet<String>());
+            }
+
+            Set<String> choosenStreams = new HashSet<String>();
+
+            CountBasedStreamChooser chooser = new 
CountBasedStreamChooser(streamDistribution);
+            for (int l = 0; l < 3 + i + 1; l++) {
+                String s = chooser.choose();
+                if (null != s) {
+                    choosenStreams.add(s);
+                }
+            }
+
+            assertEquals(streams.size(), choosenStreams.size());
+            assertTrue(Sets.difference(streams, 
choosenStreams).immutableCopy().isEmpty());
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testHostsHaveSameNumberStreams() {
+        Map<SocketAddress, Set<String>> streamDistribution = new 
HashMap<SocketAddress, Set<String>>();
+        Set<String> allStreams = new HashSet<String>();
+
+        int numHosts = 3;
+        int numStreamsPerHost = 3;
+
+        int port = 1000;
+        for (int i = 1; i <= numHosts; i++) {
+            SocketAddress address = new InetSocketAddress("127.0.0.1", port + 
i);
+            Set<String> streams = new HashSet<String>();
+
+            for (int j = 1; j <= numStreamsPerHost; j++) {
+                String streamName = "HostsHaveSameNumberStreams-" + i + "-" + 
j;
+                streams.add(streamName);
+                allStreams.add(streamName);
+            }
+
+            streamDistribution.put(address, streams);
+        }
+
+        Set<String> streamsChoosen = new HashSet<String>();
+        CountBasedStreamChooser chooser = new 
CountBasedStreamChooser(streamDistribution);
+        for (int i = 1; i <= numStreamsPerHost; i++) {
+            for (int j = 1; j <= numHosts; j++) {
+                String s = chooser.choose();
+                assertNotNull(s);
+                streamsChoosen.add(s);
+            }
+            for (int j = 0; j < numHosts; j++) {
+                assertEquals(numStreamsPerHost - i, 
chooser.streamsDistribution.get(j).getRight().size());
+            }
+        }
+        assertNull(chooser.choose());
+        assertEquals(numHosts * numStreamsPerHost, streamsChoosen.size());
+        assertTrue(Sets.difference(allStreams, streamsChoosen).isEmpty());
+    }
+
+    @Test(timeout = 60000)
+    public void testHostsHaveDifferentNumberStreams() {
+        Map<SocketAddress, Set<String>> streamDistribution = new 
HashMap<SocketAddress, Set<String>>();
+        Set<String> allStreams = new HashSet<String>();
+
+        int numHosts = 6;
+        int maxStreamsPerHost = 4;
+
+        int port = 1000;
+        for (int i = 0; i < numHosts; i++) {
+            int group = i / 2;
+            int numStreamsThisGroup = maxStreamsPerHost - group;
+
+            SocketAddress address = new InetSocketAddress("127.0.0.1", port + 
i);
+            Set<String> streams = new HashSet<String>();
+
+            for (int j = 1; j <= numStreamsThisGroup; j++) {
+                String streamName = "HostsHaveDifferentNumberStreams-" + i + 
"-" + j;
+                streams.add(streamName);
+                allStreams.add(streamName);
+            }
+
+            streamDistribution.put(address, streams);
+        }
+
+        Set<String> streamsChoosen = new HashSet<String>();
+        CountBasedStreamChooser chooser = new 
CountBasedStreamChooser(streamDistribution);
+
+        for (int i = 0; i < allStreams.size(); i++) {
+            String s = chooser.choose();
+            assertNotNull(s);
+            streamsChoosen.add(s);
+        }
+        assertNull(chooser.choose());
+        assertEquals(allStreams.size(), streamsChoosen.size());
+        assertTrue(Sets.difference(allStreams, streamsChoosen).isEmpty());
+    }
+
+    @Test(timeout = 60000)
+    public void testLimitedStreamChooser() {
+        Map<SocketAddress, Set<String>> streamDistribution = new 
HashMap<SocketAddress, Set<String>>();
+
+        Set<String> streams = new HashSet<String>();
+        for (int j = 0; j < 10; j++) {
+            streams.add("SingleHostStream-" + j);
+        }
+
+        int port = 1000;
+        SocketAddress address = new InetSocketAddress("127.0.0.1", port);
+        streamDistribution.put(address, streams);
+
+        Set<String> choosenStreams = new HashSet<String>();
+
+        CountBasedStreamChooser underlying = new 
CountBasedStreamChooser(streamDistribution);
+        LimitedStreamChooser chooser = LimitedStreamChooser.of(underlying, 1);
+        for (int l = 0; l < 10; l++) {
+            String s = chooser.choose();
+            if (null != s) {
+                choosenStreams.add(s);
+            }
+        }
+
+        assertEquals(1, choosenStreams.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java
new file mode 100644
index 0000000..73fa98a
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
+import org.apache.distributedlog.service.DistributedLogServerTestCase;
+import com.twitter.util.Await;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Case for {@link SimpleBalancer}.
+ */
+public class TestSimpleBalancer extends DistributedLogServerTestCase {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(TestSimpleBalancer.class);
+
+    DLClient targetClient;
+    DLServer targetServer;
+
+    public TestSimpleBalancer() {
+        super(true);
+    }
+
+    @Before
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        targetServer = createDistributedLogServer(7003);
+        targetClient = createDistributedLogClient("target", 
Optional.<String>absent());
+    }
+
+    @After
+    @Override
+    public void teardown() throws Exception {
+        super.teardown();
+        if (null != targetClient) {
+            targetClient.shutdown();
+        }
+        if (null != targetServer) {
+            targetServer.shutdown();
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testBalanceAll() throws Exception {
+        String namePrefix = "simplebalancer-balance-all-";
+        int numStreams = 10;
+
+        for (int i = 0; i < numStreams; i++) {
+            String name = namePrefix + i;
+            // src client
+            dlClient.routingService.addHost(name, dlServer.getAddress());
+            // target client
+            targetClient.routingService.addHost(name, 
targetServer.getAddress());
+        }
+
+        // write to multiple streams
+        for (int i = 0; i < numStreams; i++) {
+            String name = namePrefix + i;
+            Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + 
i).getBytes(UTF_8))));
+        }
+
+        // validation
+        for (int i = 0; i < numStreams; i++) {
+            String name = namePrefix + i;
+            checkStream(name, dlClient, dlServer, 1, numStreams, numStreams, 
true, true);
+            checkStream(name, targetClient, targetServer, 0, 0, 0, false, 
false);
+        }
+
+        Optional<RateLimiter> rateLimiter = Optional.absent();
+
+        Balancer balancer = new SimpleBalancer("source", dlClient.dlClient, 
dlClient.dlClient,
+                                               "target", 
targetClient.dlClient, targetClient.dlClient);
+        logger.info("Rebalancing from 'unknown' target");
+        try {
+            balancer.balanceAll("unknown", 10, rateLimiter);
+            fail("Should fail on balanceAll from 'unknown' target.");
+        } catch (IllegalArgumentException iae) {
+            // expected
+        }
+
+        // nothing to balance from 'target'
+        logger.info("Rebalancing from 'target' target");
+        balancer.balanceAll("target", 1, rateLimiter);
+        for (int i = 0; i < numStreams; i++) {
+            String name = namePrefix + i;
+            checkStream(name, dlClient, dlServer, 1, numStreams, numStreams, 
true, true);
+            checkStream(name, targetClient, targetServer, 0, 0, 0, false, 
false);
+        }
+
+        // balance all streams from 'source'
+        logger.info("Rebalancing from 'source' target");
+        balancer.balanceAll("source", 10, rateLimiter);
+        for (int i = 0; i < numStreams; i++) {
+            String name = namePrefix + i;
+            checkStream(name, targetClient, targetServer, 1, numStreams, 
numStreams, true, true);
+            checkStream(name, dlClient, dlServer, 0, 0, 0, false, false);
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testBalanceStreams() throws Exception {
+        String namePrefix = "simplebalancer-balance-streams-";
+        int numStreams = 10;
+
+        for (int i = 0; i < numStreams; i++) {
+            String name = namePrefix + i;
+            // src client
+            dlClient.routingService.addHost(name, dlServer.getAddress());
+            // target client
+            targetClient.routingService.addHost(name, 
targetServer.getAddress());
+        }
+
+        // write to multiple streams
+        for (int i = 0; i < numStreams; i++) {
+            String name = namePrefix + i;
+            Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + 
i).getBytes(UTF_8))));
+        }
+
+        // validation
+        for (int i = 0; i < numStreams; i++) {
+            String name = namePrefix + i;
+            checkStream(name, dlClient, dlServer, 1, numStreams, numStreams, 
true, true);
+            checkStream(name, targetClient, targetServer, 0, 0, 0, false, 
false);
+        }
+
+        Optional<RateLimiter> rateLimiter = Optional.absent();
+
+        Balancer balancer = new SimpleBalancer("source", dlClient.dlClient, 
dlClient.dlClient,
+                                               "target", 
targetClient.dlClient, targetClient.dlClient);
+
+        // balance all streams from 'source'
+        logger.info("Rebalancing streams between targets");
+        balancer.balance(0, 0, 10, rateLimiter);
+
+        Set<String> sourceStreams = 
getAllStreamsFromDistribution(getStreamOwnershipDistribution(dlClient));
+        Set<String> targetStreams = 
getAllStreamsFromDistribution(getStreamOwnershipDistribution(targetClient));
+
+        assertEquals(numStreams / 2, sourceStreams.size());
+        assertEquals(numStreams / 2, targetStreams.size());
+
+        for (String name : sourceStreams) {
+            checkStream(name, dlClient, dlServer, 1, numStreams / 2, 
numStreams / 2, true, true);
+            checkStream(name, targetClient, targetServer, 1, numStreams / 2, 
numStreams / 2, false, false);
+        }
+
+        for (String name : targetStreams) {
+            checkStream(name, targetClient, targetServer, 1, numStreams / 2, 
numStreams / 2, true, true);
+            checkStream(name, dlClient, dlServer, 1, numStreams / 2, 
numStreams / 2, false, false);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java
new file mode 100644
index 0000000..ce7b2c1
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.base.Optional;
+import org.apache.distributedlog.service.DistributedLogClient;
+import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
+import org.apache.distributedlog.service.DistributedLogServerTestCase;
+import com.twitter.util.Await;
+import java.nio.ByteBuffer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link StreamMover}.
+ */
+public class TestStreamMover extends DistributedLogServerTestCase {
+
+    DLClient targetClient;
+    DLServer targetServer;
+
+    public TestStreamMover() {
+        super(true);
+    }
+
+    @Before
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        targetServer = createDistributedLogServer(7003);
+        targetClient = createDistributedLogClient("target", 
Optional.<String>absent());
+    }
+
+    @After
+    @Override
+    public void teardown() throws Exception {
+        super.teardown();
+        if (null != targetClient) {
+            targetClient.shutdown();
+        }
+        if (null != targetServer) {
+            targetServer.shutdown();
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testMoveStream() throws Exception {
+        String name = "dlserver-move-stream";
+
+        // src client
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+        // target client
+        targetClient.routingService.addHost(name, targetServer.getAddress());
+
+        // src client write a record to that stream
+        Await.result(((DistributedLogClient) dlClient.dlClient).write(name, 
ByteBuffer.wrap("1".getBytes(UTF_8))));
+        checkStream(name, dlClient, dlServer, 1, 1, 1, true, true);
+        checkStream(name, targetClient, targetServer, 0, 0, 0, false, false);
+
+        StreamMover streamMover = new StreamMoverImpl("source", 
dlClient.dlClient, dlClient.dlClient,
+                                                      "target", 
targetClient.dlClient, targetClient.dlClient);
+        assertTrue(streamMover.moveStream(name));
+        checkStream(name, dlClient, dlServer, 0, 0, 0, false, false);
+        checkStream(name, targetClient, targetServer, 1, 1, 1, true, true);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java
new file mode 100644
index 0000000..71dfa45
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+/**
+ * Test Case for {@link ServerConfiguration}.
+ */
+public class TestServerConfiguration {
+
+    @Test(timeout = 60000, expected = IllegalArgumentException.class)
+    public void testUnassignedShardId() {
+        new ServerConfiguration().validate();
+    }
+
+    @Test(timeout = 60000)
+    public void testAssignedShardId() {
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setServerShardId(100);
+        conf.validate();
+        assertEquals(100, conf.getServerShardId());
+    }
+
+    @Test(timeout = 60000, expected = IllegalArgumentException.class)
+    public void testInvalidServerThreads() {
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setServerShardId(100);
+        conf.setServerThreads(-1);
+        conf.validate();
+    }
+
+    @Test(timeout = 60000, expected = IllegalArgumentException.class)
+    public void testInvalidDlsnVersion() {
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setServerShardId(100);
+        conf.setDlsnVersion((byte) 9999);
+        conf.validate();
+    }
+
+    @Test(timeout = 60000)
+    public void testUseHostnameAsAllocatorPoolName() {
+        ServerConfiguration conf = new ServerConfiguration();
+        assertFalse("Should not use hostname by default", 
conf.isUseHostnameAsAllocatorPoolName());
+        conf.setUseHostnameAsAllocatorPoolName(true);
+        assertTrue("Should use hostname now", 
conf.isUseHostnameAsAllocatorPoolName());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java
new file mode 100644
index 0000000..bdbde11
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.config;
+
+import static 
org.apache.distributedlog.DistributedLogConfiguration.BKDL_RETENTION_PERIOD_IN_HOURS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.config.PropertiesWriter;
+import 
org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
+import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link StreamConfigProvider}.
+ */
+public class TestStreamConfigProvider {
+    private static final String DEFAULT_CONFIG_DIR = "conf";
+    private final String defaultConfigPath;
+    private final ScheduledExecutorService configExecutorService;
+
+    public TestStreamConfigProvider() throws Exception {
+        this.configExecutorService = Executors.newScheduledThreadPool(1,
+                new 
ThreadFactoryBuilder().setNameFormat("DistributedLogService-Dyncfg-%d").build());
+        PropertiesWriter writer = new PropertiesWriter();
+        writer.save();
+        this.defaultConfigPath = writer.getFile().getPath();
+    }
+
+    StreamConfigProvider getServiceProvider(StreamPartitionConverter converter)
+            throws Exception {
+        return getServiceProvider(converter, DEFAULT_CONFIG_DIR);
+    }
+
+    StreamConfigProvider getServiceProvider(
+            StreamPartitionConverter converter,
+            String configPath,
+            String defaultPath) throws Exception {
+        return new ServiceStreamConfigProvider(
+                configPath,
+                defaultPath,
+                converter,
+                configExecutorService,
+                1,
+                TimeUnit.SECONDS);
+    }
+
+    StreamConfigProvider getServiceProvider(
+            StreamPartitionConverter converter,
+            String configPath) throws Exception {
+        return getServiceProvider(converter, configPath, defaultConfigPath);
+    }
+
+    StreamConfigProvider getDefaultProvider(String configFile) throws 
Exception {
+        return new DefaultStreamConfigProvider(configFile, 
configExecutorService, 1, TimeUnit.SECONDS);
+    }
+
+    StreamConfigProvider getNullProvider() throws Exception {
+        return new NullStreamConfigProvider();
+    }
+
+    @Test(timeout = 60000)
+    public void testServiceProviderWithConfigRouters() throws Exception {
+        getServiceProvider(new IdentityStreamPartitionConverter());
+    }
+
+    @Test(timeout = 60000)
+    public void testServiceProviderWithMissingConfig() throws Exception {
+        StreamConfigProvider provider = getServiceProvider(new 
IdentityStreamPartitionConverter());
+        Optional<DynamicDistributedLogConfiguration> config = 
provider.getDynamicStreamConfig("stream1");
+        assertTrue(config.isPresent());
+    }
+
+    @Test(timeout = 60000)
+    public void testServiceProviderWithDefaultConfigPath() throws Exception {
+        // Default config with property set.
+        PropertiesWriter writer1 = new PropertiesWriter();
+        writer1.setProperty("rpsStreamAcquireServiceLimit", "191919");
+        writer1.save();
+        String fallbackConfPath1 = writer1.getFile().getPath();
+        StreamConfigProvider provider1 = getServiceProvider(new 
IdentityStreamPartitionConverter(),
+                DEFAULT_CONFIG_DIR, fallbackConfPath1);
+        Optional<DynamicDistributedLogConfiguration> config1 = 
provider1.getDynamicStreamConfig("stream1");
+
+        // Empty default config.
+        PropertiesWriter writer2 = new PropertiesWriter();
+        writer2.save();
+        String fallbackConfPath2 = writer2.getFile().getPath();
+        StreamConfigProvider provider2 = getServiceProvider(new 
IdentityStreamPartitionConverter(),
+                DEFAULT_CONFIG_DIR, fallbackConfPath2);
+        Optional<DynamicDistributedLogConfiguration> config2 = 
provider2.getDynamicStreamConfig("stream1");
+
+        assertEquals(191919, config1.get().getRpsStreamAcquireServiceLimit());
+        assertEquals(-1, config2.get().getRpsStreamAcquireServiceLimit());
+    }
+
+    @Test(timeout = 60000)
+    public void testDefaultProvider() throws Exception {
+        PropertiesWriter writer = new PropertiesWriter();
+        writer.setProperty(BKDL_RETENTION_PERIOD_IN_HOURS, "99");
+        writer.save();
+        StreamConfigProvider provider = 
getDefaultProvider(writer.getFile().getPath());
+        Optional<DynamicDistributedLogConfiguration> config1 = 
provider.getDynamicStreamConfig("stream1");
+        Optional<DynamicDistributedLogConfiguration> config2 = 
provider.getDynamicStreamConfig("stream2");
+        assertTrue(config1.isPresent());
+        assertTrue(config1.get() == config2.get());
+        assertEquals(99, config1.get().getRetentionPeriodHours());
+    }
+
+    @Test(timeout = 60000)
+    public void testNullProvider() throws Exception {
+        StreamConfigProvider provider = getNullProvider();
+        Optional<DynamicDistributedLogConfiguration> config1 = 
provider.getDynamicStreamConfig("stream1");
+        Optional<DynamicDistributedLogConfiguration> config2 = 
provider.getDynamicStreamConfig("stream2");
+        assertFalse(config1.isPresent());
+        assertTrue(config1 == config2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
new file mode 100644
index 0000000..5f5ecd4
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.placement;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.distributedlog.client.routing.RoutingService;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.util.Await;
+import com.twitter.util.Duration;
+import com.twitter.util.Future;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.LinkedHashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test Case for {@link LeastLoadPlacementPolicy}.
+ */
+public class TestLeastLoadPlacementPolicy {
+
+    @Test(timeout = 10000)
+    public void testCalculateBalances() throws Exception {
+        int numSevers = new Random().nextInt(20) + 1;
+        int numStreams = new Random().nextInt(200) + 1;
+        RoutingService mockRoutingService = mock(RoutingService.class);
+        DistributedLogNamespace mockNamespace = 
mock(DistributedLogNamespace.class);
+        LeastLoadPlacementPolicy leastLoadPlacementPolicy = new 
LeastLoadPlacementPolicy(
+            new EqualLoadAppraiser(),
+            mockRoutingService,
+            mockNamespace,
+            null,
+            Duration.fromSeconds(600),
+            new NullStatsLogger());
+        TreeSet<ServerLoad> serverLoads =
+            
Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), 
generateStreams(numStreams)));
+        long lowLoadPerServer = numStreams / numSevers;
+        long highLoadPerServer = lowLoadPerServer + 1;
+        for (ServerLoad serverLoad : serverLoads) {
+            long load = serverLoad.getLoad();
+            assertEquals(load, serverLoad.getStreamLoads().size());
+            assertTrue(String.format("Load %d is not between %d and %d",
+                load, lowLoadPerServer, highLoadPerServer), load == 
lowLoadPerServer || load == highLoadPerServer);
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void testRefreshAndPlaceStream() throws Exception {
+        int numSevers = new Random().nextInt(20) + 1;
+        int numStreams = new Random().nextInt(200) + 1;
+        RoutingService mockRoutingService = mock(RoutingService.class);
+        
when(mockRoutingService.getHosts()).thenReturn(generateSocketAddresses(numSevers));
+        DistributedLogNamespace mockNamespace = 
mock(DistributedLogNamespace.class);
+        try {
+            
when(mockNamespace.getLogs()).thenReturn(generateStreams(numStreams).iterator());
+        } catch (IOException e) {
+            fail();
+        }
+        PlacementStateManager mockPlacementStateManager = 
mock(PlacementStateManager.class);
+        LeastLoadPlacementPolicy leastLoadPlacementPolicy = new 
LeastLoadPlacementPolicy(
+            new EqualLoadAppraiser(),
+            mockRoutingService,
+            mockNamespace,
+            mockPlacementStateManager,
+            Duration.fromSeconds(600),
+            new NullStatsLogger());
+        leastLoadPlacementPolicy.refresh();
+
+        final ArgumentCaptor<TreeSet> captor = 
ArgumentCaptor.forClass(TreeSet.class);
+        verify(mockPlacementStateManager).saveOwnership(captor.capture());
+        TreeSet<ServerLoad> serverLoads = (TreeSet<ServerLoad>) 
captor.getValue();
+        ServerLoad next = serverLoads.first();
+        String serverPlacement = 
Await.result(leastLoadPlacementPolicy.placeStream("newstream1"));
+        assertEquals(next.getServer(), serverPlacement);
+    }
+
+    @Test(timeout = 10000)
+    public void testCalculateUnequalWeight() throws Exception {
+        int numSevers = new Random().nextInt(20) + 1;
+        int numStreams = new Random().nextInt(200) + 1;
+    /* use AtomicInteger to have a final object in answer method */
+        final AtomicInteger maxLoad = new AtomicInteger(Integer.MIN_VALUE);
+        RoutingService mockRoutingService = mock(RoutingService.class);
+        DistributedLogNamespace mockNamespace = 
mock(DistributedLogNamespace.class);
+        LoadAppraiser mockLoadAppraiser = mock(LoadAppraiser.class);
+        when(mockLoadAppraiser.getStreamLoad(anyString())).then(new 
Answer<Future<StreamLoad>>() {
+            @Override
+            public Future<StreamLoad> answer(InvocationOnMock 
invocationOnMock) throws Throwable {
+                int load = new Random().nextInt(100000);
+                if (load > maxLoad.get()) {
+                    maxLoad.set(load);
+                }
+                return Future.value(new 
StreamLoad(invocationOnMock.getArguments()[0].toString(), load));
+            }
+        });
+        LeastLoadPlacementPolicy leastLoadPlacementPolicy = new 
LeastLoadPlacementPolicy(
+            mockLoadAppraiser,
+            mockRoutingService,
+            mockNamespace,
+            null,
+            Duration.fromSeconds(600),
+            new NullStatsLogger());
+        TreeSet<ServerLoad> serverLoads =
+            
Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), 
generateStreams(numStreams)));
+        long highestLoadSeen = Long.MIN_VALUE;
+        long lowestLoadSeen = Long.MAX_VALUE;
+        for (ServerLoad serverLoad : serverLoads) {
+            long load = serverLoad.getLoad();
+            if (load < lowestLoadSeen) {
+                lowestLoadSeen = load;
+            }
+            if (load > highestLoadSeen) {
+                highestLoadSeen = load;
+            }
+        }
+        assertTrue("Unexpected placement for " + numStreams + " streams to "
+                + numSevers + " servers : highest load = " + highestLoadSeen
+                + ", lowest load = " + lowestLoadSeen + ", max stream load = " 
+ maxLoad.get(),
+            highestLoadSeen - lowestLoadSeen <= maxLoad.get());
+    }
+
+    private Set<SocketAddress> generateSocketAddresses(int num) {
+        LinkedHashSet<SocketAddress> socketAddresses = new 
LinkedHashSet<SocketAddress>();
+        for (int i = 0; i < num; i++) {
+            socketAddresses.add(new InetSocketAddress(i));
+        }
+        return socketAddresses;
+    }
+
+    private Set<String> generateStreams(int num) {
+        LinkedHashSet<String> streams = new LinkedHashSet<String>();
+        for (int i = 0; i < num; i++) {
+            streams.add("stream_" + i);
+        }
+        return streams;
+    }
+
+    private Set<String> generateServers(int num) {
+        LinkedHashSet<String> servers = new LinkedHashSet<String>();
+        for (int i = 0; i < num; i++) {
+            servers.add("server_" + i);
+        }
+        return servers;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java
new file mode 100644
index 0000000..5bd234f
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.placement;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link ServerLoad}.
+ */
+public class TestServerLoad {
+
+    @Test(timeout = 60000)
+    public void testSerializeDeserialize() throws IOException {
+        final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
+        for (int i = 0; i < 20; i++) {
+            serverLoad.addStream(new StreamLoad("stream-" + i, i));
+        }
+        assertEquals(serverLoad, 
ServerLoad.deserialize(serverLoad.serialize()));
+    }
+
+    @Test(timeout = 60000)
+    public void testGetLoad() throws IOException {
+        final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
+        assertEquals(0, serverLoad.getLoad());
+        serverLoad.addStream(new StreamLoad("stream-" + 1, 3));
+        assertEquals(3, serverLoad.getLoad());
+        serverLoad.addStream(new StreamLoad("stream-" + 2, 7));
+        assertEquals(10, serverLoad.getLoad());
+        serverLoad.addStream(new StreamLoad("stream-" + 3, 1));
+        assertEquals(11, serverLoad.getLoad());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java
new file mode 100644
index 0000000..36a6fed
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.placement;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link StreamLoad}.
+ */
+public class TestStreamLoad {
+
+    @Test(timeout = 10000)
+    public void testSerializeDeserialize() throws IOException {
+        final String streamName = "aHellaRandomStreamName";
+        final int load = 1337;
+        final StreamLoad streamLoad = new StreamLoad(streamName, load);
+        assertEquals(streamLoad, 
StreamLoad.deserialize(streamLoad.serialize()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java
new file mode 100644
index 0000000..07ec5a5
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.placement;
+
+import static org.apache.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.distributedlog.DistributedLogConfiguration;
+import java.io.IOException;
+import java.net.URI;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link ZKPlacementStateManager}.
+ */
+public class TestZKPlacementStateManager {
+    private TestingServer zkTestServer;
+    private String zkServers;
+    private URI uri;
+    private ZKPlacementStateManager zkPlacementStateManager;
+
+    @Before
+    public void startZookeeper() throws Exception {
+        zkTestServer = new TestingServer(2181);
+        zkServers = "127.0.0.1:2181";
+        uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + 
"/bknamespace");
+        zkPlacementStateManager =
+            new ZKPlacementStateManager(uri, new 
DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
+    }
+
+    @Test(timeout = 60000)
+    public void testSaveLoad() throws Exception {
+        TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
+        zkPlacementStateManager.saveOwnership(ownerships);
+        SortedSet<ServerLoad> loadedOwnerships = 
zkPlacementStateManager.loadOwnership();
+        assertEquals(ownerships, loadedOwnerships);
+
+        ownerships.add(new ServerLoad("emptyServer"));
+        zkPlacementStateManager.saveOwnership(ownerships);
+        loadedOwnerships = zkPlacementStateManager.loadOwnership();
+        assertEquals(ownerships, loadedOwnerships);
+
+        ServerLoad sl1 = new ServerLoad("server1");
+        sl1.addStream(new StreamLoad("stream1", 3));
+        sl1.addStream(new StreamLoad("stream2", 4));
+        ServerLoad sl2 = new ServerLoad("server2");
+        sl2.addStream(new StreamLoad("stream3", 1));
+        ownerships.add(sl1);
+        ownerships.add(sl2);
+        zkPlacementStateManager.saveOwnership(ownerships);
+        loadedOwnerships = zkPlacementStateManager.loadOwnership();
+        assertEquals(ownerships, loadedOwnerships);
+
+        loadedOwnerships.remove(sl1);
+        zkPlacementStateManager.saveOwnership(ownerships);
+        loadedOwnerships = zkPlacementStateManager.loadOwnership();
+        assertEquals(ownerships, loadedOwnerships);
+    }
+
+    private TreeSet<ServerLoad> waitForServerLoadsNotificationAsc(
+        LinkedBlockingQueue<TreeSet<ServerLoad>> notificationQueue,
+        int expectedNumServerLoads) throws InterruptedException {
+        TreeSet<ServerLoad> notification = notificationQueue.take();
+        assertNotNull(notification);
+        while (notification.size() < expectedNumServerLoads) {
+            notification = notificationQueue.take();
+        }
+        assertEquals(expectedNumServerLoads, notification.size());
+        return notification;
+    }
+
+    @Test(timeout = 60000)
+    public void testWatchIndefinitely() throws Exception {
+        TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
+        ownerships.add(new ServerLoad("server1"));
+        final LinkedBlockingQueue<TreeSet<ServerLoad>> serverLoadNotifications 
=
+            new LinkedBlockingQueue<TreeSet<ServerLoad>>();
+        PlacementStateManager.PlacementCallback callback = new 
PlacementStateManager.PlacementCallback() {
+            @Override
+            public void callback(TreeSet<ServerLoad> serverLoads) {
+                serverLoadNotifications.add(serverLoads);
+            }
+        };
+        zkPlacementStateManager.saveOwnership(ownerships); // need to 
initialize the zk path before watching
+        zkPlacementStateManager.watch(callback);
+        // cannot verify the callback here as it may call before the verify is 
called
+
+        zkPlacementStateManager.saveOwnership(ownerships);
+        assertEquals(ownerships, 
waitForServerLoadsNotificationAsc(serverLoadNotifications, 1));
+
+        ServerLoad server2 = new ServerLoad("server2");
+        server2.addStream(new StreamLoad("hella-important-stream", 415));
+        ownerships.add(server2);
+        zkPlacementStateManager.saveOwnership(ownerships);
+        assertEquals(ownerships, 
waitForServerLoadsNotificationAsc(serverLoadNotifications, 2));
+    }
+
+    @Test(timeout = 60000)
+    public void testZkFormatting() throws Exception {
+        final String server = "host/10.0.0.0:31351";
+        final String zkFormattedServer = "host--10.0.0.0:31351";
+        URI uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE 
+ "/bknamespace");
+        ZKPlacementStateManager zkPlacementStateManager =
+            new ZKPlacementStateManager(uri, new 
DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
+        assertEquals(zkFormattedServer, 
zkPlacementStateManager.serverToZkFormat(server));
+        assertEquals(server, 
zkPlacementStateManager.zkFormatToServer(zkFormattedServer));
+    }
+
+    @After
+    public void stopZookeeper() throws IOException {
+        zkTestServer.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
new file mode 100644
index 0000000..56e9483
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.service.config.StreamConfigProvider;
+import org.apache.distributedlog.service.streamset.Partition;
+import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
+import com.twitter.util.Await;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Test Case for StreamManager.
+ */
+public class TestStreamManager {
+
+    @Rule
+    public TestName testName = new TestName();
+
+    ScheduledExecutorService mockExecutorService = 
mock(ScheduledExecutorService.class);
+
+    @Test(timeout = 60000)
+    public void testCollectionMethods() throws Exception {
+        Stream mockStream = mock(Stream.class);
+        when(mockStream.getStreamName()).thenReturn("stream1");
+        when(mockStream.getPartition()).thenReturn(new Partition("stream1", 
0));
+        StreamFactory mockStreamFactory = mock(StreamFactory.class);
+        StreamPartitionConverter mockPartitionConverter = 
mock(StreamPartitionConverter.class);
+        StreamConfigProvider mockStreamConfigProvider = 
mock(StreamConfigProvider.class);
+        when(mockStreamFactory.create(
+                (String) any(),
+                (DynamicDistributedLogConfiguration) any(),
+                (StreamManager) any())).thenReturn(mockStream);
+        StreamManager streamManager = new StreamManagerImpl(
+                "",
+                new DistributedLogConfiguration(),
+                mockExecutorService,
+                mockStreamFactory,
+                mockPartitionConverter,
+                mockStreamConfigProvider,
+                mock(DistributedLogNamespace.class));
+
+        assertFalse(streamManager.isAcquired("stream1"));
+        assertEquals(0, streamManager.numAcquired());
+        assertEquals(0, streamManager.numCached());
+
+        streamManager.notifyAcquired(mockStream);
+        assertTrue(streamManager.isAcquired("stream1"));
+        assertEquals(1, streamManager.numAcquired());
+        assertEquals(0, streamManager.numCached());
+
+        streamManager.notifyReleased(mockStream);
+        assertFalse(streamManager.isAcquired("stream1"));
+        assertEquals(0, streamManager.numAcquired());
+        assertEquals(0, streamManager.numCached());
+
+        streamManager.notifyAcquired(mockStream);
+        assertTrue(streamManager.isAcquired("stream1"));
+        assertEquals(1, streamManager.numAcquired());
+        assertEquals(0, streamManager.numCached());
+
+        streamManager.notifyAcquired(mockStream);
+        assertTrue(streamManager.isAcquired("stream1"));
+        assertEquals(1, streamManager.numAcquired());
+        assertEquals(0, streamManager.numCached());
+
+        streamManager.notifyReleased(mockStream);
+        assertFalse(streamManager.isAcquired("stream1"));
+        assertEquals(0, streamManager.numAcquired());
+        assertEquals(0, streamManager.numCached());
+
+        streamManager.notifyReleased(mockStream);
+        assertFalse(streamManager.isAcquired("stream1"));
+        assertEquals(0, streamManager.numAcquired());
+        assertEquals(0, streamManager.numCached());
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateStream() throws Exception {
+        Stream mockStream = mock(Stream.class);
+        final String streamName = "stream1";
+        when(mockStream.getStreamName()).thenReturn(streamName);
+        StreamFactory mockStreamFactory = mock(StreamFactory.class);
+        StreamPartitionConverter mockPartitionConverter = 
mock(StreamPartitionConverter.class);
+        StreamConfigProvider mockStreamConfigProvider = 
mock(StreamConfigProvider.class);
+        when(mockStreamFactory.create(
+            (String) any(),
+            (DynamicDistributedLogConfiguration) any(),
+            (StreamManager) any())
+        ).thenReturn(mockStream);
+        DistributedLogNamespace dlNamespace = 
mock(DistributedLogNamespace.class);
+        ScheduledExecutorService executorService = new 
ScheduledThreadPoolExecutor(1);
+
+        StreamManager streamManager = new StreamManagerImpl(
+                "",
+                new DistributedLogConfiguration(),
+                executorService,
+                mockStreamFactory,
+                mockPartitionConverter,
+                mockStreamConfigProvider,
+                dlNamespace);
+
+        
assertTrue(Await.ready(streamManager.createStreamAsync(streamName)).isReturn());
+        verify(dlNamespace).createLog(streamName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
new file mode 100644
index 0000000..a18fda1
--- /dev/null
+++ 
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.acl.DefaultAccessControlManager;
+import org.apache.distributedlog.exceptions.InternalServerException;
+import org.apache.distributedlog.service.ResponseUtils;
+import org.apache.distributedlog.service.config.ServerConfiguration;
+import 
org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import org.apache.distributedlog.util.Sequencer;
+import com.twitter.util.Await;
+import com.twitter.util.Future;
+import java.nio.ByteBuffer;
+import org.apache.bookkeeper.feature.SettableFeature;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Test Case for StreamOps.
+ */
+public class TestStreamOp {
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private WriteOp getWriteOp() {
+        SettableFeature disabledFeature = new SettableFeature("", 0);
+        return new WriteOp("test",
+            ByteBuffer.wrap("test".getBytes()),
+            new NullStatsLogger(),
+            new NullStatsLogger(),
+            new IdentityStreamPartitionConverter(),
+            new ServerConfiguration(),
+            (byte) 0,
+            null,
+            false,
+            disabledFeature,
+            DefaultAccessControlManager.INSTANCE);
+    }
+
+    @Test(timeout = 60000)
+    public void testResponseFailedTwice() throws Exception {
+        WriteOp writeOp = getWriteOp();
+        writeOp.fail(new InternalServerException("test1"));
+        writeOp.fail(new InternalServerException("test2"));
+
+        WriteResponse response = Await.result(writeOp.result());
+        assertEquals(StatusCode.INTERNAL_SERVER_ERROR, 
response.getHeader().getCode());
+        assertEquals(ResponseUtils.exceptionToHeader(new 
InternalServerException("test1")), response.getHeader());
+    }
+
+    @Test(timeout = 60000)
+    public void testResponseSucceededThenFailed() throws Exception {
+        AsyncLogWriter writer = mock(AsyncLogWriter.class);
+        when(writer.write((LogRecord) any())).thenReturn(Future.value(new 
DLSN(1, 2, 3)));
+        when(writer.getStreamName()).thenReturn("test");
+        WriteOp writeOp = getWriteOp();
+        writeOp.execute(writer, new Sequencer() {
+            public long nextId() {
+                return 0;
+            }
+        }, new Object());
+        writeOp.fail(new InternalServerException("test2"));
+
+        WriteResponse response = Await.result(writeOp.result());
+        assertEquals(StatusCode.SUCCESS, response.getHeader().getCode());
+    }
+}

Reply via email to