http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java deleted file mode 100644 index d0a2f88..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service; - -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.feature.DefaultFeatureProvider; -import org.apache.distributedlog.service.DistributedLogCluster.DLServer; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.feature.SettableFeature; -import org.apache.bookkeeper.stats.StatsLogger; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -/** - * Test Case for {@link org.apache.distributedlog.exceptions.RegionUnavailableException}. - */ -public class TestRegionUnavailable extends DistributedLogServerTestCase { - - /** - * A feature provider for testing. - */ - public static class TestFeatureProvider extends DefaultFeatureProvider { - - public TestFeatureProvider(String rootScope, - DistributedLogConfiguration conf, - StatsLogger statsLogger) { - super(rootScope, conf, statsLogger); - } - - @Override - protected Feature makeFeature(String featureName) { - if (featureName.contains(ServerFeatureKeys.REGION_STOP_ACCEPT_NEW_STREAM.name().toLowerCase())) { - return new SettableFeature(featureName, 10000); - } - return super.makeFeature(featureName); - } - - @Override - protected FeatureProvider makeProvider(String fullScopeName) { - return super.makeProvider(fullScopeName); - } - } - - private final int numServersPerDC = 3; - private final List<DLServer> localCluster; - private final List<DLServer> remoteCluster; - private TwoRegionDLClient client; - - public TestRegionUnavailable() { - super(true); - this.localCluster = new ArrayList<DLServer>(); - this.remoteCluster = new ArrayList<DLServer>(); - } - - @Before - @Override - public void setup() throws Exception { - DistributedLogConfiguration localConf = new DistributedLogConfiguration(); - localConf.addConfiguration(conf); - localConf.setFeatureProviderClass(TestFeatureProvider.class); - DistributedLogConfiguration remoteConf = new DistributedLogConfiguration(); - remoteConf.addConfiguration(conf); - super.setup(); - int localPort = 9010; - int remotePort = 9020; - for (int i = 0; i < numServersPerDC; i++) { - localCluster.add(createDistributedLogServer(localConf, localPort + i)); - remoteCluster.add(createDistributedLogServer(remoteConf, remotePort + i)); - } - Map<SocketAddress, String> regionMap = new HashMap<SocketAddress, String>(); - for (DLServer server : localCluster) { - regionMap.put(server.getAddress(), "local"); - } - for (DLServer server : remoteCluster) { - regionMap.put(server.getAddress(), "remote"); - } - client = createTwoRegionDLClient("two_regions_client", regionMap); - - } - - private void registerStream(String streamName) { - for (DLServer server : localCluster) { - client.localRoutingService.addHost(streamName, server.getAddress()); - } - client.remoteRoutingService.addHost(streamName, remoteCluster.get(0).getAddress()); - } - - @After - @Override - public void teardown() throws Exception { - super.teardown(); - if (null != client) { - client.shutdown(); - } - for (DLServer server : localCluster) { - server.shutdown(); - } - for (DLServer server : remoteCluster) { - server.shutdown(); - } - } - - @Test(timeout = 60000) - public void testRegionUnavailable() throws Exception { - String name = "dlserver-region-unavailable"; - registerStream(name); - - for (long i = 1; i <= 10; i++) { - client.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get(); - } - - // check local region - for (DLServer server : localCluster) { - checkStreams(0, server); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java deleted file mode 100644 index c8b8bdf..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service; - -import static org.junit.Assert.assertEquals; - -import com.twitter.finagle.Service; -import com.twitter.finagle.service.ConstantService; -import com.twitter.util.Await; -import com.twitter.util.Future; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.junit.Test; - -/** - * Test Case for {@link StatsFilter}. - */ -public class TestStatsFilter { - - class RuntimeExService<Req, Rep> extends Service<Req, Rep> { - public Future<Rep> apply(Req request) { - throw new RuntimeException("test"); - } - } - - @Test(timeout = 60000) - public void testServiceSuccess() throws Exception { - StatsLogger stats = new NullStatsLogger(); - StatsFilter<String, String> filter = new StatsFilter<String, String>(stats); - Future<String> result = filter.apply("", new ConstantService<String, String>(Future.value("result"))); - assertEquals("result", Await.result(result)); - } - - @Test(timeout = 60000) - public void testServiceFailure() throws Exception { - StatsLogger stats = new NullStatsLogger(); - StatsFilter<String, String> filter = new StatsFilter<String, String>(stats); - try { - filter.apply("", new RuntimeExService<String, String>()); - } catch (RuntimeException ex) { - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java deleted file mode 100644 index 21bebb5..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.balancer; - -import static org.junit.Assert.assertEquals; - -import java.util.HashMap; -import java.util.Map; -import org.junit.Test; - -/** - * Test Case for {@link BalancerUtils}. - */ -public class TestBalancerUtils { - - @Test(timeout = 60000) - public void testCalculateNumStreamsToRebalance() { - String myNode = "mynode"; - - // empty load distribution - assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance( - myNode, new HashMap<String, Integer>(), 0, 10)); - // my node doesn't exist in load distribution - Map<String, Integer> loadDistribution = new HashMap<String, Integer>(); - loadDistribution.put("node1", 10); - assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance( - myNode, loadDistribution, 0, 10)); - // my node doesn't reach rebalance water mark - loadDistribution.clear(); - loadDistribution.put("node1", 1); - loadDistribution.put(myNode, 100); - assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance( - myNode, loadDistribution, 200, 10)); - // my node is below average in the cluster. - loadDistribution.clear(); - loadDistribution.put(myNode, 1); - loadDistribution.put("node1", 99); - assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance( - myNode, loadDistribution, 0, 10)); - // my node is above average in the cluster - assertEquals(49, BalancerUtils.calculateNumStreamsToRebalance( - "node1", loadDistribution, 0, 10)); - // my node is at the tolerance range - loadDistribution.clear(); - loadDistribution.put(myNode, 55); - loadDistribution.put("node1", 45); - assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance( - myNode, loadDistribution, 0, 10)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java deleted file mode 100644 index fb3fb6e..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java +++ /dev/null @@ -1,189 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.balancer; - -import static com.google.common.base.Charsets.UTF_8; -import static org.junit.Assert.fail; - -import com.google.common.base.Optional; -import com.google.common.util.concurrent.RateLimiter; -import org.apache.distributedlog.client.monitor.MonitorServiceClient; -import org.apache.distributedlog.service.DLSocketAddress; -import org.apache.distributedlog.service.DistributedLogClient; -import org.apache.distributedlog.service.DistributedLogCluster.DLServer; -import org.apache.distributedlog.service.DistributedLogServerTestCase; -import com.twitter.util.Await; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import org.apache.commons.lang3.tuple.Pair; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test Case for {@link ClusterBalancer}. - */ -public class TestClusterBalancer extends DistributedLogServerTestCase { - - private static final Logger logger = LoggerFactory.getLogger(TestClusterBalancer.class); - - private final int numServers = 5; - private final List<DLServer> cluster; - private DLClient client; - - public TestClusterBalancer() { - super(true); - this.cluster = new ArrayList<DLServer>(); - } - - @Before - @Override - public void setup() throws Exception { - super.setup(); - int initPort = 9001; - for (int i = 0; i < numServers; i++) { - cluster.add(createDistributedLogServer(initPort + i)); - } - client = createDistributedLogClient("cluster_client", Optional.<String>absent()); - } - - @After - @Override - public void teardown() throws Exception { - super.teardown(); - if (null != client) { - client.shutdown(); - } - for (DLServer server: cluster) { - server.shutdown(); - } - } - - private void initStreams(String namePrefix) { - logger.info("Init streams with prefix {}", namePrefix); - // Stream Distribution: 5, 4, 3, 2, 1 - initStreams(namePrefix, 5, 1, 0); - initStreams(namePrefix, 4, 6, 1); - initStreams(namePrefix, 3, 10, 2); - initStreams(namePrefix, 2, 13, 3); - initStreams(namePrefix, 1, 15, 4); - } - - private void initStreams(String namePrefix, int numStreams, int streamId, int proxyId) { - for (int i = 0; i < numStreams; i++) { - String name = namePrefix + (streamId++); - client.routingService.addHost(name, cluster.get(proxyId).getAddress()); - } - } - - private void writeStreams(String namePrefix) throws Exception { - logger.info("Write streams with prefix {}", namePrefix); - writeStreams(namePrefix, 5, 1); - writeStreams(namePrefix, 4, 6); - writeStreams(namePrefix, 3, 10); - writeStreams(namePrefix, 2, 13); - writeStreams(namePrefix, 1, 15); - } - - private void writeStreams(String namePrefix, int numStreams, int streamId) throws Exception { - for (int i = 0; i < numStreams; i++) { - String name = namePrefix + (streamId++); - try { - Await.result(client.dlClient.write(name, ByteBuffer.wrap(name.getBytes(UTF_8)))); - } catch (Exception e) { - logger.error("Error writing stream {} : ", name, e); - throw e; - } - } - } - - private void validateStreams(String namePrefix) throws Exception { - logger.info("Validate streams with prefix {}", namePrefix); - validateStreams(namePrefix, 5, 1, 0); - validateStreams(namePrefix, 4, 6, 1); - validateStreams(namePrefix, 3, 10, 2); - validateStreams(namePrefix, 2, 13, 3); - validateStreams(namePrefix, 1, 15, 4); - } - - private void validateStreams(String namePrefix, int numStreams, int streamId, int proxyIdx) { - Set<String> expectedStreams = new HashSet<String>(); - for (int i = 0; i < numStreams; i++) { - expectedStreams.add(namePrefix + (streamId++)); - } - checkStreams(expectedStreams, cluster.get(proxyIdx)); - } - - @Ignore - @Test(timeout = 60000) - public void testBalanceAll() throws Exception { - String namePrefix = "clusterbalancer-balance-all-"; - - initStreams(namePrefix); - writeStreams(namePrefix); - validateStreams(namePrefix); - - Optional<RateLimiter> rateLimiter = Optional.absent(); - - Balancer balancer = new ClusterBalancer(client.dlClientBuilder, - Pair.of((DistributedLogClient) client.dlClient, (MonitorServiceClient) client.dlClient)); - logger.info("Rebalancing from 'unknown' target"); - try { - balancer.balanceAll("unknown", 10, rateLimiter); - fail("Should fail on balanceAll from 'unknown' target."); - } catch (IllegalArgumentException iae) { - // expected - } - validateStreams(namePrefix); - - logger.info("Rebalancing from 'unexisted' host"); - String addr = DLSocketAddress.toString(DLSocketAddress.getSocketAddress(9999)); - balancer.balanceAll(addr, 10, rateLimiter); - validateStreams(namePrefix); - - addr = DLSocketAddress.toString(cluster.get(0).getAddress()); - logger.info("Rebalancing from host {}.", addr); - balancer.balanceAll(addr, 10, rateLimiter); - checkStreams(0, cluster.get(0)); - checkStreams(4, cluster.get(1)); - checkStreams(3, cluster.get(2)); - checkStreams(4, cluster.get(3)); - checkStreams(4, cluster.get(4)); - - addr = DLSocketAddress.toString(cluster.get(2).getAddress()); - logger.info("Rebalancing from host {}.", addr); - balancer.balanceAll(addr, 10, rateLimiter); - checkStreams(3, cluster.get(0)); - checkStreams(4, cluster.get(1)); - checkStreams(0, cluster.get(2)); - checkStreams(4, cluster.get(3)); - checkStreams(4, cluster.get(4)); - - logger.info("Rebalancing the cluster"); - balancer.balance(0, 0.0f, 10, rateLimiter); - for (int i = 0; i < 5; i++) { - checkStreams(3, cluster.get(i)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java deleted file mode 100644 index 6734083..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java +++ /dev/null @@ -1,204 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.balancer; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import com.google.common.collect.Sets; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import org.junit.Test; - -/** - * Test Case for {@link CountBasedStreamChooser}. - */ -public class TestCountBasedStreamChooser { - - @Test(timeout = 60000) - public void testEmptyStreamDistribution() { - try { - new CountBasedStreamChooser(new HashMap<SocketAddress, Set<String>>()); - fail("Should fail constructing stream chooser if the stream distribution is empty"); - } catch (IllegalArgumentException iae) { - // expected - } - } - - @Test(timeout = 60000) - public void testMultipleHostsWithEmptyStreams() { - for (int i = 1; i <= 3; i++) { - Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>(); - int port = 1000; - for (int j = 0; j < i; j++) { - SocketAddress address = new InetSocketAddress("127.0.0.1", port + j); - streamDistribution.put(address, new HashSet<String>()); - } - - CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution); - for (int k = 0; k < i + 1; k++) { - assertNull(chooser.choose()); - } - } - } - - @Test(timeout = 60000) - public void testSingleHostWithStreams() { - for (int i = 0; i < 3; i++) { - Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>(); - - Set<String> streams = new HashSet<String>(); - for (int j = 0; j < 3; j++) { - streams.add("SingleHostStream-" + j); - } - - int port = 1000; - SocketAddress address = new InetSocketAddress("127.0.0.1", port); - streamDistribution.put(address, streams); - - for (int k = 1; k <= i; k++) { - address = new InetSocketAddress("127.0.0.1", port + k); - streamDistribution.put(address, new HashSet<String>()); - } - - Set<String> choosenStreams = new HashSet<String>(); - - CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution); - for (int l = 0; l < 3 + i + 1; l++) { - String s = chooser.choose(); - if (null != s) { - choosenStreams.add(s); - } - } - - assertEquals(streams.size(), choosenStreams.size()); - assertTrue(Sets.difference(streams, choosenStreams).immutableCopy().isEmpty()); - } - } - - @Test(timeout = 60000) - public void testHostsHaveSameNumberStreams() { - Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>(); - Set<String> allStreams = new HashSet<String>(); - - int numHosts = 3; - int numStreamsPerHost = 3; - - int port = 1000; - for (int i = 1; i <= numHosts; i++) { - SocketAddress address = new InetSocketAddress("127.0.0.1", port + i); - Set<String> streams = new HashSet<String>(); - - for (int j = 1; j <= numStreamsPerHost; j++) { - String streamName = "HostsHaveSameNumberStreams-" + i + "-" + j; - streams.add(streamName); - allStreams.add(streamName); - } - - streamDistribution.put(address, streams); - } - - Set<String> streamsChoosen = new HashSet<String>(); - CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution); - for (int i = 1; i <= numStreamsPerHost; i++) { - for (int j = 1; j <= numHosts; j++) { - String s = chooser.choose(); - assertNotNull(s); - streamsChoosen.add(s); - } - for (int j = 0; j < numHosts; j++) { - assertEquals(numStreamsPerHost - i, chooser.streamsDistribution.get(j).getRight().size()); - } - } - assertNull(chooser.choose()); - assertEquals(numHosts * numStreamsPerHost, streamsChoosen.size()); - assertTrue(Sets.difference(allStreams, streamsChoosen).isEmpty()); - } - - @Test(timeout = 60000) - public void testHostsHaveDifferentNumberStreams() { - Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>(); - Set<String> allStreams = new HashSet<String>(); - - int numHosts = 6; - int maxStreamsPerHost = 4; - - int port = 1000; - for (int i = 0; i < numHosts; i++) { - int group = i / 2; - int numStreamsThisGroup = maxStreamsPerHost - group; - - SocketAddress address = new InetSocketAddress("127.0.0.1", port + i); - Set<String> streams = new HashSet<String>(); - - for (int j = 1; j <= numStreamsThisGroup; j++) { - String streamName = "HostsHaveDifferentNumberStreams-" + i + "-" + j; - streams.add(streamName); - allStreams.add(streamName); - } - - streamDistribution.put(address, streams); - } - - Set<String> streamsChoosen = new HashSet<String>(); - CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution); - - for (int i = 0; i < allStreams.size(); i++) { - String s = chooser.choose(); - assertNotNull(s); - streamsChoosen.add(s); - } - assertNull(chooser.choose()); - assertEquals(allStreams.size(), streamsChoosen.size()); - assertTrue(Sets.difference(allStreams, streamsChoosen).isEmpty()); - } - - @Test(timeout = 60000) - public void testLimitedStreamChooser() { - Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>(); - - Set<String> streams = new HashSet<String>(); - for (int j = 0; j < 10; j++) { - streams.add("SingleHostStream-" + j); - } - - int port = 1000; - SocketAddress address = new InetSocketAddress("127.0.0.1", port); - streamDistribution.put(address, streams); - - Set<String> choosenStreams = new HashSet<String>(); - - CountBasedStreamChooser underlying = new CountBasedStreamChooser(streamDistribution); - LimitedStreamChooser chooser = LimitedStreamChooser.of(underlying, 1); - for (int l = 0; l < 10; l++) { - String s = chooser.choose(); - if (null != s) { - choosenStreams.add(s); - } - } - - assertEquals(1, choosenStreams.size()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java deleted file mode 100644 index 73fa98a..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.balancer; - -import static com.google.common.base.Charsets.UTF_8; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import com.google.common.base.Optional; -import com.google.common.util.concurrent.RateLimiter; -import org.apache.distributedlog.service.DistributedLogCluster.DLServer; -import org.apache.distributedlog.service.DistributedLogServerTestCase; -import com.twitter.util.Await; -import java.nio.ByteBuffer; -import java.util.Set; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test Case for {@link SimpleBalancer}. - */ -public class TestSimpleBalancer extends DistributedLogServerTestCase { - - private static final Logger logger = LoggerFactory.getLogger(TestSimpleBalancer.class); - - DLClient targetClient; - DLServer targetServer; - - public TestSimpleBalancer() { - super(true); - } - - @Before - @Override - public void setup() throws Exception { - super.setup(); - targetServer = createDistributedLogServer(7003); - targetClient = createDistributedLogClient("target", Optional.<String>absent()); - } - - @After - @Override - public void teardown() throws Exception { - super.teardown(); - if (null != targetClient) { - targetClient.shutdown(); - } - if (null != targetServer) { - targetServer.shutdown(); - } - } - - @Test(timeout = 60000) - public void testBalanceAll() throws Exception { - String namePrefix = "simplebalancer-balance-all-"; - int numStreams = 10; - - for (int i = 0; i < numStreams; i++) { - String name = namePrefix + i; - // src client - dlClient.routingService.addHost(name, dlServer.getAddress()); - // target client - targetClient.routingService.addHost(name, targetServer.getAddress()); - } - - // write to multiple streams - for (int i = 0; i < numStreams; i++) { - String name = namePrefix + i; - Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8)))); - } - - // validation - for (int i = 0; i < numStreams; i++) { - String name = namePrefix + i; - checkStream(name, dlClient, dlServer, 1, numStreams, numStreams, true, true); - checkStream(name, targetClient, targetServer, 0, 0, 0, false, false); - } - - Optional<RateLimiter> rateLimiter = Optional.absent(); - - Balancer balancer = new SimpleBalancer("source", dlClient.dlClient, dlClient.dlClient, - "target", targetClient.dlClient, targetClient.dlClient); - logger.info("Rebalancing from 'unknown' target"); - try { - balancer.balanceAll("unknown", 10, rateLimiter); - fail("Should fail on balanceAll from 'unknown' target."); - } catch (IllegalArgumentException iae) { - // expected - } - - // nothing to balance from 'target' - logger.info("Rebalancing from 'target' target"); - balancer.balanceAll("target", 1, rateLimiter); - for (int i = 0; i < numStreams; i++) { - String name = namePrefix + i; - checkStream(name, dlClient, dlServer, 1, numStreams, numStreams, true, true); - checkStream(name, targetClient, targetServer, 0, 0, 0, false, false); - } - - // balance all streams from 'source' - logger.info("Rebalancing from 'source' target"); - balancer.balanceAll("source", 10, rateLimiter); - for (int i = 0; i < numStreams; i++) { - String name = namePrefix + i; - checkStream(name, targetClient, targetServer, 1, numStreams, numStreams, true, true); - checkStream(name, dlClient, dlServer, 0, 0, 0, false, false); - } - } - - @Test(timeout = 60000) - public void testBalanceStreams() throws Exception { - String namePrefix = "simplebalancer-balance-streams-"; - int numStreams = 10; - - for (int i = 0; i < numStreams; i++) { - String name = namePrefix + i; - // src client - dlClient.routingService.addHost(name, dlServer.getAddress()); - // target client - targetClient.routingService.addHost(name, targetServer.getAddress()); - } - - // write to multiple streams - for (int i = 0; i < numStreams; i++) { - String name = namePrefix + i; - Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8)))); - } - - // validation - for (int i = 0; i < numStreams; i++) { - String name = namePrefix + i; - checkStream(name, dlClient, dlServer, 1, numStreams, numStreams, true, true); - checkStream(name, targetClient, targetServer, 0, 0, 0, false, false); - } - - Optional<RateLimiter> rateLimiter = Optional.absent(); - - Balancer balancer = new SimpleBalancer("source", dlClient.dlClient, dlClient.dlClient, - "target", targetClient.dlClient, targetClient.dlClient); - - // balance all streams from 'source' - logger.info("Rebalancing streams between targets"); - balancer.balance(0, 0, 10, rateLimiter); - - Set<String> sourceStreams = getAllStreamsFromDistribution(getStreamOwnershipDistribution(dlClient)); - Set<String> targetStreams = getAllStreamsFromDistribution(getStreamOwnershipDistribution(targetClient)); - - assertEquals(numStreams / 2, sourceStreams.size()); - assertEquals(numStreams / 2, targetStreams.size()); - - for (String name : sourceStreams) { - checkStream(name, dlClient, dlServer, 1, numStreams / 2, numStreams / 2, true, true); - checkStream(name, targetClient, targetServer, 1, numStreams / 2, numStreams / 2, false, false); - } - - for (String name : targetStreams) { - checkStream(name, targetClient, targetServer, 1, numStreams / 2, numStreams / 2, true, true); - checkStream(name, dlClient, dlServer, 1, numStreams / 2, numStreams / 2, false, false); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java deleted file mode 100644 index ce7b2c1..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.balancer; - -import static com.google.common.base.Charsets.UTF_8; -import static org.junit.Assert.assertTrue; - -import com.google.common.base.Optional; -import org.apache.distributedlog.service.DistributedLogClient; -import org.apache.distributedlog.service.DistributedLogCluster.DLServer; -import org.apache.distributedlog.service.DistributedLogServerTestCase; -import com.twitter.util.Await; -import java.nio.ByteBuffer; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -/** - * Test Case for {@link StreamMover}. - */ -public class TestStreamMover extends DistributedLogServerTestCase { - - DLClient targetClient; - DLServer targetServer; - - public TestStreamMover() { - super(true); - } - - @Before - @Override - public void setup() throws Exception { - super.setup(); - targetServer = createDistributedLogServer(7003); - targetClient = createDistributedLogClient("target", Optional.<String>absent()); - } - - @After - @Override - public void teardown() throws Exception { - super.teardown(); - if (null != targetClient) { - targetClient.shutdown(); - } - if (null != targetServer) { - targetServer.shutdown(); - } - } - - @Test(timeout = 60000) - public void testMoveStream() throws Exception { - String name = "dlserver-move-stream"; - - // src client - dlClient.routingService.addHost(name, dlServer.getAddress()); - // target client - targetClient.routingService.addHost(name, targetServer.getAddress()); - - // src client write a record to that stream - Await.result(((DistributedLogClient) dlClient.dlClient).write(name, ByteBuffer.wrap("1".getBytes(UTF_8)))); - checkStream(name, dlClient, dlServer, 1, 1, 1, true, true); - checkStream(name, targetClient, targetServer, 0, 0, 0, false, false); - - StreamMover streamMover = new StreamMoverImpl("source", dlClient.dlClient, dlClient.dlClient, - "target", targetClient.dlClient, targetClient.dlClient); - assertTrue(streamMover.moveStream(name)); - checkStream(name, dlClient, dlServer, 0, 0, 0, false, false); - checkStream(name, targetClient, targetServer, 1, 1, 1, true, true); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java deleted file mode 100644 index 71dfa45..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.config; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.junit.Test; - -/** - * Test Case for {@link ServerConfiguration}. - */ -public class TestServerConfiguration { - - @Test(timeout = 60000, expected = IllegalArgumentException.class) - public void testUnassignedShardId() { - new ServerConfiguration().validate(); - } - - @Test(timeout = 60000) - public void testAssignedShardId() { - ServerConfiguration conf = new ServerConfiguration(); - conf.setServerShardId(100); - conf.validate(); - assertEquals(100, conf.getServerShardId()); - } - - @Test(timeout = 60000, expected = IllegalArgumentException.class) - public void testInvalidServerThreads() { - ServerConfiguration conf = new ServerConfiguration(); - conf.setServerShardId(100); - conf.setServerThreads(-1); - conf.validate(); - } - - @Test(timeout = 60000, expected = IllegalArgumentException.class) - public void testInvalidDlsnVersion() { - ServerConfiguration conf = new ServerConfiguration(); - conf.setServerShardId(100); - conf.setDlsnVersion((byte) 9999); - conf.validate(); - } - - @Test(timeout = 60000) - public void testUseHostnameAsAllocatorPoolName() { - ServerConfiguration conf = new ServerConfiguration(); - assertFalse("Should not use hostname by default", conf.isUseHostnameAsAllocatorPoolName()); - conf.setUseHostnameAsAllocatorPoolName(true); - assertTrue("Should use hostname now", conf.isUseHostnameAsAllocatorPoolName()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java deleted file mode 100644 index bdbde11..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.config; - -import static org.apache.distributedlog.DistributedLogConfiguration.BKDL_RETENTION_PERIOD_IN_HOURS; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import com.google.common.base.Optional; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.config.PropertiesWriter; -import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter; -import org.apache.distributedlog.service.streamset.StreamPartitionConverter; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.junit.Test; - -/** - * Test Case for {@link StreamConfigProvider}. - */ -public class TestStreamConfigProvider { - private static final String DEFAULT_CONFIG_DIR = "conf"; - private final String defaultConfigPath; - private final ScheduledExecutorService configExecutorService; - - public TestStreamConfigProvider() throws Exception { - this.configExecutorService = Executors.newScheduledThreadPool(1, - new ThreadFactoryBuilder().setNameFormat("DistributedLogService-Dyncfg-%d").build()); - PropertiesWriter writer = new PropertiesWriter(); - writer.save(); - this.defaultConfigPath = writer.getFile().getPath(); - } - - StreamConfigProvider getServiceProvider(StreamPartitionConverter converter) - throws Exception { - return getServiceProvider(converter, DEFAULT_CONFIG_DIR); - } - - StreamConfigProvider getServiceProvider( - StreamPartitionConverter converter, - String configPath, - String defaultPath) throws Exception { - return new ServiceStreamConfigProvider( - configPath, - defaultPath, - converter, - configExecutorService, - 1, - TimeUnit.SECONDS); - } - - StreamConfigProvider getServiceProvider( - StreamPartitionConverter converter, - String configPath) throws Exception { - return getServiceProvider(converter, configPath, defaultConfigPath); - } - - StreamConfigProvider getDefaultProvider(String configFile) throws Exception { - return new DefaultStreamConfigProvider(configFile, configExecutorService, 1, TimeUnit.SECONDS); - } - - StreamConfigProvider getNullProvider() throws Exception { - return new NullStreamConfigProvider(); - } - - @Test(timeout = 60000) - public void testServiceProviderWithConfigRouters() throws Exception { - getServiceProvider(new IdentityStreamPartitionConverter()); - } - - @Test(timeout = 60000) - public void testServiceProviderWithMissingConfig() throws Exception { - StreamConfigProvider provider = getServiceProvider(new IdentityStreamPartitionConverter()); - Optional<DynamicDistributedLogConfiguration> config = provider.getDynamicStreamConfig("stream1"); - assertTrue(config.isPresent()); - } - - @Test(timeout = 60000) - public void testServiceProviderWithDefaultConfigPath() throws Exception { - // Default config with property set. - PropertiesWriter writer1 = new PropertiesWriter(); - writer1.setProperty("rpsStreamAcquireServiceLimit", "191919"); - writer1.save(); - String fallbackConfPath1 = writer1.getFile().getPath(); - StreamConfigProvider provider1 = getServiceProvider(new IdentityStreamPartitionConverter(), - DEFAULT_CONFIG_DIR, fallbackConfPath1); - Optional<DynamicDistributedLogConfiguration> config1 = provider1.getDynamicStreamConfig("stream1"); - - // Empty default config. - PropertiesWriter writer2 = new PropertiesWriter(); - writer2.save(); - String fallbackConfPath2 = writer2.getFile().getPath(); - StreamConfigProvider provider2 = getServiceProvider(new IdentityStreamPartitionConverter(), - DEFAULT_CONFIG_DIR, fallbackConfPath2); - Optional<DynamicDistributedLogConfiguration> config2 = provider2.getDynamicStreamConfig("stream1"); - - assertEquals(191919, config1.get().getRpsStreamAcquireServiceLimit()); - assertEquals(-1, config2.get().getRpsStreamAcquireServiceLimit()); - } - - @Test(timeout = 60000) - public void testDefaultProvider() throws Exception { - PropertiesWriter writer = new PropertiesWriter(); - writer.setProperty(BKDL_RETENTION_PERIOD_IN_HOURS, "99"); - writer.save(); - StreamConfigProvider provider = getDefaultProvider(writer.getFile().getPath()); - Optional<DynamicDistributedLogConfiguration> config1 = provider.getDynamicStreamConfig("stream1"); - Optional<DynamicDistributedLogConfiguration> config2 = provider.getDynamicStreamConfig("stream2"); - assertTrue(config1.isPresent()); - assertTrue(config1.get() == config2.get()); - assertEquals(99, config1.get().getRetentionPeriodHours()); - } - - @Test(timeout = 60000) - public void testNullProvider() throws Exception { - StreamConfigProvider provider = getNullProvider(); - Optional<DynamicDistributedLogConfiguration> config1 = provider.getDynamicStreamConfig("stream1"); - Optional<DynamicDistributedLogConfiguration> config2 = provider.getDynamicStreamConfig("stream2"); - assertFalse(config1.isPresent()); - assertTrue(config1 == config2); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java deleted file mode 100644 index 5f5ecd4..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java +++ /dev/null @@ -1,176 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.placement; - - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import org.apache.distributedlog.client.routing.RoutingService; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Future; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.LinkedHashSet; -import java.util.Random; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -/** - * Test Case for {@link LeastLoadPlacementPolicy}. - */ -public class TestLeastLoadPlacementPolicy { - - @Test(timeout = 10000) - public void testCalculateBalances() throws Exception { - int numSevers = new Random().nextInt(20) + 1; - int numStreams = new Random().nextInt(200) + 1; - RoutingService mockRoutingService = mock(RoutingService.class); - DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class); - LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy( - new EqualLoadAppraiser(), - mockRoutingService, - mockNamespace, - null, - Duration.fromSeconds(600), - new NullStatsLogger()); - TreeSet<ServerLoad> serverLoads = - Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams))); - long lowLoadPerServer = numStreams / numSevers; - long highLoadPerServer = lowLoadPerServer + 1; - for (ServerLoad serverLoad : serverLoads) { - long load = serverLoad.getLoad(); - assertEquals(load, serverLoad.getStreamLoads().size()); - assertTrue(String.format("Load %d is not between %d and %d", - load, lowLoadPerServer, highLoadPerServer), load == lowLoadPerServer || load == highLoadPerServer); - } - } - - @Test(timeout = 10000) - public void testRefreshAndPlaceStream() throws Exception { - int numSevers = new Random().nextInt(20) + 1; - int numStreams = new Random().nextInt(200) + 1; - RoutingService mockRoutingService = mock(RoutingService.class); - when(mockRoutingService.getHosts()).thenReturn(generateSocketAddresses(numSevers)); - DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class); - try { - when(mockNamespace.getLogs()).thenReturn(generateStreams(numStreams).iterator()); - } catch (IOException e) { - fail(); - } - PlacementStateManager mockPlacementStateManager = mock(PlacementStateManager.class); - LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy( - new EqualLoadAppraiser(), - mockRoutingService, - mockNamespace, - mockPlacementStateManager, - Duration.fromSeconds(600), - new NullStatsLogger()); - leastLoadPlacementPolicy.refresh(); - - final ArgumentCaptor<TreeSet> captor = ArgumentCaptor.forClass(TreeSet.class); - verify(mockPlacementStateManager).saveOwnership(captor.capture()); - TreeSet<ServerLoad> serverLoads = (TreeSet<ServerLoad>) captor.getValue(); - ServerLoad next = serverLoads.first(); - String serverPlacement = Await.result(leastLoadPlacementPolicy.placeStream("newstream1")); - assertEquals(next.getServer(), serverPlacement); - } - - @Test(timeout = 10000) - public void testCalculateUnequalWeight() throws Exception { - int numSevers = new Random().nextInt(20) + 1; - int numStreams = new Random().nextInt(200) + 1; - /* use AtomicInteger to have a final object in answer method */ - final AtomicInteger maxLoad = new AtomicInteger(Integer.MIN_VALUE); - RoutingService mockRoutingService = mock(RoutingService.class); - DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class); - LoadAppraiser mockLoadAppraiser = mock(LoadAppraiser.class); - when(mockLoadAppraiser.getStreamLoad(anyString())).then(new Answer<Future<StreamLoad>>() { - @Override - public Future<StreamLoad> answer(InvocationOnMock invocationOnMock) throws Throwable { - int load = new Random().nextInt(100000); - if (load > maxLoad.get()) { - maxLoad.set(load); - } - return Future.value(new StreamLoad(invocationOnMock.getArguments()[0].toString(), load)); - } - }); - LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy( - mockLoadAppraiser, - mockRoutingService, - mockNamespace, - null, - Duration.fromSeconds(600), - new NullStatsLogger()); - TreeSet<ServerLoad> serverLoads = - Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams))); - long highestLoadSeen = Long.MIN_VALUE; - long lowestLoadSeen = Long.MAX_VALUE; - for (ServerLoad serverLoad : serverLoads) { - long load = serverLoad.getLoad(); - if (load < lowestLoadSeen) { - lowestLoadSeen = load; - } - if (load > highestLoadSeen) { - highestLoadSeen = load; - } - } - assertTrue("Unexpected placement for " + numStreams + " streams to " - + numSevers + " servers : highest load = " + highestLoadSeen - + ", lowest load = " + lowestLoadSeen + ", max stream load = " + maxLoad.get(), - highestLoadSeen - lowestLoadSeen <= maxLoad.get()); - } - - private Set<SocketAddress> generateSocketAddresses(int num) { - LinkedHashSet<SocketAddress> socketAddresses = new LinkedHashSet<SocketAddress>(); - for (int i = 0; i < num; i++) { - socketAddresses.add(new InetSocketAddress(i)); - } - return socketAddresses; - } - - private Set<String> generateStreams(int num) { - LinkedHashSet<String> streams = new LinkedHashSet<String>(); - for (int i = 0; i < num; i++) { - streams.add("stream_" + i); - } - return streams; - } - - private Set<String> generateServers(int num) { - LinkedHashSet<String> servers = new LinkedHashSet<String>(); - for (int i = 0; i < num; i++) { - servers.add("server_" + i); - } - return servers; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java deleted file mode 100644 index 5bd234f..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.placement; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import org.junit.Test; - -/** - * Test Case for {@link ServerLoad}. - */ -public class TestServerLoad { - - @Test(timeout = 60000) - public void testSerializeDeserialize() throws IOException { - final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3"); - for (int i = 0; i < 20; i++) { - serverLoad.addStream(new StreamLoad("stream-" + i, i)); - } - assertEquals(serverLoad, ServerLoad.deserialize(serverLoad.serialize())); - } - - @Test(timeout = 60000) - public void testGetLoad() throws IOException { - final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3"); - assertEquals(0, serverLoad.getLoad()); - serverLoad.addStream(new StreamLoad("stream-" + 1, 3)); - assertEquals(3, serverLoad.getLoad()); - serverLoad.addStream(new StreamLoad("stream-" + 2, 7)); - assertEquals(10, serverLoad.getLoad()); - serverLoad.addStream(new StreamLoad("stream-" + 3, 1)); - assertEquals(11, serverLoad.getLoad()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java deleted file mode 100644 index 36a6fed..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.placement; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import org.junit.Test; - -/** - * Test Case for {@link StreamLoad}. - */ -public class TestStreamLoad { - - @Test(timeout = 10000) - public void testSerializeDeserialize() throws IOException { - final String streamName = "aHellaRandomStreamName"; - final int load = 1337; - final StreamLoad streamLoad = new StreamLoad(streamName, load); - assertEquals(streamLoad, StreamLoad.deserialize(streamLoad.serialize())); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java deleted file mode 100644 index 07ec5a5..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.placement; - -import static org.apache.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import org.apache.distributedlog.DistributedLogConfiguration; -import java.io.IOException; -import java.net.URI; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.LinkedBlockingQueue; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.curator.test.TestingServer; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -/** - * Test Case for {@link ZKPlacementStateManager}. - */ -public class TestZKPlacementStateManager { - private TestingServer zkTestServer; - private String zkServers; - private URI uri; - private ZKPlacementStateManager zkPlacementStateManager; - - @Before - public void startZookeeper() throws Exception { - zkTestServer = new TestingServer(2181); - zkServers = "127.0.0.1:2181"; - uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace"); - zkPlacementStateManager = - new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE); - } - - @Test(timeout = 60000) - public void testSaveLoad() throws Exception { - TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>(); - zkPlacementStateManager.saveOwnership(ownerships); - SortedSet<ServerLoad> loadedOwnerships = zkPlacementStateManager.loadOwnership(); - assertEquals(ownerships, loadedOwnerships); - - ownerships.add(new ServerLoad("emptyServer")); - zkPlacementStateManager.saveOwnership(ownerships); - loadedOwnerships = zkPlacementStateManager.loadOwnership(); - assertEquals(ownerships, loadedOwnerships); - - ServerLoad sl1 = new ServerLoad("server1"); - sl1.addStream(new StreamLoad("stream1", 3)); - sl1.addStream(new StreamLoad("stream2", 4)); - ServerLoad sl2 = new ServerLoad("server2"); - sl2.addStream(new StreamLoad("stream3", 1)); - ownerships.add(sl1); - ownerships.add(sl2); - zkPlacementStateManager.saveOwnership(ownerships); - loadedOwnerships = zkPlacementStateManager.loadOwnership(); - assertEquals(ownerships, loadedOwnerships); - - loadedOwnerships.remove(sl1); - zkPlacementStateManager.saveOwnership(ownerships); - loadedOwnerships = zkPlacementStateManager.loadOwnership(); - assertEquals(ownerships, loadedOwnerships); - } - - private TreeSet<ServerLoad> waitForServerLoadsNotificationAsc( - LinkedBlockingQueue<TreeSet<ServerLoad>> notificationQueue, - int expectedNumServerLoads) throws InterruptedException { - TreeSet<ServerLoad> notification = notificationQueue.take(); - assertNotNull(notification); - while (notification.size() < expectedNumServerLoads) { - notification = notificationQueue.take(); - } - assertEquals(expectedNumServerLoads, notification.size()); - return notification; - } - - @Test(timeout = 60000) - public void testWatchIndefinitely() throws Exception { - TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>(); - ownerships.add(new ServerLoad("server1")); - final LinkedBlockingQueue<TreeSet<ServerLoad>> serverLoadNotifications = - new LinkedBlockingQueue<TreeSet<ServerLoad>>(); - PlacementStateManager.PlacementCallback callback = new PlacementStateManager.PlacementCallback() { - @Override - public void callback(TreeSet<ServerLoad> serverLoads) { - serverLoadNotifications.add(serverLoads); - } - }; - zkPlacementStateManager.saveOwnership(ownerships); // need to initialize the zk path before watching - zkPlacementStateManager.watch(callback); - // cannot verify the callback here as it may call before the verify is called - - zkPlacementStateManager.saveOwnership(ownerships); - assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 1)); - - ServerLoad server2 = new ServerLoad("server2"); - server2.addStream(new StreamLoad("hella-important-stream", 415)); - ownerships.add(server2); - zkPlacementStateManager.saveOwnership(ownerships); - assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 2)); - } - - @Test(timeout = 60000) - public void testZkFormatting() throws Exception { - final String server = "host/10.0.0.0:31351"; - final String zkFormattedServer = "host--10.0.0.0:31351"; - URI uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace"); - ZKPlacementStateManager zkPlacementStateManager = - new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE); - assertEquals(zkFormattedServer, zkPlacementStateManager.serverToZkFormat(server)); - assertEquals(server, zkPlacementStateManager.zkFormatToServer(zkFormattedServer)); - } - - @After - public void stopZookeeper() throws IOException { - zkTestServer.stop(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java deleted file mode 100644 index 56e9483..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.service.config.StreamConfigProvider; -import org.apache.distributedlog.service.streamset.Partition; -import org.apache.distributedlog.service.streamset.StreamPartitionConverter; -import com.twitter.util.Await; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -/** - * Test Case for StreamManager. - */ -public class TestStreamManager { - - @Rule - public TestName testName = new TestName(); - - ScheduledExecutorService mockExecutorService = mock(ScheduledExecutorService.class); - - @Test(timeout = 60000) - public void testCollectionMethods() throws Exception { - Stream mockStream = mock(Stream.class); - when(mockStream.getStreamName()).thenReturn("stream1"); - when(mockStream.getPartition()).thenReturn(new Partition("stream1", 0)); - StreamFactory mockStreamFactory = mock(StreamFactory.class); - StreamPartitionConverter mockPartitionConverter = mock(StreamPartitionConverter.class); - StreamConfigProvider mockStreamConfigProvider = mock(StreamConfigProvider.class); - when(mockStreamFactory.create( - (String) any(), - (DynamicDistributedLogConfiguration) any(), - (StreamManager) any())).thenReturn(mockStream); - StreamManager streamManager = new StreamManagerImpl( - "", - new DistributedLogConfiguration(), - mockExecutorService, - mockStreamFactory, - mockPartitionConverter, - mockStreamConfigProvider, - mock(DistributedLogNamespace.class)); - - assertFalse(streamManager.isAcquired("stream1")); - assertEquals(0, streamManager.numAcquired()); - assertEquals(0, streamManager.numCached()); - - streamManager.notifyAcquired(mockStream); - assertTrue(streamManager.isAcquired("stream1")); - assertEquals(1, streamManager.numAcquired()); - assertEquals(0, streamManager.numCached()); - - streamManager.notifyReleased(mockStream); - assertFalse(streamManager.isAcquired("stream1")); - assertEquals(0, streamManager.numAcquired()); - assertEquals(0, streamManager.numCached()); - - streamManager.notifyAcquired(mockStream); - assertTrue(streamManager.isAcquired("stream1")); - assertEquals(1, streamManager.numAcquired()); - assertEquals(0, streamManager.numCached()); - - streamManager.notifyAcquired(mockStream); - assertTrue(streamManager.isAcquired("stream1")); - assertEquals(1, streamManager.numAcquired()); - assertEquals(0, streamManager.numCached()); - - streamManager.notifyReleased(mockStream); - assertFalse(streamManager.isAcquired("stream1")); - assertEquals(0, streamManager.numAcquired()); - assertEquals(0, streamManager.numCached()); - - streamManager.notifyReleased(mockStream); - assertFalse(streamManager.isAcquired("stream1")); - assertEquals(0, streamManager.numAcquired()); - assertEquals(0, streamManager.numCached()); - } - - @Test(timeout = 60000) - public void testCreateStream() throws Exception { - Stream mockStream = mock(Stream.class); - final String streamName = "stream1"; - when(mockStream.getStreamName()).thenReturn(streamName); - StreamFactory mockStreamFactory = mock(StreamFactory.class); - StreamPartitionConverter mockPartitionConverter = mock(StreamPartitionConverter.class); - StreamConfigProvider mockStreamConfigProvider = mock(StreamConfigProvider.class); - when(mockStreamFactory.create( - (String) any(), - (DynamicDistributedLogConfiguration) any(), - (StreamManager) any()) - ).thenReturn(mockStream); - DistributedLogNamespace dlNamespace = mock(DistributedLogNamespace.class); - ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); - - StreamManager streamManager = new StreamManagerImpl( - "", - new DistributedLogConfiguration(), - executorService, - mockStreamFactory, - mockPartitionConverter, - mockStreamConfigProvider, - dlNamespace); - - assertTrue(Await.ready(streamManager.createStreamAsync(streamName)).isReturn()); - verify(dlNamespace).createLog(streamName); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java deleted file mode 100644 index a18fda1..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.distributedlog.AsyncLogWriter; -import org.apache.distributedlog.DLSN; -import org.apache.distributedlog.LogRecord; -import org.apache.distributedlog.acl.DefaultAccessControlManager; -import org.apache.distributedlog.exceptions.InternalServerException; -import org.apache.distributedlog.service.ResponseUtils; -import org.apache.distributedlog.service.config.ServerConfiguration; -import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter; -import org.apache.distributedlog.thrift.service.StatusCode; -import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.Sequencer; -import com.twitter.util.Await; -import com.twitter.util.Future; -import java.nio.ByteBuffer; -import org.apache.bookkeeper.feature.SettableFeature; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -/** - * Test Case for StreamOps. - */ -public class TestStreamOp { - - @Rule - public TestName testName = new TestName(); - - private WriteOp getWriteOp() { - SettableFeature disabledFeature = new SettableFeature("", 0); - return new WriteOp("test", - ByteBuffer.wrap("test".getBytes()), - new NullStatsLogger(), - new NullStatsLogger(), - new IdentityStreamPartitionConverter(), - new ServerConfiguration(), - (byte) 0, - null, - false, - disabledFeature, - DefaultAccessControlManager.INSTANCE); - } - - @Test(timeout = 60000) - public void testResponseFailedTwice() throws Exception { - WriteOp writeOp = getWriteOp(); - writeOp.fail(new InternalServerException("test1")); - writeOp.fail(new InternalServerException("test2")); - - WriteResponse response = Await.result(writeOp.result()); - assertEquals(StatusCode.INTERNAL_SERVER_ERROR, response.getHeader().getCode()); - assertEquals(ResponseUtils.exceptionToHeader(new InternalServerException("test1")), response.getHeader()); - } - - @Test(timeout = 60000) - public void testResponseSucceededThenFailed() throws Exception { - AsyncLogWriter writer = mock(AsyncLogWriter.class); - when(writer.write((LogRecord) any())).thenReturn(Future.value(new DLSN(1, 2, 3))); - when(writer.getStreamName()).thenReturn("test"); - WriteOp writeOp = getWriteOp(); - writeOp.execute(writer, new Sequencer() { - public long nextId() { - return 0; - } - }, new Object()); - writeOp.fail(new InternalServerException("test2")); - - WriteResponse response = Await.result(writeOp.result()); - assertEquals(StatusCode.SUCCESS, response.getHeader().getCode()); - } -}