http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java new file mode 100644 index 0000000..d0a2f88 --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.service; + +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.feature.DefaultFeatureProvider; +import org.apache.distributedlog.service.DistributedLogCluster.DLServer; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.feature.SettableFeature; +import org.apache.bookkeeper.stats.StatsLogger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Test Case for {@link org.apache.distributedlog.exceptions.RegionUnavailableException}. + */ +public class TestRegionUnavailable extends DistributedLogServerTestCase { + + /** + * A feature provider for testing. + */ + public static class TestFeatureProvider extends DefaultFeatureProvider { + + public TestFeatureProvider(String rootScope, + DistributedLogConfiguration conf, + StatsLogger statsLogger) { + super(rootScope, conf, statsLogger); + } + + @Override + protected Feature makeFeature(String featureName) { + if (featureName.contains(ServerFeatureKeys.REGION_STOP_ACCEPT_NEW_STREAM.name().toLowerCase())) { + return new SettableFeature(featureName, 10000); + } + return super.makeFeature(featureName); + } + + @Override + protected FeatureProvider makeProvider(String fullScopeName) { + return super.makeProvider(fullScopeName); + } + } + + private final int numServersPerDC = 3; + private final List<DLServer> localCluster; + private final List<DLServer> remoteCluster; + private TwoRegionDLClient client; + + public TestRegionUnavailable() { + super(true); + this.localCluster = new ArrayList<DLServer>(); + this.remoteCluster = new ArrayList<DLServer>(); + } + + @Before + @Override + public void setup() throws Exception { + DistributedLogConfiguration localConf = new 
DistributedLogConfiguration(); + localConf.addConfiguration(conf); + localConf.setFeatureProviderClass(TestFeatureProvider.class); + DistributedLogConfiguration remoteConf = new DistributedLogConfiguration(); + remoteConf.addConfiguration(conf); + super.setup(); + int localPort = 9010; + int remotePort = 9020; + for (int i = 0; i < numServersPerDC; i++) { + localCluster.add(createDistributedLogServer(localConf, localPort + i)); + remoteCluster.add(createDistributedLogServer(remoteConf, remotePort + i)); + } + Map<SocketAddress, String> regionMap = new HashMap<SocketAddress, String>(); + for (DLServer server : localCluster) { + regionMap.put(server.getAddress(), "local"); + } + for (DLServer server : remoteCluster) { + regionMap.put(server.getAddress(), "remote"); + } + client = createTwoRegionDLClient("two_regions_client", regionMap); + + } + + private void registerStream(String streamName) { + for (DLServer server : localCluster) { + client.localRoutingService.addHost(streamName, server.getAddress()); + } + client.remoteRoutingService.addHost(streamName, remoteCluster.get(0).getAddress()); + } + + @After + @Override + public void teardown() throws Exception { + super.teardown(); + if (null != client) { + client.shutdown(); + } + for (DLServer server : localCluster) { + server.shutdown(); + } + for (DLServer server : remoteCluster) { + server.shutdown(); + } + } + + @Test(timeout = 60000) + public void testRegionUnavailable() throws Exception { + String name = "dlserver-region-unavailable"; + registerStream(name); + + for (long i = 1; i <= 10; i++) { + client.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get(); + } + + // check local region + for (DLServer server : localCluster) { + checkStreams(0, server); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java new file mode 100644 index 0000000..c8b8bdf --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service; + +import static org.junit.Assert.assertEquals; + +import com.twitter.finagle.Service; +import com.twitter.finagle.service.ConstantService; +import com.twitter.util.Await; +import com.twitter.util.Future; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.junit.Test; + +/** + * Test Case for {@link StatsFilter}. 
+ */ +public class TestStatsFilter { + + class RuntimeExService<Req, Rep> extends Service<Req, Rep> { + public Future<Rep> apply(Req request) { + throw new RuntimeException("test"); + } + } + + @Test(timeout = 60000) + public void testServiceSuccess() throws Exception { + StatsLogger stats = new NullStatsLogger(); + StatsFilter<String, String> filter = new StatsFilter<String, String>(stats); + Future<String> result = filter.apply("", new ConstantService<String, String>(Future.value("result"))); + assertEquals("result", Await.result(result)); + } + + @Test(timeout = 60000) + public void testServiceFailure() throws Exception { + StatsLogger stats = new NullStatsLogger(); + StatsFilter<String, String> filter = new StatsFilter<String, String>(stats); + try { + filter.apply("", new RuntimeExService<String, String>()); + } catch (RuntimeException ex) { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java new file mode 100644 index 0000000..21bebb5 --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.balancer; + +import static org.junit.Assert.assertEquals; + +import java.util.HashMap; +import java.util.Map; +import org.junit.Test; + +/** + * Test Case for {@link BalancerUtils}. + */ +public class TestBalancerUtils { + + @Test(timeout = 60000) + public void testCalculateNumStreamsToRebalance() { + String myNode = "mynode"; + + // empty load distribution + assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance( + myNode, new HashMap<String, Integer>(), 0, 10)); + // my node doesn't exist in load distribution + Map<String, Integer> loadDistribution = new HashMap<String, Integer>(); + loadDistribution.put("node1", 10); + assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance( + myNode, loadDistribution, 0, 10)); + // my node doesn't reach rebalance water mark + loadDistribution.clear(); + loadDistribution.put("node1", 1); + loadDistribution.put(myNode, 100); + assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance( + myNode, loadDistribution, 200, 10)); + // my node is below average in the cluster. 
+ loadDistribution.clear(); + loadDistribution.put(myNode, 1); + loadDistribution.put("node1", 99); + assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance( + myNode, loadDistribution, 0, 10)); + // my node is above average in the cluster + assertEquals(49, BalancerUtils.calculateNumStreamsToRebalance( + "node1", loadDistribution, 0, 10)); + // my node is at the tolerance range + loadDistribution.clear(); + loadDistribution.put(myNode, 55); + loadDistribution.put("node1", 45); + assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance( + myNode, loadDistribution, 0, 10)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java new file mode 100644 index 0000000..fb3fb6e --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.balancer; + +import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.fail; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.RateLimiter; +import org.apache.distributedlog.client.monitor.MonitorServiceClient; +import org.apache.distributedlog.service.DLSocketAddress; +import org.apache.distributedlog.service.DistributedLogClient; +import org.apache.distributedlog.service.DistributedLogCluster.DLServer; +import org.apache.distributedlog.service.DistributedLogServerTestCase; +import com.twitter.util.Await; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test Case for {@link ClusterBalancer}. 
+ */ +public class TestClusterBalancer extends DistributedLogServerTestCase { + + private static final Logger logger = LoggerFactory.getLogger(TestClusterBalancer.class); + + private final int numServers = 5; + private final List<DLServer> cluster; + private DLClient client; + + public TestClusterBalancer() { + super(true); + this.cluster = new ArrayList<DLServer>(); + } + + @Before + @Override + public void setup() throws Exception { + super.setup(); + int initPort = 9001; + for (int i = 0; i < numServers; i++) { + cluster.add(createDistributedLogServer(initPort + i)); + } + client = createDistributedLogClient("cluster_client", Optional.<String>absent()); + } + + @After + @Override + public void teardown() throws Exception { + super.teardown(); + if (null != client) { + client.shutdown(); + } + for (DLServer server: cluster) { + server.shutdown(); + } + } + + private void initStreams(String namePrefix) { + logger.info("Init streams with prefix {}", namePrefix); + // Stream Distribution: 5, 4, 3, 2, 1 + initStreams(namePrefix, 5, 1, 0); + initStreams(namePrefix, 4, 6, 1); + initStreams(namePrefix, 3, 10, 2); + initStreams(namePrefix, 2, 13, 3); + initStreams(namePrefix, 1, 15, 4); + } + + private void initStreams(String namePrefix, int numStreams, int streamId, int proxyId) { + for (int i = 0; i < numStreams; i++) { + String name = namePrefix + (streamId++); + client.routingService.addHost(name, cluster.get(proxyId).getAddress()); + } + } + + private void writeStreams(String namePrefix) throws Exception { + logger.info("Write streams with prefix {}", namePrefix); + writeStreams(namePrefix, 5, 1); + writeStreams(namePrefix, 4, 6); + writeStreams(namePrefix, 3, 10); + writeStreams(namePrefix, 2, 13); + writeStreams(namePrefix, 1, 15); + } + + private void writeStreams(String namePrefix, int numStreams, int streamId) throws Exception { + for (int i = 0; i < numStreams; i++) { + String name = namePrefix + (streamId++); + try { + Await.result(client.dlClient.write(name, 
ByteBuffer.wrap(name.getBytes(UTF_8)))); + } catch (Exception e) { + logger.error("Error writing stream {} : ", name, e); + throw e; + } + } + } + + private void validateStreams(String namePrefix) throws Exception { + logger.info("Validate streams with prefix {}", namePrefix); + validateStreams(namePrefix, 5, 1, 0); + validateStreams(namePrefix, 4, 6, 1); + validateStreams(namePrefix, 3, 10, 2); + validateStreams(namePrefix, 2, 13, 3); + validateStreams(namePrefix, 1, 15, 4); + } + + private void validateStreams(String namePrefix, int numStreams, int streamId, int proxyIdx) { + Set<String> expectedStreams = new HashSet<String>(); + for (int i = 0; i < numStreams; i++) { + expectedStreams.add(namePrefix + (streamId++)); + } + checkStreams(expectedStreams, cluster.get(proxyIdx)); + } + + @Ignore + @Test(timeout = 60000) + public void testBalanceAll() throws Exception { + String namePrefix = "clusterbalancer-balance-all-"; + + initStreams(namePrefix); + writeStreams(namePrefix); + validateStreams(namePrefix); + + Optional<RateLimiter> rateLimiter = Optional.absent(); + + Balancer balancer = new ClusterBalancer(client.dlClientBuilder, + Pair.of((DistributedLogClient) client.dlClient, (MonitorServiceClient) client.dlClient)); + logger.info("Rebalancing from 'unknown' target"); + try { + balancer.balanceAll("unknown", 10, rateLimiter); + fail("Should fail on balanceAll from 'unknown' target."); + } catch (IllegalArgumentException iae) { + // expected + } + validateStreams(namePrefix); + + logger.info("Rebalancing from 'unexisted' host"); + String addr = DLSocketAddress.toString(DLSocketAddress.getSocketAddress(9999)); + balancer.balanceAll(addr, 10, rateLimiter); + validateStreams(namePrefix); + + addr = DLSocketAddress.toString(cluster.get(0).getAddress()); + logger.info("Rebalancing from host {}.", addr); + balancer.balanceAll(addr, 10, rateLimiter); + checkStreams(0, cluster.get(0)); + checkStreams(4, cluster.get(1)); + checkStreams(3, cluster.get(2)); + 
checkStreams(4, cluster.get(3)); + checkStreams(4, cluster.get(4)); + + addr = DLSocketAddress.toString(cluster.get(2).getAddress()); + logger.info("Rebalancing from host {}.", addr); + balancer.balanceAll(addr, 10, rateLimiter); + checkStreams(3, cluster.get(0)); + checkStreams(4, cluster.get(1)); + checkStreams(0, cluster.get(2)); + checkStreams(4, cluster.get(3)); + checkStreams(4, cluster.get(4)); + + logger.info("Rebalancing the cluster"); + balancer.balance(0, 0.0f, 10, rateLimiter); + for (int i = 0; i < 5; i++) { + checkStreams(3, cluster.get(i)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java new file mode 100644 index 0000000..6734083 --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.balancer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.common.collect.Sets; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.junit.Test; + +/** + * Test Case for {@link CountBasedStreamChooser}. + */ +public class TestCountBasedStreamChooser { + + @Test(timeout = 60000) + public void testEmptyStreamDistribution() { + try { + new CountBasedStreamChooser(new HashMap<SocketAddress, Set<String>>()); + fail("Should fail constructing stream chooser if the stream distribution is empty"); + } catch (IllegalArgumentException iae) { + // expected + } + } + + @Test(timeout = 60000) + public void testMultipleHostsWithEmptyStreams() { + for (int i = 1; i <= 3; i++) { + Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>(); + int port = 1000; + for (int j = 0; j < i; j++) { + SocketAddress address = new InetSocketAddress("127.0.0.1", port + j); + streamDistribution.put(address, new HashSet<String>()); + } + + CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution); + for (int k = 0; k < i + 1; k++) { + assertNull(chooser.choose()); + } + } + } + + @Test(timeout = 60000) + public void testSingleHostWithStreams() { + for (int i = 0; i < 3; i++) { + Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>(); + + Set<String> streams = new HashSet<String>(); + for (int j = 0; j < 3; j++) { + streams.add("SingleHostStream-" + j); + } + + int port = 1000; + SocketAddress address = new 
InetSocketAddress("127.0.0.1", port); + streamDistribution.put(address, streams); + + for (int k = 1; k <= i; k++) { + address = new InetSocketAddress("127.0.0.1", port + k); + streamDistribution.put(address, new HashSet<String>()); + } + + Set<String> choosenStreams = new HashSet<String>(); + + CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution); + for (int l = 0; l < 3 + i + 1; l++) { + String s = chooser.choose(); + if (null != s) { + choosenStreams.add(s); + } + } + + assertEquals(streams.size(), choosenStreams.size()); + assertTrue(Sets.difference(streams, choosenStreams).immutableCopy().isEmpty()); + } + } + + @Test(timeout = 60000) + public void testHostsHaveSameNumberStreams() { + Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>(); + Set<String> allStreams = new HashSet<String>(); + + int numHosts = 3; + int numStreamsPerHost = 3; + + int port = 1000; + for (int i = 1; i <= numHosts; i++) { + SocketAddress address = new InetSocketAddress("127.0.0.1", port + i); + Set<String> streams = new HashSet<String>(); + + for (int j = 1; j <= numStreamsPerHost; j++) { + String streamName = "HostsHaveSameNumberStreams-" + i + "-" + j; + streams.add(streamName); + allStreams.add(streamName); + } + + streamDistribution.put(address, streams); + } + + Set<String> streamsChoosen = new HashSet<String>(); + CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution); + for (int i = 1; i <= numStreamsPerHost; i++) { + for (int j = 1; j <= numHosts; j++) { + String s = chooser.choose(); + assertNotNull(s); + streamsChoosen.add(s); + } + for (int j = 0; j < numHosts; j++) { + assertEquals(numStreamsPerHost - i, chooser.streamsDistribution.get(j).getRight().size()); + } + } + assertNull(chooser.choose()); + assertEquals(numHosts * numStreamsPerHost, streamsChoosen.size()); + assertTrue(Sets.difference(allStreams, streamsChoosen).isEmpty()); + } + + @Test(timeout = 60000) + public void 
testHostsHaveDifferentNumberStreams() { + Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>(); + Set<String> allStreams = new HashSet<String>(); + + int numHosts = 6; + int maxStreamsPerHost = 4; + + int port = 1000; + for (int i = 0; i < numHosts; i++) { + int group = i / 2; + int numStreamsThisGroup = maxStreamsPerHost - group; + + SocketAddress address = new InetSocketAddress("127.0.0.1", port + i); + Set<String> streams = new HashSet<String>(); + + for (int j = 1; j <= numStreamsThisGroup; j++) { + String streamName = "HostsHaveDifferentNumberStreams-" + i + "-" + j; + streams.add(streamName); + allStreams.add(streamName); + } + + streamDistribution.put(address, streams); + } + + Set<String> streamsChoosen = new HashSet<String>(); + CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution); + + for (int i = 0; i < allStreams.size(); i++) { + String s = chooser.choose(); + assertNotNull(s); + streamsChoosen.add(s); + } + assertNull(chooser.choose()); + assertEquals(allStreams.size(), streamsChoosen.size()); + assertTrue(Sets.difference(allStreams, streamsChoosen).isEmpty()); + } + + @Test(timeout = 60000) + public void testLimitedStreamChooser() { + Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>(); + + Set<String> streams = new HashSet<String>(); + for (int j = 0; j < 10; j++) { + streams.add("SingleHostStream-" + j); + } + + int port = 1000; + SocketAddress address = new InetSocketAddress("127.0.0.1", port); + streamDistribution.put(address, streams); + + Set<String> choosenStreams = new HashSet<String>(); + + CountBasedStreamChooser underlying = new CountBasedStreamChooser(streamDistribution); + LimitedStreamChooser chooser = LimitedStreamChooser.of(underlying, 1); + for (int l = 0; l < 10; l++) { + String s = chooser.choose(); + if (null != s) { + choosenStreams.add(s); + } + } + + assertEquals(1, choosenStreams.size()); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java new file mode 100644 index 0000000..73fa98a --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.service.balancer; + +import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.RateLimiter; +import org.apache.distributedlog.service.DistributedLogCluster.DLServer; +import org.apache.distributedlog.service.DistributedLogServerTestCase; +import com.twitter.util.Await; +import java.nio.ByteBuffer; +import java.util.Set; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test Case for {@link SimpleBalancer}. + */ +public class TestSimpleBalancer extends DistributedLogServerTestCase { + + private static final Logger logger = LoggerFactory.getLogger(TestSimpleBalancer.class); + + DLClient targetClient; + DLServer targetServer; + + public TestSimpleBalancer() { + super(true); + } + + @Before + @Override + public void setup() throws Exception { + super.setup(); + targetServer = createDistributedLogServer(7003); + targetClient = createDistributedLogClient("target", Optional.<String>absent()); + } + + @After + @Override + public void teardown() throws Exception { + super.teardown(); + if (null != targetClient) { + targetClient.shutdown(); + } + if (null != targetServer) { + targetServer.shutdown(); + } + } + + @Test(timeout = 60000) + public void testBalanceAll() throws Exception { + String namePrefix = "simplebalancer-balance-all-"; + int numStreams = 10; + + for (int i = 0; i < numStreams; i++) { + String name = namePrefix + i; + // src client + dlClient.routingService.addHost(name, dlServer.getAddress()); + // target client + targetClient.routingService.addHost(name, targetServer.getAddress()); + } + + // write to multiple streams + for (int i = 0; i < numStreams; i++) { + String name = namePrefix + i; + Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + 
i).getBytes(UTF_8)))); + } + + // validation + for (int i = 0; i < numStreams; i++) { + String name = namePrefix + i; + checkStream(name, dlClient, dlServer, 1, numStreams, numStreams, true, true); + checkStream(name, targetClient, targetServer, 0, 0, 0, false, false); + } + + Optional<RateLimiter> rateLimiter = Optional.absent(); + + Balancer balancer = new SimpleBalancer("source", dlClient.dlClient, dlClient.dlClient, + "target", targetClient.dlClient, targetClient.dlClient); + logger.info("Rebalancing from 'unknown' target"); + try { + balancer.balanceAll("unknown", 10, rateLimiter); + fail("Should fail on balanceAll from 'unknown' target."); + } catch (IllegalArgumentException iae) { + // expected + } + + // nothing to balance from 'target' + logger.info("Rebalancing from 'target' target"); + balancer.balanceAll("target", 1, rateLimiter); + for (int i = 0; i < numStreams; i++) { + String name = namePrefix + i; + checkStream(name, dlClient, dlServer, 1, numStreams, numStreams, true, true); + checkStream(name, targetClient, targetServer, 0, 0, 0, false, false); + } + + // balance all streams from 'source' + logger.info("Rebalancing from 'source' target"); + balancer.balanceAll("source", 10, rateLimiter); + for (int i = 0; i < numStreams; i++) { + String name = namePrefix + i; + checkStream(name, targetClient, targetServer, 1, numStreams, numStreams, true, true); + checkStream(name, dlClient, dlServer, 0, 0, 0, false, false); + } + } + + @Test(timeout = 60000) + public void testBalanceStreams() throws Exception { + String namePrefix = "simplebalancer-balance-streams-"; + int numStreams = 10; + + for (int i = 0; i < numStreams; i++) { + String name = namePrefix + i; + // src client + dlClient.routingService.addHost(name, dlServer.getAddress()); + // target client + targetClient.routingService.addHost(name, targetServer.getAddress()); + } + + // write to multiple streams + for (int i = 0; i < numStreams; i++) { + String name = namePrefix + i; + 
Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8)))); + } + + // validation + for (int i = 0; i < numStreams; i++) { + String name = namePrefix + i; + checkStream(name, dlClient, dlServer, 1, numStreams, numStreams, true, true); + checkStream(name, targetClient, targetServer, 0, 0, 0, false, false); + } + + Optional<RateLimiter> rateLimiter = Optional.absent(); + + Balancer balancer = new SimpleBalancer("source", dlClient.dlClient, dlClient.dlClient, + "target", targetClient.dlClient, targetClient.dlClient); + + // balance all streams from 'source' + logger.info("Rebalancing streams between targets"); + balancer.balance(0, 0, 10, rateLimiter); + + Set<String> sourceStreams = getAllStreamsFromDistribution(getStreamOwnershipDistribution(dlClient)); + Set<String> targetStreams = getAllStreamsFromDistribution(getStreamOwnershipDistribution(targetClient)); + + assertEquals(numStreams / 2, sourceStreams.size()); + assertEquals(numStreams / 2, targetStreams.size()); + + for (String name : sourceStreams) { + checkStream(name, dlClient, dlServer, 1, numStreams / 2, numStreams / 2, true, true); + checkStream(name, targetClient, targetServer, 1, numStreams / 2, numStreams / 2, false, false); + } + + for (String name : targetStreams) { + checkStream(name, targetClient, targetServer, 1, numStreams / 2, numStreams / 2, true, true); + checkStream(name, dlClient, dlServer, 1, numStreams / 2, numStreams / 2, false, false); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java new file mode 100644 index 
0000000..ce7b2c1 --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.balancer; + +import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.assertTrue; + +import com.google.common.base.Optional; +import org.apache.distributedlog.service.DistributedLogClient; +import org.apache.distributedlog.service.DistributedLogCluster.DLServer; +import org.apache.distributedlog.service.DistributedLogServerTestCase; +import com.twitter.util.Await; +import java.nio.ByteBuffer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Test Case for {@link StreamMover}. 
+ */
public class TestStreamMover extends DistributedLogServerTestCase {

    DLClient targetClient;
    DLServer targetServer;

    public TestStreamMover() {
        super(true);
    }

    /**
     * Runs the base-class setup, then creates a second server and a client named "target"
     * so that a stream can be moved between two endpoints.
     */
    @Before
    @Override
    public void setup() throws Exception {
        super.setup();
        targetServer = createDistributedLogServer(7003);
        targetClient = createDistributedLogClient("target", Optional.<String>absent());
    }

    /**
     * Runs the base-class teardown, then shuts down the target client and the target server.
     *
     * <p>Each later step sits in a {@code finally} block so that an exception thrown by an
     * earlier step cannot leave the target client or server running for subsequent tests.
     */
    @After
    @Override
    public void teardown() throws Exception {
        try {
            super.teardown();
        } finally {
            try {
                if (null != targetClient) {
                    targetClient.shutdown();
                }
            } finally {
                if (null != targetServer) {
                    targetServer.shutdown();
                }
            }
        }
    }

    /**
     * Writes one record through the source client, moves the stream with
     * {@link StreamMoverImpl} and re-checks both sides with {@code checkStream}.
     */
    @Test(timeout = 60000)
    public void testMoveStream() throws Exception {
        String name = "dlserver-move-stream";

        // route the stream to the source server on the source client ...
        dlClient.routingService.addHost(name, dlServer.getAddress());
        // ... and to the target server on the target client
        targetClient.routingService.addHost(name, targetServer.getAddress());

        // src client write a record to that stream
        Await.result(((DistributedLogClient) dlClient.dlClient).write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
        // before the move: 1/1/1/true/true expected on the source side, 0/0/0/false/false on the target side
        checkStream(name, dlClient, dlServer, 1, 1, 1, true, true);
        checkStream(name, targetClient, targetServer, 0, 0, 0, false, false);

        StreamMover streamMover = new StreamMoverImpl("source", dlClient.dlClient, dlClient.dlClient,
            "target", targetClient.dlClient, targetClient.dlClient);
        assertTrue("moveStream should report success for " + name, streamMover.moveStream(name));
        // after the move the expectations are mirrored
        checkStream(name, dlClient, dlServer, 0, 0, 0, false, false);
        checkStream(name, targetClient, targetServer, 1, 1, 1, true, true);
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java
b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java new file mode 100644 index 0000000..71dfa45 --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.config; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +/** + * Test Case for {@link ServerConfiguration}. 
+ */
public class TestServerConfiguration {

    /** Validating a configuration on which no shard id was set must throw. */
    @Test(timeout = 60000, expected = IllegalArgumentException.class)
    public void testUnassignedShardId() {
        ServerConfiguration untouched = new ServerConfiguration();
        untouched.validate();
    }

    /** A configuration with shard id 100 validates and reports that id back. */
    @Test(timeout = 60000)
    public void testAssignedShardId() {
        ServerConfiguration shardConf = newConfWithShardId();
        shardConf.validate();
        assertEquals(100, shardConf.getServerShardId());
    }

    /** A server thread count of -1 must be rejected by validation. */
    @Test(timeout = 60000, expected = IllegalArgumentException.class)
    public void testInvalidServerThreads() {
        ServerConfiguration threadsConf = newConfWithShardId();
        threadsConf.setServerThreads(-1);
        threadsConf.validate();
    }

    /** The DLSN version set below must be rejected by validation. */
    @Test(timeout = 60000, expected = IllegalArgumentException.class)
    public void testInvalidDlsnVersion() {
        ServerConfiguration versionConf = newConfWithShardId();
        // 9999 is 0x270F, so the narrowing cast actually passes the byte value 15
        versionConf.setDlsnVersion((byte) 9999);
        versionConf.validate();
    }

    /** The hostname-as-allocator-pool-name flag is off on a fresh configuration and can be switched on. */
    @Test(timeout = 60000)
    public void testUseHostnameAsAllocatorPoolName() {
        ServerConfiguration flagConf = new ServerConfiguration();
        assertFalse("Should not use hostname by default", flagConf.isUseHostnameAsAllocatorPoolName());
        flagConf.setUseHostnameAsAllocatorPoolName(true);
        assertTrue("Should use hostname now", flagConf.isUseHostnameAsAllocatorPoolName());
    }

    /** Builds a fresh configuration whose only explicit setting is shard id 100. */
    private static ServerConfiguration newConfWithShardId() {
        ServerConfiguration fresh = new ServerConfiguration();
        fresh.setServerShardId(100);
        return fresh;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java new file mode 100644 index 0000000..bdbde11 --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java @@ -0,0 +1,140 @@ +/** +
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.config; + +import static org.apache.distributedlog.DistributedLogConfiguration.BKDL_RETENTION_PERIOD_IN_HOURS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.config.PropertiesWriter; +import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter; +import org.apache.distributedlog.service.streamset.StreamPartitionConverter; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.junit.Test; + +/** + * Test Case for {@link StreamConfigProvider}. 
+ */
public class TestStreamConfigProvider {
    private static final String DEFAULT_CONFIG_DIR = "conf";
    private final String defaultConfigPath;
    private final ScheduledExecutorService configExecutorService;

    /**
     * Creates the single-threaded scheduled executor handed to the providers and saves an
     * empty {@link PropertiesWriter} file whose path is used as the default config path.
     */
    public TestStreamConfigProvider() throws Exception {
        this.configExecutorService = Executors.newScheduledThreadPool(1,
                new ThreadFactoryBuilder().setNameFormat("DistributedLogService-Dyncfg-%d").build());
        PropertiesWriter writer = new PropertiesWriter();
        writer.save();
        this.defaultConfigPath = writer.getFile().getPath();
    }

    /**
     * Stops the executor created in the constructor.
     *
     * <p>JUnit 4 instantiates the test class once per test method, so without this hook
     * every test method would leave its own scheduler thread behind.
     */
    @org.junit.After
    public void shutdownConfigExecutor() {
        configExecutorService.shutdownNow();
    }

    /** Service provider reading from {@link #DEFAULT_CONFIG_DIR} with the empty default config. */
    StreamConfigProvider getServiceProvider(StreamPartitionConverter converter)
            throws Exception {
        return getServiceProvider(converter, DEFAULT_CONFIG_DIR);
    }

    /**
     * Builds a {@link ServiceStreamConfigProvider} from the given paths and converter, passing
     * the shared executor and {@code 1, TimeUnit.SECONDS} (presumably the reload period -
     * confirm against the provider's constructor).
     */
    StreamConfigProvider getServiceProvider(
            StreamPartitionConverter converter,
            String configPath,
            String defaultPath) throws Exception {
        return new ServiceStreamConfigProvider(
                configPath,
                defaultPath,
                converter,
                configExecutorService,
                1,
                TimeUnit.SECONDS);
    }

    /** Service provider for {@code configPath} with the empty default config. */
    StreamConfigProvider getServiceProvider(
            StreamPartitionConverter converter,
            String configPath) throws Exception {
        return getServiceProvider(converter, configPath, defaultConfigPath);
    }

    /** Builds a {@link DefaultStreamConfigProvider} over {@code configFile}. */
    StreamConfigProvider getDefaultProvider(String configFile) throws Exception {
        return new DefaultStreamConfigProvider(configFile, configExecutorService, 1, TimeUnit.SECONDS);
    }

    /** Builds a {@link NullStreamConfigProvider}. */
    StreamConfigProvider getNullProvider() throws Exception {
        return new NullStreamConfigProvider();
    }

    /** Constructing the service provider with the identity converter must not throw. */
    @Test(timeout = 60000)
    public void testServiceProviderWithConfigRouters() throws Exception {
        getServiceProvider(new IdentityStreamPartitionConverter());
    }

    /** The service provider still returns a config for "stream1". */
    @Test(timeout = 60000)
    public void testServiceProviderWithMissingConfig() throws Exception {
        StreamConfigProvider provider = getServiceProvider(new IdentityStreamPartitionConverter());
        Optional<DynamicDistributedLogConfiguration> config = provider.getDynamicStreamConfig("stream1");
        assertTrue(config.isPresent());
    }

    /**
     * A property written to the default config file shows up in the stream config; with an
     * empty default file the same getter returns -1.
     */
    @Test(timeout = 60000)
    public void testServiceProviderWithDefaultConfigPath() throws Exception {
        // Default config with property set.
        PropertiesWriter writer1 = new PropertiesWriter();
        writer1.setProperty("rpsStreamAcquireServiceLimit", "191919");
        writer1.save();
        String fallbackConfPath1 = writer1.getFile().getPath();
        StreamConfigProvider provider1 = getServiceProvider(new IdentityStreamPartitionConverter(),
                DEFAULT_CONFIG_DIR, fallbackConfPath1);
        Optional<DynamicDistributedLogConfiguration> config1 = provider1.getDynamicStreamConfig("stream1");

        // Empty default config.
        PropertiesWriter writer2 = new PropertiesWriter();
        writer2.save();
        String fallbackConfPath2 = writer2.getFile().getPath();
        StreamConfigProvider provider2 = getServiceProvider(new IdentityStreamPartitionConverter(),
                DEFAULT_CONFIG_DIR, fallbackConfPath2);
        Optional<DynamicDistributedLogConfiguration> config2 = provider2.getDynamicStreamConfig("stream1");

        assertEquals(191919, config1.get().getRpsStreamAcquireServiceLimit());
        assertEquals(-1, config2.get().getRpsStreamAcquireServiceLimit());
    }

    /** The default provider hands out the very same config instance for different streams. */
    @Test(timeout = 60000)
    public void testDefaultProvider() throws Exception {
        PropertiesWriter writer = new PropertiesWriter();
        writer.setProperty(BKDL_RETENTION_PERIOD_IN_HOURS, "99");
        writer.save();
        StreamConfigProvider provider = getDefaultProvider(writer.getFile().getPath());
        Optional<DynamicDistributedLogConfiguration> config1 = provider.getDynamicStreamConfig("stream1");
        Optional<DynamicDistributedLogConfiguration> config2 = provider.getDynamicStreamConfig("stream2");
        assertTrue(config1.isPresent());
        // identity, not equality: both lookups must return one shared object
        assertTrue(config1.get() == config2.get());
        assertEquals(99, config1.get().getRetentionPeriodHours());
    }

    /** The null provider returns the same absent Optional for every stream. */
    @Test(timeout = 60000)
    public void testNullProvider() throws Exception {
        StreamConfigProvider provider = getNullProvider();
        Optional<DynamicDistributedLogConfiguration> config1 = provider.getDynamicStreamConfig("stream1");
        Optional<DynamicDistributedLogConfiguration> config2 = provider.getDynamicStreamConfig("stream2");
        assertFalse(config1.isPresent());
        assertTrue(config1 == config2);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java new file mode 100644 index 0000000..5f5ecd4 --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.distributedlog.service.placement; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.distributedlog.client.routing.RoutingService; +import org.apache.distributedlog.namespace.DistributedLogNamespace; +import com.twitter.util.Await; +import com.twitter.util.Duration; +import com.twitter.util.Future; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.LinkedHashSet; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Test Case for {@link LeastLoadPlacementPolicy}. 
+ */
public class TestLeastLoadPlacementPolicy {

    /**
     * With {@link EqualLoadAppraiser}, every server's load must equal its number of streams
     * and be either {@code numStreams / numServers} or that value plus one.
     *
     * <p>Server and stream counts are random, so they are included in every failure message
     * to make a failing run reproducible.
     */
    @Test(timeout = 10000)
    public void testCalculateBalances() throws Exception {
        int numServers = new Random().nextInt(20) + 1;
        int numStreams = new Random().nextInt(200) + 1;
        String scenario = numStreams + " streams on " + numServers + " servers";
        RoutingService mockRoutingService = mock(RoutingService.class);
        DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
        LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
            new EqualLoadAppraiser(),
            mockRoutingService,
            mockNamespace,
            null,
            Duration.fromSeconds(600),
            new NullStatsLogger());
        TreeSet<ServerLoad> serverLoads =
            Await.result(leastLoadPlacementPolicy.calculate(generateServers(numServers), generateStreams(numStreams)));
        long lowLoadPerServer = numStreams / numServers;
        long highLoadPerServer = lowLoadPerServer + 1;
        for (ServerLoad serverLoad : serverLoads) {
            long load = serverLoad.getLoad();
            assertEquals("Load differs from stream count (" + scenario + ")",
                load, serverLoad.getStreamLoads().size());
            assertTrue(String.format("Load %d is not between %d and %d (%s)",
                load, lowLoadPerServer, highLoadPerServer, scenario),
                load == lowLoadPerServer || load == highLoadPerServer);
        }
    }

    /**
     * After {@code refresh()}, a new stream must be placed on the first server of the set
     * that was passed to {@code PlacementStateManager.saveOwnership}.
     */
    // ArgumentCaptor.forClass can only produce a raw TreeSet captor; the captured value is the
    // TreeSet<ServerLoad> that saveOwnership declares, so the conversions below are safe.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Test(timeout = 10000)
    public void testRefreshAndPlaceStream() throws Exception {
        int numServers = new Random().nextInt(20) + 1;
        int numStreams = new Random().nextInt(200) + 1;
        RoutingService mockRoutingService = mock(RoutingService.class);
        when(mockRoutingService.getHosts()).thenReturn(generateSocketAddresses(numServers));
        DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
        try {
            when(mockNamespace.getLogs()).thenReturn(generateStreams(numStreams).iterator());
        } catch (IOException e) {
            // keep the cause visible instead of failing with an empty message
            fail("Unexpected IOException while stubbing getLogs(): " + e);
        }
        PlacementStateManager mockPlacementStateManager = mock(PlacementStateManager.class);
        LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
            new EqualLoadAppraiser(),
            mockRoutingService,
            mockNamespace,
            mockPlacementStateManager,
            Duration.fromSeconds(600),
            new NullStatsLogger());
        leastLoadPlacementPolicy.refresh();

        final ArgumentCaptor<TreeSet> captor = ArgumentCaptor.forClass(TreeSet.class);
        verify(mockPlacementStateManager).saveOwnership(captor.capture());
        TreeSet<ServerLoad> serverLoads = (TreeSet<ServerLoad>) captor.getValue();
        ServerLoad next = serverLoads.first();
        String serverPlacement = Await.result(leastLoadPlacementPolicy.placeStream("newstream1"));
        assertEquals("Unexpected placement for " + numStreams + " streams on " + numServers + " servers",
            next.getServer(), serverPlacement);
    }

    /**
     * With random per-stream loads, the gap between the most and the least loaded server must
     * not exceed the largest single stream load handed out by the mocked appraiser.
     */
    @Test(timeout = 10000)
    public void testCalculateUnequalWeight() throws Exception {
        int numServers = new Random().nextInt(20) + 1;
        int numStreams = new Random().nextInt(200) + 1;
        /* use AtomicInteger to have a final object in answer method */
        final AtomicInteger maxLoad = new AtomicInteger(Integer.MIN_VALUE);
        RoutingService mockRoutingService = mock(RoutingService.class);
        DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
        LoadAppraiser mockLoadAppraiser = mock(LoadAppraiser.class);
        when(mockLoadAppraiser.getStreamLoad(anyString())).then(new Answer<Future<StreamLoad>>() {
            @Override
            public Future<StreamLoad> answer(InvocationOnMock invocationOnMock) throws Throwable {
                int load = new Random().nextInt(100000);
                // remember the largest load handed out; it bounds the allowed imbalance
                if (load > maxLoad.get()) {
                    maxLoad.set(load);
                }
                return Future.value(new StreamLoad(invocationOnMock.getArguments()[0].toString(), load));
            }
        });
        LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
            mockLoadAppraiser,
            mockRoutingService,
            mockNamespace,
            null,
            Duration.fromSeconds(600),
            new NullStatsLogger());
        TreeSet<ServerLoad> serverLoads =
            Await.result(leastLoadPlacementPolicy.calculate(generateServers(numServers), generateStreams(numStreams)));
        long highestLoadSeen = Long.MIN_VALUE;
        long lowestLoadSeen = Long.MAX_VALUE;
        for (ServerLoad serverLoad : serverLoads) {
            long load = serverLoad.getLoad();
            if (load < lowestLoadSeen) {
                lowestLoadSeen = load;
            }
            if (load > highestLoadSeen) {
                highestLoadSeen = load;
            }
        }
        assertTrue("Unexpected placement for " + numStreams + " streams to "
                + numServers + " servers : highest load = " + highestLoadSeen
                + ", lowest load = " + lowestLoadSeen + ", max stream load = " + maxLoad.get(),
            highestLoadSeen - lowestLoadSeen <= maxLoad.get());
    }

    /** Returns {@code num} distinct socket addresses, one per port {@code 0..num-1}, in insertion order. */
    private Set<SocketAddress> generateSocketAddresses(int num) {
        LinkedHashSet<SocketAddress> socketAddresses = new LinkedHashSet<SocketAddress>();
        for (int i = 0; i < num; i++) {
            socketAddresses.add(new InetSocketAddress(i));
        }
        return socketAddresses;
    }

    /** Returns the names {@code stream_0 .. stream_(num-1)} in insertion order. */
    private Set<String> generateStreams(int num) {
        LinkedHashSet<String> streams = new LinkedHashSet<String>();
        for (int i = 0; i < num; i++) {
            streams.add("stream_" + i);
        }
        return streams;
    }

    /** Returns the names {@code server_0 .. server_(num-1)} in insertion order. */
    private Set<String> generateServers(int num) {
        LinkedHashSet<String> servers = new LinkedHashSet<String>();
        for (int i = 0; i < num; i++) {
            servers.add("server_" + i);
        }
        return servers;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java new file mode 100644 index 0000000..5bd234f --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.placement; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import org.junit.Test; +
/**
 * Test Case for {@link ServerLoad}.
 */
public class TestServerLoad {

    /** A server load holding twenty streams must survive a serialize/deserialize round trip. */
    @Test(timeout = 60000)
    public void testSerializeDeserialize() throws IOException {
        final ServerLoad original = new ServerLoad("th1s1s@s3rv3rn@m3");
        int streamIndex = 0;
        while (streamIndex < 20) {
            // stream i carries load i
            original.addStream(new StreamLoad("stream-" + streamIndex, streamIndex));
            streamIndex++;
        }
        final ServerLoad roundTripped = ServerLoad.deserialize(original.serialize());
        assertEquals(original, roundTripped);
    }

    /** The reported load starts at zero and grows by the load of each added stream. */
    @Test(timeout = 60000)
    public void testGetLoad() throws IOException {
        final ServerLoad tracked = new ServerLoad("th1s1s@s3rv3rn@m3");
        assertEquals(0, tracked.getLoad());

        // loads of stream-1, stream-2 and stream-3; running totals are 3, 10 and 11
        final int[] streamLoads = {3, 7, 1};
        int expectedTotal = 0;
        for (int position = 0; position < streamLoads.length; position++) {
            tracked.addStream(new StreamLoad("stream-" + (position + 1), streamLoads[position]));
            expectedTotal += streamLoads[position];
            assertEquals(expectedTotal, tracked.getLoad());
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java ---------------------------------------------------------------------- diff --git
a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java new file mode 100644 index 0000000..36a6fed --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.placement; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import org.junit.Test; + +/** + * Test Case for {@link StreamLoad}. 
+ */
public class TestStreamLoad {

    /** A stream load must compare equal to its own serialize/deserialize round trip. */
    @Test(timeout = 10000)
    public void testSerializeDeserialize() throws IOException {
        final StreamLoad original = new StreamLoad("aHellaRandomStreamName", 1337);
        final StreamLoad roundTripped = StreamLoad.deserialize(original.serialize());
        assertEquals(original, roundTripped);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java new file mode 100644 index 0000000..07ec5a5 --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.distributedlog.service.placement; + +import static org.apache.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import org.apache.distributedlog.DistributedLogConfiguration; +import java.io.IOException; +import java.net.URI; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.curator.test.TestingServer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +
/**
 * Test Case for {@link ZKPlacementStateManager}.
 */
public class TestZKPlacementStateManager {
    private TestingServer zkTestServer;
    private String zkServers;
    private URI uri;
    private ZKPlacementStateManager zkPlacementStateManager;

    /**
     * Starts an in-process ZooKeeper and builds the manager under test against it.
     *
     * <p>The server is created without an explicit port so it cannot collide with a ZooKeeper
     * already listening on 2181; the connect string is taken from the server itself.
     */
    @Before
    public void startZookeeper() throws Exception {
        zkTestServer = new TestingServer();
        zkServers = zkTestServer.getConnectString();
        uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
        zkPlacementStateManager =
            new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
    }

    /**
     * Saves and reloads the ownership set as it goes from empty, to one empty server, to
     * three servers, and back down after one server is removed.
     */
    @Test(timeout = 60000)
    public void testSaveLoad() throws Exception {
        TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
        zkPlacementStateManager.saveOwnership(ownerships);
        SortedSet<ServerLoad> loadedOwnerships = zkPlacementStateManager.loadOwnership();
        assertEquals(ownerships, loadedOwnerships);

        ownerships.add(new ServerLoad("emptyServer"));
        zkPlacementStateManager.saveOwnership(ownerships);
        loadedOwnerships = zkPlacementStateManager.loadOwnership();
        assertEquals(ownerships, loadedOwnerships);

        ServerLoad sl1 = new ServerLoad("server1");
        sl1.addStream(new StreamLoad("stream1", 3));
        sl1.addStream(new StreamLoad("stream2", 4));
        ServerLoad sl2 = new ServerLoad("server2");
        sl2.addStream(new StreamLoad("stream3", 1));
        ownerships.add(sl1);
        ownerships.add(sl2);
        zkPlacementStateManager.saveOwnership(ownerships);
        loadedOwnerships = zkPlacementStateManager.loadOwnership();
        assertEquals(ownerships, loadedOwnerships);

        // Remove from the set that is saved (not from the loaded copy, which is overwritten
        // two lines below) so that saving a shrunken ownership set is actually exercised.
        ownerships.remove(sl1);
        zkPlacementStateManager.saveOwnership(ownerships);
        loadedOwnerships = zkPlacementStateManager.loadOwnership();
        assertEquals(ownerships, loadedOwnerships);
    }

    /**
     * Takes notifications from the queue until one holds at least
     * {@code expectedNumServerLoads} entries, then asserts it holds exactly that many.
     *
     * @param notificationQueue queue the watch callback publishes into
     * @param expectedNumServerLoads number of server loads the returned notification must have
     * @return the first notification that reached the expected size
     * @throws InterruptedException if interrupted while blocked on the queue
     */
    private TreeSet<ServerLoad> waitForServerLoadsNotificationAsc(
        LinkedBlockingQueue<TreeSet<ServerLoad>> notificationQueue,
        int expectedNumServerLoads) throws InterruptedException {
        TreeSet<ServerLoad> notification = notificationQueue.take();
        assertNotNull(notification);
        // smaller notifications are discarded; only a growing sequence is supported
        while (notification.size() < expectedNumServerLoads) {
            notification = notificationQueue.take();
        }
        assertEquals(expectedNumServerLoads, notification.size());
        return notification;
    }

    /** A registered watch must keep delivering callbacks for successive saves. */
    @Test(timeout = 60000)
    public void testWatchIndefinitely() throws Exception {
        TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
        ownerships.add(new ServerLoad("server1"));
        final LinkedBlockingQueue<TreeSet<ServerLoad>> serverLoadNotifications =
            new LinkedBlockingQueue<TreeSet<ServerLoad>>();
        PlacementStateManager.PlacementCallback callback = new PlacementStateManager.PlacementCallback() {
            @Override
            public void callback(TreeSet<ServerLoad> serverLoads) {
                serverLoadNotifications.add(serverLoads);
            }
        };
        zkPlacementStateManager.saveOwnership(ownerships); // need to initialize the zk path before watching
        zkPlacementStateManager.watch(callback);
        // the callback cannot be verified here because it may fire before a verification is set up

        zkPlacementStateManager.saveOwnership(ownerships);
        assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 1));

        ServerLoad server2 = new ServerLoad("server2");
        server2.addStream(new StreamLoad("hella-important-stream", 415));
        ownerships.add(server2);
        zkPlacementStateManager.saveOwnership(ownerships);
        assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 2));
    }

    /** Server names and their ZooKeeper-formatted names must convert in both directions. */
    @Test(timeout = 60000)
    public void testZkFormatting() throws Exception {
        final String server = "host/10.0.0.0:31351";
        final String zkFormattedServer = "host--10.0.0.0:31351";
        // the manager built in startZookeeper() uses the same URI and configuration the
        // original local copy did, so it is reused instead of constructing a second one
        assertEquals(zkFormattedServer, zkPlacementStateManager.serverToZkFormat(server));
        assertEquals(server, zkPlacementStateManager.zkFormatToServer(zkFormattedServer));
    }

    /** Stops the ZooKeeper server; tolerates a server that never started. */
    @After
    public void stopZookeeper() throws IOException {
        // JUnit still runs @After when @Before threw, in which case the field is null
        if (null != zkTestServer) {
            zkTestServer.stop();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java new file mode 100644 index 0000000..56e9483 --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.namespace.DistributedLogNamespace; +import org.apache.distributedlog.service.config.StreamConfigProvider; +import org.apache.distributedlog.service.streamset.Partition; +import org.apache.distributedlog.service.streamset.StreamPartitionConverter; +import com.twitter.util.Await; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +/** + * Test Case for StreamManager. 
+ */ +public class TestStreamManager { + + @Rule + public TestName testName = new TestName(); + + ScheduledExecutorService mockExecutorService = mock(ScheduledExecutorService.class); + + @Test(timeout = 60000) + public void testCollectionMethods() throws Exception { + Stream mockStream = mock(Stream.class); + when(mockStream.getStreamName()).thenReturn("stream1"); + when(mockStream.getPartition()).thenReturn(new Partition("stream1", 0)); + StreamFactory mockStreamFactory = mock(StreamFactory.class); + StreamPartitionConverter mockPartitionConverter = mock(StreamPartitionConverter.class); + StreamConfigProvider mockStreamConfigProvider = mock(StreamConfigProvider.class); + when(mockStreamFactory.create( + (String) any(), + (DynamicDistributedLogConfiguration) any(), + (StreamManager) any())).thenReturn(mockStream); + StreamManager streamManager = new StreamManagerImpl( + "", + new DistributedLogConfiguration(), + mockExecutorService, + mockStreamFactory, + mockPartitionConverter, + mockStreamConfigProvider, + mock(DistributedLogNamespace.class)); + + assertFalse(streamManager.isAcquired("stream1")); + assertEquals(0, streamManager.numAcquired()); + assertEquals(0, streamManager.numCached()); + + streamManager.notifyAcquired(mockStream); + assertTrue(streamManager.isAcquired("stream1")); + assertEquals(1, streamManager.numAcquired()); + assertEquals(0, streamManager.numCached()); + + streamManager.notifyReleased(mockStream); + assertFalse(streamManager.isAcquired("stream1")); + assertEquals(0, streamManager.numAcquired()); + assertEquals(0, streamManager.numCached()); + + streamManager.notifyAcquired(mockStream); + assertTrue(streamManager.isAcquired("stream1")); + assertEquals(1, streamManager.numAcquired()); + assertEquals(0, streamManager.numCached()); + + streamManager.notifyAcquired(mockStream); + assertTrue(streamManager.isAcquired("stream1")); + assertEquals(1, streamManager.numAcquired()); + assertEquals(0, streamManager.numCached()); + + 
streamManager.notifyReleased(mockStream); + assertFalse(streamManager.isAcquired("stream1")); + assertEquals(0, streamManager.numAcquired()); + assertEquals(0, streamManager.numCached()); + + streamManager.notifyReleased(mockStream); + assertFalse(streamManager.isAcquired("stream1")); + assertEquals(0, streamManager.numAcquired()); + assertEquals(0, streamManager.numCached()); + } + + @Test(timeout = 60000) + public void testCreateStream() throws Exception { + Stream mockStream = mock(Stream.class); + final String streamName = "stream1"; + when(mockStream.getStreamName()).thenReturn(streamName); + StreamFactory mockStreamFactory = mock(StreamFactory.class); + StreamPartitionConverter mockPartitionConverter = mock(StreamPartitionConverter.class); + StreamConfigProvider mockStreamConfigProvider = mock(StreamConfigProvider.class); + when(mockStreamFactory.create( + (String) any(), + (DynamicDistributedLogConfiguration) any(), + (StreamManager) any()) + ).thenReturn(mockStream); + DistributedLogNamespace dlNamespace = mock(DistributedLogNamespace.class); + ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); + + StreamManager streamManager = new StreamManagerImpl( + "", + new DistributedLogConfiguration(), + executorService, + mockStreamFactory, + mockPartitionConverter, + mockStreamConfigProvider, + dlNamespace); + + assertTrue(Await.ready(streamManager.createStreamAsync(streamName)).isReturn()); + verify(dlNamespace).createLog(streamName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java new file mode 100644 index 
0000000..a18fda1 --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.LogRecord; +import org.apache.distributedlog.acl.DefaultAccessControlManager; +import org.apache.distributedlog.exceptions.InternalServerException; +import org.apache.distributedlog.service.ResponseUtils; +import org.apache.distributedlog.service.config.ServerConfiguration; +import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter; +import org.apache.distributedlog.thrift.service.StatusCode; +import org.apache.distributedlog.thrift.service.WriteResponse; +import org.apache.distributedlog.util.Sequencer; +import com.twitter.util.Await; +import com.twitter.util.Future; +import java.nio.ByteBuffer; +import 
org.apache.bookkeeper.feature.SettableFeature; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +/** + * Test Case for StreamOps. + */ +public class TestStreamOp { + + @Rule + public TestName testName = new TestName(); + + private WriteOp getWriteOp() { + SettableFeature disabledFeature = new SettableFeature("", 0); + return new WriteOp("test", + ByteBuffer.wrap("test".getBytes()), + new NullStatsLogger(), + new NullStatsLogger(), + new IdentityStreamPartitionConverter(), + new ServerConfiguration(), + (byte) 0, + null, + false, + disabledFeature, + DefaultAccessControlManager.INSTANCE); + } + + @Test(timeout = 60000) + public void testResponseFailedTwice() throws Exception { + WriteOp writeOp = getWriteOp(); + writeOp.fail(new InternalServerException("test1")); + writeOp.fail(new InternalServerException("test2")); + + WriteResponse response = Await.result(writeOp.result()); + assertEquals(StatusCode.INTERNAL_SERVER_ERROR, response.getHeader().getCode()); + assertEquals(ResponseUtils.exceptionToHeader(new InternalServerException("test1")), response.getHeader()); + } + + @Test(timeout = 60000) + public void testResponseSucceededThenFailed() throws Exception { + AsyncLogWriter writer = mock(AsyncLogWriter.class); + when(writer.write((LogRecord) any())).thenReturn(Future.value(new DLSN(1, 2, 3))); + when(writer.getStreamName()).thenReturn("test"); + WriteOp writeOp = getWriteOp(); + writeOp.execute(writer, new Sequencer() { + public long nextId() { + return 0; + } + }, new Object()); + writeOp.fail(new InternalServerException("test2")); + + WriteResponse response = Await.result(writeOp.result()); + assertEquals(StatusCode.SUCCESS, response.getHeader().getCode()); + } +}