http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java deleted file mode 100644 index 58b5b2a..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java +++ /dev/null @@ -1,298 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.service; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import com.google.common.base.Optional; -import com.google.common.collect.Sets; -import org.apache.distributedlog.DLMTestUtil; -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.client.DistributedLogClientImpl; -import org.apache.distributedlog.client.resolver.DefaultRegionResolver; -import org.apache.distributedlog.client.routing.LocalRoutingService; -import org.apache.distributedlog.client.routing.RegionsRoutingService; -import org.apache.distributedlog.service.DistributedLogCluster.DLServer; -import org.apache.distributedlog.service.stream.StreamManager; -import org.apache.distributedlog.service.stream.StreamManagerImpl; -import com.twitter.finagle.builder.ClientBuilder; -import com.twitter.finagle.thrift.ClientId$; -import com.twitter.util.Duration; -import java.net.SocketAddress; -import java.net.URI; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; - -/** - * Base test case for distributedlog servers. - */ -public abstract class DistributedLogServerTestCase { - - protected static DistributedLogConfiguration conf = - new DistributedLogConfiguration().setLockTimeout(10) - .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10); - protected static DistributedLogConfiguration noAdHocConf = - new DistributedLogConfiguration().setLockTimeout(10).setCreateStreamIfNotExists(false) - .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10); - protected static DistributedLogCluster dlCluster; - protected static DistributedLogCluster noAdHocCluster; - - /** - * A distributedlog client wrapper for testing. 
- */ - protected static class DLClient { - public final LocalRoutingService routingService; - public DistributedLogClientBuilder dlClientBuilder; - public final DistributedLogClientImpl dlClient; - - protected DLClient(String name, - String streamNameRegex, - Optional<String> serverSideRoutingFinagleName) { - routingService = LocalRoutingService.newBuilder().build(); - dlClientBuilder = DistributedLogClientBuilder.newBuilder() - .name(name) - .clientId(ClientId$.MODULE$.apply(name)) - .routingService(routingService) - .streamNameRegex(streamNameRegex) - .handshakeWithClientInfo(true) - .clientBuilder(ClientBuilder.get() - .hostConnectionLimit(1) - .connectionTimeout(Duration.fromSeconds(1)) - .requestTimeout(Duration.fromSeconds(60))); - if (serverSideRoutingFinagleName.isPresent()) { - dlClientBuilder = - dlClientBuilder.serverRoutingServiceFinagleNameStr(serverSideRoutingFinagleName.get()); - } - dlClient = (DistributedLogClientImpl) dlClientBuilder.build(); - } - - public void handshake() { - dlClient.handshake(); - } - - public void shutdown() { - dlClient.close(); - } - } - - /** - * A distributedlog client wrapper that talks to two regions. 
- */ - protected static class TwoRegionDLClient { - - public final LocalRoutingService localRoutingService; - public final LocalRoutingService remoteRoutingService; - public final DistributedLogClientBuilder dlClientBuilder; - public final DistributedLogClientImpl dlClient; - - protected TwoRegionDLClient(String name, Map<SocketAddress, String> regionMap) { - localRoutingService = new LocalRoutingService(); - remoteRoutingService = new LocalRoutingService(); - RegionsRoutingService regionsRoutingService = - RegionsRoutingService.of(new DefaultRegionResolver(regionMap), - localRoutingService, remoteRoutingService); - dlClientBuilder = DistributedLogClientBuilder.newBuilder() - .name(name) - .clientId(ClientId$.MODULE$.apply(name)) - .routingService(regionsRoutingService) - .streamNameRegex(".*") - .handshakeWithClientInfo(true) - .maxRedirects(2) - .clientBuilder(ClientBuilder.get() - .hostConnectionLimit(1) - .connectionTimeout(Duration.fromSeconds(1)) - .requestTimeout(Duration.fromSeconds(10))); - dlClient = (DistributedLogClientImpl) dlClientBuilder.build(); - } - - public void shutdown() { - dlClient.close(); - } - } - - private final boolean clientSideRouting; - protected DLServer dlServer; - protected DLClient dlClient; - protected DLServer noAdHocServer; - protected DLClient noAdHocClient; - - public static DistributedLogCluster createCluster(DistributedLogConfiguration conf) throws Exception { - return DistributedLogCluster.newBuilder() - .numBookies(3) - .shouldStartZK(true) - .zkServers("127.0.0.1") - .shouldStartProxy(false) - .dlConf(conf) - .bkConf(DLMTestUtil.loadTestBkConf()) - .build(); - } - - @BeforeClass - public static void setupCluster() throws Exception { - dlCluster = createCluster(conf); - dlCluster.start(); - } - - public void setupNoAdHocCluster() throws Exception { - noAdHocCluster = createCluster(noAdHocConf); - noAdHocCluster.start(); - noAdHocServer = new DLServer(noAdHocConf, noAdHocCluster.getUri(), 7002, false); - Optional<String> 
serverSideRoutingFinagleName = Optional.absent(); - if (!clientSideRouting) { - serverSideRoutingFinagleName = - Optional.of("inet!" + DLSocketAddress.toString(noAdHocServer.getAddress())); - } - noAdHocClient = createDistributedLogClient("no-ad-hoc-client", serverSideRoutingFinagleName); - } - - public void tearDownNoAdHocCluster() throws Exception { - if (null != noAdHocClient) { - noAdHocClient.shutdown(); - } - if (null != noAdHocServer) { - noAdHocServer.shutdown(); - } - } - - @AfterClass - public static void teardownCluster() throws Exception { - if (null != dlCluster) { - dlCluster.stop(); - } - if (null != noAdHocCluster) { - noAdHocCluster.stop(); - } - } - - protected static URI getUri() { - return dlCluster.getUri(); - } - - protected DistributedLogServerTestCase(boolean clientSideRouting) { - this.clientSideRouting = clientSideRouting; - } - - @Before - public void setup() throws Exception { - dlServer = createDistributedLogServer(7001); - Optional<String> serverSideRoutingFinagleName = Optional.absent(); - if (!clientSideRouting) { - serverSideRoutingFinagleName = - Optional.of("inet!" 
+ DLSocketAddress.toString(dlServer.getAddress())); - } - dlClient = createDistributedLogClient("test", serverSideRoutingFinagleName); - } - - @After - public void teardown() throws Exception { - if (null != dlClient) { - dlClient.shutdown(); - } - if (null != dlServer) { - dlServer.shutdown(); - } - } - - protected DLServer createDistributedLogServer(int port) throws Exception { - return new DLServer(conf, dlCluster.getUri(), port, false); - } - - protected DLServer createDistributedLogServer(DistributedLogConfiguration conf, int port) - throws Exception { - return new DLServer(conf, dlCluster.getUri(), port, false); - } - - protected DLClient createDistributedLogClient(String clientName, - Optional<String> serverSideRoutingFinagleName) - throws Exception { - return createDistributedLogClient(clientName, ".*", serverSideRoutingFinagleName); - } - - protected DLClient createDistributedLogClient(String clientName, - String streamNameRegex, - Optional<String> serverSideRoutingFinagleName) - throws Exception { - return new DLClient(clientName, streamNameRegex, serverSideRoutingFinagleName); - } - - protected TwoRegionDLClient createTwoRegionDLClient(String clientName, - Map<SocketAddress, String> regionMap) - throws Exception { - return new TwoRegionDLClient(clientName, regionMap); - } - - protected static void checkStreams(int numExpectedStreams, DLServer dlServer) { - StreamManager streamManager = dlServer.dlServer.getKey().getStreamManager(); - assertEquals(numExpectedStreams, streamManager.numCached()); - assertEquals(numExpectedStreams, streamManager.numAcquired()); - } - - protected static void checkStreams(Set<String> streams, DLServer dlServer) { - StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager(); - Set<String> cachedStreams = streamManager.getCachedStreams().keySet(); - Set<String> acquiredStreams = streamManager.getAcquiredStreams().keySet(); - - assertEquals(streams.size(), cachedStreams.size()); - 
assertEquals(streams.size(), acquiredStreams.size()); - assertTrue(Sets.difference(streams, cachedStreams).isEmpty()); - assertTrue(Sets.difference(streams, acquiredStreams).isEmpty()); - } - - protected static void checkStream(String name, DLClient dlClient, DLServer dlServer, - int expectedNumProxiesInClient, int expectedClientCacheSize, - int expectedServerCacheSize, boolean existedInServer, boolean existedInClient) { - Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution(); - assertEquals(expectedNumProxiesInClient, distribution.size()); - - if (expectedNumProxiesInClient > 0) { - Map.Entry<SocketAddress, Set<String>> localEntry = - distribution.entrySet().iterator().next(); - assertEquals(dlServer.getAddress(), localEntry.getKey()); - assertEquals(expectedClientCacheSize, localEntry.getValue().size()); - assertEquals(existedInClient, localEntry.getValue().contains(name)); - } - - StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager(); - Set<String> cachedStreams = streamManager.getCachedStreams().keySet(); - Set<String> acquiredStreams = streamManager.getCachedStreams().keySet(); - - assertEquals(expectedServerCacheSize, cachedStreams.size()); - assertEquals(existedInServer, cachedStreams.contains(name)); - assertEquals(expectedServerCacheSize, acquiredStreams.size()); - assertEquals(existedInServer, acquiredStreams.contains(name)); - } - - protected static Map<SocketAddress, Set<String>> getStreamOwnershipDistribution(DLClient dlClient) { - return dlClient.dlClient.getStreamOwnershipDistribution(); - } - - protected static Set<String> getAllStreamsFromDistribution(Map<SocketAddress, Set<String>> distribution) { - Set<String> allStreams = new HashSet<String>(); - for (Map.Entry<SocketAddress, Set<String>> entry : distribution.entrySet()) { - allStreams.addAll(entry.getValue()); - } - return allStreams; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java deleted file mode 100644 index 29a3617..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java +++ /dev/null @@ -1,720 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.service; - -import static com.google.common.base.Charsets.UTF_8; -import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import com.google.common.base.Optional; -import org.apache.distributedlog.AsyncLogReader; -import org.apache.distributedlog.DLMTestUtil; -import org.apache.distributedlog.DLSN; -import org.apache.distributedlog.DistributedLogManager; -import org.apache.distributedlog.LogReader; -import org.apache.distributedlog.LogRecord; -import org.apache.distributedlog.LogRecordWithDLSN; -import org.apache.distributedlog.TestZooKeeperClientBuilder; -import org.apache.distributedlog.ZooKeeperClient; -import org.apache.distributedlog.acl.AccessControlManager; -import org.apache.distributedlog.annotations.DistributedLogAnnotations; -import org.apache.distributedlog.client.routing.LocalRoutingService; -import org.apache.distributedlog.exceptions.DLException; -import org.apache.distributedlog.exceptions.LogNotFoundException; -import org.apache.distributedlog.impl.acl.ZKAccessControl; -import org.apache.distributedlog.impl.metadata.BKDLConfig; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.service.stream.StreamManagerImpl; -import org.apache.distributedlog.thrift.AccessControlEntry; -import org.apache.distributedlog.thrift.service.BulkWriteResponse; -import org.apache.distributedlog.thrift.service.HeartbeatOptions; -import org.apache.distributedlog.thrift.service.StatusCode; -import org.apache.distributedlog.thrift.service.WriteContext; -import org.apache.distributedlog.util.FailpointUtils; -import org.apache.distributedlog.util.FutureUtils; -import com.twitter.finagle.builder.ClientBuilder; -import com.twitter.finagle.thrift.ClientId$; 
-import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Future; -import com.twitter.util.Futures; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test Case for {@link DistributedLogServer}. - */ -public abstract class TestDistributedLogServerBase extends DistributedLogServerTestCase { - - private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServerBase.class); - - @Rule - public TestName testName = new TestName(); - - protected TestDistributedLogServerBase(boolean clientSideRouting) { - super(clientSideRouting); - } - - /** - * {@link https://issues.apache.org/jira/browse/DL-27}. - */ - @DistributedLogAnnotations.FlakyTest - @Ignore - @Test(timeout = 60000) - public void testBasicWrite() throws Exception { - String name = "dlserver-basic-write"; - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - for (long i = 1; i <= 10; i++) { - logger.debug("Write entry {} to stream {}.", i, name); - Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes()))); - } - - HeartbeatOptions hbOptions = new HeartbeatOptions(); - hbOptions.setSendHeartBeatToReader(true); - // make sure the first log segment of each stream created - FutureUtils.result(dlClient.dlClient.heartbeat(name)); - - DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); - LogReader reader = dlm.getInputStream(1); - int numRead = 0; - LogRecord r = reader.readNext(false); - while (null != r) { - ++numRead; - int i = Integer.parseInt(new String(r.getPayload())); - assertEquals(numRead, i); - r = reader.readNext(false); - } - assertEquals(10, numRead); - 
reader.close(); - dlm.close(); - } - - /** - * Sanity check to make sure both checksum flag values work. - */ - @Test(timeout = 60000) - public void testChecksumFlag() throws Exception { - String name = "testChecksumFlag"; - LocalRoutingService routingService = LocalRoutingService.newBuilder().build(); - routingService.addHost(name, dlServer.getAddress()); - DistributedLogClientBuilder dlClientBuilder = DistributedLogClientBuilder.newBuilder() - .name(name) - .clientId(ClientId$.MODULE$.apply("test")) - .routingService(routingService) - .handshakeWithClientInfo(true) - .clientBuilder(ClientBuilder.get() - .hostConnectionLimit(1) - .connectionTimeout(Duration.fromSeconds(1)) - .requestTimeout(Duration.fromSeconds(60))) - .checksum(false); - DistributedLogClient dlClient = dlClientBuilder.build(); - Await.result(dlClient.write(name, ByteBuffer.wrap(("1").getBytes()))); - dlClient.close(); - - dlClient = dlClientBuilder.checksum(true).build(); - Await.result(dlClient.write(name, ByteBuffer.wrap(("2").getBytes()))); - dlClient.close(); - } - - private void runSimpleBulkWriteTest(int writeCount) throws Exception { - String name = String.format("dlserver-bulk-write-%d", writeCount); - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount); - for (long i = 1; i <= writeCount; i++) { - writes.add(ByteBuffer.wrap(("" + i).getBytes())); - } - - logger.debug("Write {} entries to stream {}.", writeCount, name); - List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); - assertEquals(futures.size(), writeCount); - for (Future<DLSN> future : futures) { - // No throw == pass. 
- DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); - } - - DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); - LogReader reader = dlm.getInputStream(1); - int numRead = 0; - LogRecord r = reader.readNext(false); - while (null != r) { - int i = Integer.parseInt(new String(r.getPayload())); - assertEquals(numRead + 1, i); - ++numRead; - r = reader.readNext(false); - } - assertEquals(writeCount, numRead); - reader.close(); - dlm.close(); - } - - @Test(timeout = 60000) - public void testBulkWrite() throws Exception { - runSimpleBulkWriteTest(100); - } - - @Test(timeout = 60000) - public void testBulkWriteSingleWrite() throws Exception { - runSimpleBulkWriteTest(1); - } - - @Test(timeout = 60000) - public void testBulkWriteEmptyList() throws Exception { - String name = String.format("dlserver-bulk-write-%d", 0); - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - List<ByteBuffer> writes = new ArrayList<ByteBuffer>(); - List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); - - assertEquals(0, futures.size()); - } - - @Test(timeout = 60000) - public void testBulkWriteNullArg() throws Exception { - - String name = String.format("dlserver-bulk-write-%s", "null"); - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - List<ByteBuffer> writes = new ArrayList<ByteBuffer>(); - writes.add(null); - - try { - dlClient.dlClient.writeBulk(name, writes); - fail("should not have succeeded"); - } catch (NullPointerException npe) { - // expected - logger.info("Expected to catch NullPointException."); - } - } - - @Test(timeout = 60000) - public void testBulkWriteEmptyBuffer() throws Exception { - String name = String.format("dlserver-bulk-write-%s", "empty"); - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - List<ByteBuffer> writes = new ArrayList<ByteBuffer>(); - writes.add(ByteBuffer.wrap(("").getBytes())); - writes.add(ByteBuffer.wrap(("").getBytes())); - List<Future<DLSN>> 
futures = dlClient.dlClient.writeBulk(name, writes); - assertEquals(2, futures.size()); - for (Future<DLSN> future : futures) { - // No throw == pass - DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); - } - } - - void failDueToWrongException(Exception ex) { - logger.info("testBulkWritePartialFailure: ", ex); - fail(String.format("failed with wrong exception %s", ex.getClass().getName())); - } - - int validateAllFailedAsCancelled(List<Future<DLSN>> futures, int start, int finish) { - int failed = 0; - for (int i = start; i < finish; i++) { - Future<DLSN> future = futures.get(i); - try { - Await.result(future, Duration.fromSeconds(10)); - fail("future should have failed!"); - } catch (DLException cre) { - ++failed; - } catch (Exception ex) { - failDueToWrongException(ex); - } - } - return failed; - } - - void validateFailedAsLogRecordTooLong(Future<DLSN> future) { - try { - Await.result(future, Duration.fromSeconds(10)); - fail("should have failed"); - } catch (DLException dle) { - assertEquals(StatusCode.TOO_LARGE_RECORD, dle.getCode()); - } catch (Exception ex) { - failDueToWrongException(ex); - } - } - - @Test(timeout = 60000) - public void testBulkWritePartialFailure() throws Exception { - String name = String.format("dlserver-bulk-write-%s", "partial-failure"); - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - final int writeCount = 100; - - List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount * 2 + 1); - for (long i = 1; i <= writeCount; i++) { - writes.add(ByteBuffer.wrap(("" + i).getBytes())); - } - // Too big, will cause partial failure. - ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1); - writes.add(buf); - for (long i = 1; i <= writeCount; i++) { - writes.add(ByteBuffer.wrap(("" + i).getBytes())); - } - - // Count succeeded. 
- List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); - int succeeded = 0; - for (int i = 0; i < writeCount; i++) { - Future<DLSN> future = futures.get(i); - try { - Await.result(future, Duration.fromSeconds(10)); - ++succeeded; - } catch (Exception ex) { - failDueToWrongException(ex); - } - } - - validateFailedAsLogRecordTooLong(futures.get(writeCount)); - FutureUtils.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1))); - assertEquals(writeCount, succeeded); - } - - @Test(timeout = 60000) - public void testBulkWriteTotalFailureFirstWriteFailed() throws Exception { - String name = String.format("dlserver-bulk-write-%s", "first-write-failed"); - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - final int writeCount = 100; - List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1); - ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1); - writes.add(buf); - for (long i = 1; i <= writeCount; i++) { - writes.add(ByteBuffer.wrap(("" + i).getBytes())); - } - - List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); - validateFailedAsLogRecordTooLong(futures.get(0)); - FutureUtils.result(Futures.collect(futures.subList(1, writeCount + 1))); - } - - @Test(timeout = 60000) - public void testBulkWriteTotalFailureLostLock() throws Exception { - String name = String.format("dlserver-bulk-write-%s", "lost-lock"); - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - final int writeCount = 8; - List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1); - ByteBuffer buf = ByteBuffer.allocate(8); - writes.add(buf); - for (long i = 1; i <= writeCount; i++) { - writes.add(ByteBuffer.wrap(("" + i).getBytes())); - } - // Warm it up with a write. - Await.result(dlClient.dlClient.write(name, ByteBuffer.allocate(8))); - - // Failpoint a lost lock, make sure the failure gets promoted to an operation failure. 
- DistributedLogServiceImpl svcImpl = (DistributedLogServiceImpl) dlServer.dlServer.getLeft(); - try { - FailpointUtils.setFailpoint( - FailpointUtils.FailPointName.FP_WriteInternalLostLock, - FailpointUtils.FailPointActions.FailPointAction_Default - ); - Future<BulkWriteResponse> futures = svcImpl.writeBulkWithContext(name, writes, new WriteContext()); - assertEquals(StatusCode.LOCKING_EXCEPTION, Await.result(futures).header.code); - } finally { - FailpointUtils.removeFailpoint( - FailpointUtils.FailPointName.FP_WriteInternalLostLock - ); - } - } - - @Test(timeout = 60000) - public void testHeartbeat() throws Exception { - String name = "dlserver-heartbeat"; - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - for (long i = 1; i <= 10; i++) { - logger.debug("Send heartbeat {} to stream {}.", i, name); - dlClient.dlClient.check(name).get(); - } - - logger.debug("Write entry one to stream {}.", name); - dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes())).get(); - - Thread.sleep(1000); - - DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); - LogReader reader = dlm.getInputStream(DLSN.InitialDLSN); - int numRead = 0; - // eid=0 => control records - // other 9 heartbeats will not trigger writing any control records. 
- // eid=1 => user entry - long startEntryId = 1; - LogRecordWithDLSN r = reader.readNext(false); - while (null != r) { - int i = Integer.parseInt(new String(r.getPayload())); - assertEquals(numRead + 1, i); - assertEquals(r.getDlsn().compareTo(new DLSN(1, startEntryId, 0)), 0); - ++numRead; - ++startEntryId; - r = reader.readNext(false); - } - assertEquals(1, numRead); - } - - @Test(timeout = 60000) - public void testFenceWrite() throws Exception { - String name = "dlserver-fence-write"; - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - for (long i = 1; i <= 10; i++) { - logger.debug("Write entry {} to stream {}.", i, name); - dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get(); - } - - Thread.sleep(1000); - - logger.info("Fencing stream {}.", name); - DLMTestUtil.fenceStream(conf, getUri(), name); - logger.info("Fenced stream {}.", name); - - for (long i = 11; i <= 20; i++) { - logger.debug("Write entry {} to stream {}.", i, name); - dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get(); - } - - DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); - LogReader reader = dlm.getInputStream(1); - int numRead = 0; - LogRecord r = reader.readNext(false); - while (null != r) { - int i = Integer.parseInt(new String(r.getPayload())); - assertEquals(numRead + 1, i); - ++numRead; - r = reader.readNext(false); - } - assertEquals(20, numRead); - reader.close(); - dlm.close(); - } - - @Test(timeout = 60000) - public void testDeleteStream() throws Exception { - String name = "dlserver-delete-stream"; - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - long txid = 101; - for (long i = 1; i <= 10; i++) { - long curTxId = txid++; - logger.debug("Write entry {} to stream {}.", curTxId, name); - dlClient.dlClient.write(name, - ByteBuffer.wrap(("" + curTxId).getBytes())).get(); - } - - checkStream(1, 1, 1, name, dlServer.getAddress(), true, true); - - 
dlClient.dlClient.delete(name).get(); - - checkStream(0, 0, 0, name, dlServer.getAddress(), false, false); - - Thread.sleep(1000); - - DistributedLogManager dlm101 = DLMTestUtil.createNewDLM(name, conf, getUri()); - AsyncLogReader reader101 = FutureUtils.result(dlm101.openAsyncLogReader(DLSN.InitialDLSN)); - try { - FutureUtils.result(reader101.readNext()); - fail("Should fail with LogNotFoundException since the stream is deleted"); - } catch (LogNotFoundException lnfe) { - // expected - } - FutureUtils.result(reader101.asyncClose()); - dlm101.close(); - - txid = 201; - for (long i = 1; i <= 10; i++) { - long curTxId = txid++; - logger.debug("Write entry {} to stream {}.", curTxId, name); - DLSN dlsn = dlClient.dlClient.write(name, - ByteBuffer.wrap(("" + curTxId).getBytes())).get(); - } - Thread.sleep(1000); - - DistributedLogManager dlm201 = DLMTestUtil.createNewDLM(name, conf, getUri()); - LogReader reader201 = dlm201.getInputStream(1); - int numRead = 0; - int curTxId = 201; - LogRecord r = reader201.readNext(false); - while (null != r) { - int i = Integer.parseInt(new String(r.getPayload())); - assertEquals(curTxId++, i); - ++numRead; - r = reader201.readNext(false); - } - assertEquals(10, numRead); - reader201.close(); - dlm201.close(); - } - - @Test(timeout = 60000) - public void testCreateStream() throws Exception { - try { - setupNoAdHocCluster(); - final String name = "dlserver-create-stream"; - - noAdHocClient.routingService.addHost("dlserver-create-stream", noAdHocServer.getAddress()); - assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); - assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn()); - - long txid = 101; - for (long i = 1; i <= 10; i++) { - long curTxId = txid++; - logger.debug("Write entry {} to stream {}.", curTxId, name); - noAdHocClient.dlClient.write(name, - ByteBuffer.wrap(("" + curTxId).getBytes())).get(); - } - - 
assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); - } finally { - tearDownNoAdHocCluster(); - } - } - - /** - * This tests that create has touch like behavior in that trying to create the stream twice, simply does nothing. - */ - @Test(timeout = 60000) - public void testCreateStreamTwice() throws Exception { - try { - setupNoAdHocCluster(); - final String name = "dlserver-create-stream-twice"; - - noAdHocClient.routingService.addHost("dlserver-create-stream-twice", noAdHocServer.getAddress()); - assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); - assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn()); - - long txid = 101; - for (long i = 1; i <= 10; i++) { - long curTxId = txid++; - logger.debug("Write entry {} to stream {}.", curTxId, name); - noAdHocClient.dlClient.write(name, - ByteBuffer.wrap(("" + curTxId).getBytes())).get(); - } - - assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); - - // create again - assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn()); - assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); - } finally { - tearDownNoAdHocCluster(); - } - } - - - - @Test(timeout = 60000) - public void testTruncateStream() throws Exception { - String name = "dlserver-truncate-stream"; - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - long txid = 1; - Map<Long, DLSN> txid2DLSN = new HashMap<Long, DLSN>(); - for (int s = 1; s <= 2; s++) { - for (long i = 1; i <= 10; i++) { - long curTxId = txid++; - logger.debug("Write entry {} to stream {}.", curTxId, name); - DLSN dlsn = dlClient.dlClient.write(name, - ByteBuffer.wrap(("" + curTxId).getBytes())).get(); - txid2DLSN.put(curTxId, dlsn); - } - if (s == 1) { - dlClient.dlClient.release(name).get(); - } - } - - DLSN dlsnToDelete = txid2DLSN.get(11L); - dlClient.dlClient.truncate(name, dlsnToDelete).get(); - - DistributedLogManager readDLM 
= DLMTestUtil.createNewDLM(name, conf, getUri()); - LogReader reader = readDLM.getInputStream(1); - int numRead = 0; - int curTxId = 11; - LogRecord r = reader.readNext(false); - while (null != r) { - int i = Integer.parseInt(new String(r.getPayload())); - assertEquals(curTxId++, i); - ++numRead; - r = reader.readNext(false); - } - assertEquals(10, numRead); - reader.close(); - readDLM.close(); - } - - @Test(timeout = 60000) - public void testRequestDenied() throws Exception { - String name = "request-denied"; - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - AccessControlEntry ace = new AccessControlEntry(); - ace.setDenyWrite(true); - ZooKeeperClient zkc = TestZooKeeperClientBuilder - .newBuilder() - .uri(getUri()) - .connectionTimeoutMs(60000) - .sessionTimeoutMs(60000) - .build(); - DistributedLogNamespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace(); - BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, getUri()); - String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + name; - ZKAccessControl accessControl = new ZKAccessControl(ace, zkPath); - accessControl.create(zkc); - - AccessControlManager acm = dlNamespace.createAccessControlManager(); - while (acm.allowWrite(name)) { - Thread.sleep(100); - } - - try { - Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8)))); - fail("Should fail with request denied exception"); - } catch (DLException dle) { - assertEquals(StatusCode.REQUEST_DENIED, dle.getCode()); - } - } - - @Test(timeout = 60000) - public void testNoneStreamNameRegex() throws Exception { - String streamNamePrefix = "none-stream-name-regex-"; - int numStreams = 5; - Set<String> streams = new HashSet<String>(); - - for (int i = 0; i < numStreams; i++) { - streams.add(streamNamePrefix + i); - } - testStreamNameRegex(streams, ".*", streams); - } - - @Test(timeout = 60000) - public void testStreamNameRegex() throws Exception { - String streamNamePrefix = 
"stream-name-regex-"; - int numStreams = 5; - Set<String> streams = new HashSet<String>(); - Set<String> expectedStreams = new HashSet<String>(); - String streamNameRegex = streamNamePrefix + "1"; - - for (int i = 0; i < numStreams; i++) { - streams.add(streamNamePrefix + i); - } - expectedStreams.add(streamNamePrefix + "1"); - - testStreamNameRegex(streams, streamNameRegex, expectedStreams); - } - - private void testStreamNameRegex(Set<String> streams, String streamNameRegex, - Set<String> expectedStreams) - throws Exception { - for (String streamName : streams) { - dlClient.routingService.addHost(streamName, dlServer.getAddress()); - Await.result(dlClient.dlClient.write(streamName, - ByteBuffer.wrap(streamName.getBytes(UTF_8)))); - } - - DLClient client = createDistributedLogClient( - "test-stream-name-regex", - streamNameRegex, - Optional.<String>absent()); - try { - client.routingService.addHost("unknown", dlServer.getAddress()); - client.handshake(); - Map<SocketAddress, Set<String>> distribution = - client.dlClient.getStreamOwnershipDistribution(); - assertEquals(1, distribution.size()); - Set<String> cachedStreams = distribution.values().iterator().next(); - assertNotNull(cachedStreams); - assertEquals(expectedStreams.size(), cachedStreams.size()); - - for (String streamName : cachedStreams) { - assertTrue(expectedStreams.contains(streamName)); - } - } finally { - client.shutdown(); - } - } - - @Test(timeout = 60000) - public void testReleaseStream() throws Exception { - String name = "dlserver-release-stream"; - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8)))); - checkStream(1, 1, 1, name, dlServer.getAddress(), true, true); - - // release the stream - Await.result(dlClient.dlClient.release(name)); - checkStream(0, 0, 0, name, dlServer.getAddress(), false, false); - } - - protected void checkStream(int expectedNumProxiesInClient, int expectedClientCacheSize, 
int expectedServerCacheSize, - String name, SocketAddress owner, boolean existedInServer, boolean existedInClient) { - Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution(); - assertEquals(expectedNumProxiesInClient, distribution.size()); - - if (expectedNumProxiesInClient > 0) { - Map.Entry<SocketAddress, Set<String>> localEntry = - distribution.entrySet().iterator().next(); - assertEquals(owner, localEntry.getKey()); - assertEquals(expectedClientCacheSize, localEntry.getValue().size()); - assertEquals(existedInClient, localEntry.getValue().contains(name)); - } - - - StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager(); - Set<String> cachedStreams = streamManager.getCachedStreams().keySet(); - Set<String> acquiredStreams = streamManager.getCachedStreams().keySet(); - - assertEquals(expectedServerCacheSize, cachedStreams.size()); - assertEquals(existedInServer, cachedStreams.contains(name)); - assertEquals(expectedServerCacheSize, acquiredStreams.size()); - assertEquals(existedInServer, acquiredStreams.contains(name)); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java deleted file mode 100644 index c7ae960..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service; - -import static com.google.common.base.Charsets.UTF_8; -import static org.junit.Assert.fail; - -import com.twitter.finagle.NoBrokersAvailableException; -import com.twitter.util.Await; -import java.nio.ByteBuffer; -import org.junit.Test; - -/** - * Test the server with client side routing. 
- */ -public class TestDistributedLogServerClientRouting extends TestDistributedLogServerBase { - - public TestDistributedLogServerClientRouting() { - super(true); - } - - @Test(timeout = 60000) - public void testAcceptNewStream() throws Exception { - String name = "dlserver-accept-new-stream"; - - dlClient.routingService.addHost(name, dlServer.getAddress()); - dlClient.routingService.setAllowRetrySameHost(false); - - Await.result(dlClient.dlClient.setAcceptNewStream(false)); - - try { - Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8)))); - fail("Should fail because the proxy couldn't accept new stream"); - } catch (NoBrokersAvailableException nbae) { - // expected - } - checkStream(0, 0, 0, name, dlServer.getAddress(), false, false); - - Await.result(dlServer.dlServer.getLeft().setAcceptNewStream(true)); - Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8)))); - checkStream(1, 1, 1, name, dlServer.getAddress(), true, true); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java deleted file mode 100644 index 12416a3..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service; - -/** - * Test the server with server side routing. - */ -public class TestDistributedLogServerServerRouting extends TestDistributedLogServerBase { - - public TestDistributedLogServerServerRouting() { - super(false); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java deleted file mode 100644 index e5d75c2..0000000 --- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java +++ /dev/null @@ -1,833 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service; - -import static com.google.common.base.Charsets.UTF_8; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import com.google.common.collect.Lists; -import org.apache.distributedlog.DLSN; -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.TestDistributedLogBase; -import org.apache.distributedlog.acl.DefaultAccessControlManager; -import org.apache.distributedlog.client.routing.LocalRoutingService; -import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; -import org.apache.distributedlog.exceptions.StreamUnavailableException; -import org.apache.distributedlog.service.config.NullStreamConfigProvider; -import org.apache.distributedlog.service.config.ServerConfiguration; -import org.apache.distributedlog.service.placement.EqualLoadAppraiser; -import org.apache.distributedlog.service.stream.Stream; -import org.apache.distributedlog.service.stream.StreamImpl; -import org.apache.distributedlog.service.stream.StreamImpl.StreamStatus; -import org.apache.distributedlog.service.stream.StreamManagerImpl; -import org.apache.distributedlog.service.stream.WriteOp; -import org.apache.distributedlog.service.streamset.DelimiterStreamPartitionConverter; -import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter; -import 
org.apache.distributedlog.service.streamset.StreamPartitionConverter; -import org.apache.distributedlog.thrift.service.HeartbeatOptions; -import org.apache.distributedlog.thrift.service.StatusCode; -import org.apache.distributedlog.thrift.service.WriteContext; -import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.ConfUtils; -import org.apache.distributedlog.util.FutureUtils; -import org.apache.distributedlog.util.ProtocolUtils; -import com.twitter.util.Await; -import com.twitter.util.Future; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.feature.SettableFeature; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.util.ReflectionUtils; -import org.apache.commons.configuration.ConfigurationException; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test Case for DistributedLog Service. 
- */ -public class TestDistributedLogService extends TestDistributedLogBase { - - private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogService.class); - - @Rule - public TestName testName = new TestName(); - - private ServerConfiguration serverConf; - private DistributedLogConfiguration dlConf; - private URI uri; - private final CountDownLatch latch = new CountDownLatch(1); - private DistributedLogServiceImpl service; - - @Before - @Override - public void setup() throws Exception { - super.setup(); - dlConf = new DistributedLogConfiguration(); - dlConf.addConfiguration(conf); - dlConf.setLockTimeout(0) - .setOutputBufferSize(0) - .setPeriodicFlushFrequencyMilliSeconds(10) - .setSchedulerShutdownTimeoutMs(100); - serverConf = newLocalServerConf(); - uri = createDLMURI("/" + testName.getMethodName()); - ensureURICreated(uri); - service = createService(serverConf, dlConf, latch); - } - - @After - @Override - public void teardown() throws Exception { - if (null != service) { - service.shutdown(); - } - super.teardown(); - } - - private DistributedLogConfiguration newLocalConf() { - DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); - confLocal.addConfiguration(dlConf); - return confLocal; - } - - private ServerConfiguration newLocalServerConf() { - ServerConfiguration serverConf = new ServerConfiguration(); - serverConf.loadConf(dlConf); - serverConf.setServerThreads(1); - return serverConf; - } - - private DistributedLogServiceImpl createService( - ServerConfiguration serverConf, - DistributedLogConfiguration dlConf) throws Exception { - return createService(serverConf, dlConf, new CountDownLatch(1)); - } - - private DistributedLogServiceImpl createService( - ServerConfiguration serverConf, - DistributedLogConfiguration dlConf, - CountDownLatch latch) throws Exception { - // Build the stream partition converter - StreamPartitionConverter converter; - try { - converter = 
ReflectionUtils.newInstance(serverConf.getStreamPartitionConverterClass()); - } catch (ConfigurationException e) { - logger.warn("Failed to load configured stream-to-partition converter. Fallback to use {}", - IdentityStreamPartitionConverter.class.getName()); - converter = new IdentityStreamPartitionConverter(); - } - return new DistributedLogServiceImpl( - serverConf, - dlConf, - ConfUtils.getConstDynConf(dlConf), - new NullStreamConfigProvider(), - uri, - converter, - new LocalRoutingService(), - NullStatsLogger.INSTANCE, - NullStatsLogger.INSTANCE, - latch, - new EqualLoadAppraiser()); - } - - private StreamImpl createUnstartedStream(DistributedLogServiceImpl service, - String name) throws Exception { - StreamImpl stream = (StreamImpl) service.newStream(name); - stream.initialize(); - return stream; - } - - private ByteBuffer createRecord(long txid) { - return ByteBuffer.wrap(("record-" + txid).getBytes(UTF_8)); - } - - private WriteOp createWriteOp(DistributedLogServiceImpl service, - String streamName, - long txid) { - ByteBuffer data = createRecord(txid); - return service.newWriteOp(streamName, data, null); - } - - @Test(timeout = 60000) - public void testAcquireStreams() throws Exception { - String streamName = testName.getMethodName(); - StreamImpl s0 = createUnstartedStream(service, streamName); - ServerConfiguration serverConf1 = new ServerConfiguration(); - serverConf1.addConfiguration(serverConf); - serverConf1.setServerPort(9999); - DistributedLogServiceImpl service1 = createService(serverConf1, dlConf); - StreamImpl s1 = createUnstartedStream(service1, streamName); - - // create write ops - WriteOp op0 = createWriteOp(service, streamName, 0L); - s0.submit(op0); - - WriteOp op1 = createWriteOp(service1, streamName, 1L); - s1.submit(op1); - - // check pending size - assertEquals("Write Op 0 should be pending in service 0", - 1, s0.numPendingOps()); - assertEquals("Write Op 1 should be pending in service 1", - 1, s1.numPendingOps()); - - // start 
acquiring s0 - s0.start(); - WriteResponse wr0 = Await.result(op0.result()); - assertEquals("Op 0 should succeed", - StatusCode.SUCCESS, wr0.getHeader().getCode()); - assertEquals("Service 0 should acquire stream", - StreamStatus.INITIALIZED, s0.getStatus()); - assertNotNull(s0.getManager()); - assertNotNull(s0.getWriter()); - assertNull(s0.getLastException()); - - // start acquiring s1 - s1.start(); - WriteResponse wr1 = Await.result(op1.result()); - assertEquals("Op 1 should fail", - StatusCode.FOUND, wr1.getHeader().getCode()); - // the stream will be set to ERROR and then be closed. - assertTrue("Service 1 should be in unavailable state", - StreamStatus.isUnavailable(s1.getStatus())); - assertNotNull(s1.getManager()); - assertNull(s1.getWriter()); - assertNotNull(s1.getLastException()); - assertTrue(s1.getLastException() instanceof OwnershipAcquireFailedException); - - service1.shutdown(); - } - - @Test(timeout = 60000) - public void testAcquireStreamsWhenExceedMaxCachedPartitions() throws Exception { - String streamName = testName.getMethodName() + "_0000"; - - DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); - confLocal.addConfiguration(dlConf); - confLocal.setMaxCachedPartitionsPerProxy(1); - - ServerConfiguration serverConfLocal = new ServerConfiguration(); - serverConfLocal.addConfiguration(serverConf); - serverConfLocal.setStreamPartitionConverterClass(DelimiterStreamPartitionConverter.class); - - DistributedLogServiceImpl serviceLocal = createService(serverConfLocal, confLocal); - Stream stream = serviceLocal.getLogWriter(streamName); - - // stream is cached - assertNotNull(stream); - assertEquals(1, serviceLocal.getStreamManager().numCached()); - - // create write ops - WriteOp op0 = createWriteOp(service, streamName, 0L); - stream.submit(op0); - WriteResponse wr0 = Await.result(op0.result()); - assertEquals("Op 0 should succeed", - StatusCode.SUCCESS, wr0.getHeader().getCode()); - assertEquals(1, 
serviceLocal.getStreamManager().numAcquired()); - - // should fail to acquire another partition - try { - serviceLocal.getLogWriter(testName.getMethodName() + "_0001"); - fail("Should fail to acquire new streams"); - } catch (StreamUnavailableException sue) { - // expected - } - assertEquals(1, serviceLocal.getStreamManager().numCached()); - assertEquals(1, serviceLocal.getStreamManager().numAcquired()); - - // should be able to acquire partitions from other streams - String anotherStreamName = testName.getMethodName() + "-another_0001"; - Stream anotherStream = serviceLocal.getLogWriter(anotherStreamName); - assertNotNull(anotherStream); - assertEquals(2, serviceLocal.getStreamManager().numCached()); - - // create write ops - WriteOp op1 = createWriteOp(service, anotherStreamName, 0L); - anotherStream.submit(op1); - WriteResponse wr1 = Await.result(op1.result()); - assertEquals("Op 1 should succeed", - StatusCode.SUCCESS, wr1.getHeader().getCode()); - assertEquals(2, serviceLocal.getStreamManager().numAcquired()); - } - - @Test(timeout = 60000) - public void testAcquireStreamsWhenExceedMaxAcquiredPartitions() throws Exception { - String streamName = testName.getMethodName() + "_0000"; - - DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); - confLocal.addConfiguration(dlConf); - confLocal.setMaxCachedPartitionsPerProxy(-1); - confLocal.setMaxAcquiredPartitionsPerProxy(1); - - ServerConfiguration serverConfLocal = new ServerConfiguration(); - serverConfLocal.addConfiguration(serverConf); - serverConfLocal.setStreamPartitionConverterClass(DelimiterStreamPartitionConverter.class); - - DistributedLogServiceImpl serviceLocal = createService(serverConfLocal, confLocal); - Stream stream = serviceLocal.getLogWriter(streamName); - - // stream is cached - assertNotNull(stream); - assertEquals(1, serviceLocal.getStreamManager().numCached()); - - // create write ops - WriteOp op0 = createWriteOp(service, streamName, 0L); - stream.submit(op0); - 
WriteResponse wr0 = Await.result(op0.result()); - assertEquals("Op 0 should succeed", - StatusCode.SUCCESS, wr0.getHeader().getCode()); - assertEquals(1, serviceLocal.getStreamManager().numAcquired()); - - // should be able to cache partitions from same stream - String anotherStreamName = testName.getMethodName() + "_0001"; - Stream anotherStream = serviceLocal.getLogWriter(anotherStreamName); - assertNotNull(anotherStream); - assertEquals(2, serviceLocal.getStreamManager().numCached()); - - // create write ops - WriteOp op1 = createWriteOp(service, anotherStreamName, 0L); - anotherStream.submit(op1); - WriteResponse wr1 = Await.result(op1.result()); - assertEquals("Op 1 should fail", - StatusCode.STREAM_UNAVAILABLE, wr1.getHeader().getCode()); - assertEquals(1, serviceLocal.getStreamManager().numAcquired()); - } - - @Test(timeout = 60000) - public void testCloseShouldErrorOutPendingOps() throws Exception { - String streamName = testName.getMethodName(); - StreamImpl s = createUnstartedStream(service, streamName); - - int numWrites = 10; - List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites); - for (int i = 0; i < numWrites; i++) { - WriteOp op = createWriteOp(service, streamName, i); - s.submit(op); - futureList.add(op.result()); - } - assertEquals(numWrites, s.numPendingOps()); - Await.result(s.requestClose("close stream")); - assertEquals("Stream " + streamName + " is set to " + StreamStatus.CLOSED, - StreamStatus.CLOSED, s.getStatus()); - for (int i = 0; i < numWrites; i++) { - Future<WriteResponse> future = futureList.get(i); - WriteResponse wr = Await.result(future); - assertEquals("Pending op should fail with " + StatusCode.STREAM_UNAVAILABLE, - StatusCode.STREAM_UNAVAILABLE, wr.getHeader().getCode()); - } - } - - @Test(timeout = 60000) - public void testCloseTwice() throws Exception { - String streamName = testName.getMethodName(); - StreamImpl s = createUnstartedStream(service, streamName); - - int numWrites = 10; - 
List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites); - for (int i = 0; i < numWrites; i++) { - WriteOp op = createWriteOp(service, streamName, i); - s.submit(op); - futureList.add(op.result()); - } - assertEquals(numWrites, s.numPendingOps()); - - Future<Void> closeFuture0 = s.requestClose("close 0"); - assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING, - StreamStatus.CLOSING == s.getStatus() - || StreamStatus.CLOSED == s.getStatus()); - Future<Void> closeFuture1 = s.requestClose("close 1"); - assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING, - StreamStatus.CLOSING == s.getStatus() - || StreamStatus.CLOSED == s.getStatus()); - - Await.result(closeFuture0); - assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED, - StreamStatus.CLOSED, s.getStatus()); - Await.result(closeFuture1); - assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED, - StreamStatus.CLOSED, s.getStatus()); - - for (int i = 0; i < numWrites; i++) { - Future<WriteResponse> future = futureList.get(i); - WriteResponse wr = Await.result(future); - assertEquals("Pending op should fail with " + StatusCode.STREAM_UNAVAILABLE, - StatusCode.STREAM_UNAVAILABLE, wr.getHeader().getCode()); - } - } - - @Test(timeout = 60000) - public void testFailRequestsDuringClosing() throws Exception { - String streamName = testName.getMethodName(); - StreamImpl s = createUnstartedStream(service, streamName); - - Future<Void> closeFuture = s.requestClose("close"); - assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING, - StreamStatus.CLOSING == s.getStatus() - || StreamStatus.CLOSED == s.getStatus()); - WriteOp op1 = createWriteOp(service, streamName, 0L); - s.submit(op1); - WriteResponse response1 = Await.result(op1.result()); - assertEquals("Op should fail with " + StatusCode.STREAM_UNAVAILABLE + " if it is closing", - 
StatusCode.STREAM_UNAVAILABLE, response1.getHeader().getCode()); - - Await.result(closeFuture); - assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED, - StreamStatus.CLOSED, s.getStatus()); - WriteOp op2 = createWriteOp(service, streamName, 1L); - s.submit(op2); - WriteResponse response2 = Await.result(op2.result()); - assertEquals("Op should fail with " + StatusCode.STREAM_UNAVAILABLE + " if it is closed", - StatusCode.STREAM_UNAVAILABLE, response2.getHeader().getCode()); - } - - @Test(timeout = 60000) - public void testServiceTimeout() throws Exception { - DistributedLogConfiguration confLocal = newLocalConf(); - confLocal.setOutputBufferSize(Integer.MAX_VALUE) - .setImmediateFlushEnabled(false) - .setPeriodicFlushFrequencyMilliSeconds(0); - ServerConfiguration serverConfLocal = newLocalServerConf(); - serverConfLocal.addConfiguration(serverConf); - serverConfLocal.setServiceTimeoutMs(200) - .setStreamProbationTimeoutMs(100); - String streamName = testName.getMethodName(); - // create a new service with 200ms timeout - DistributedLogServiceImpl localService = createService(serverConfLocal, confLocal); - StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager(); - - int numWrites = 10; - List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites); - for (int i = 0; i < numWrites; i++) { - futureList.add(localService.write(streamName, createRecord(i))); - } - - assertTrue("Stream " + streamName + " should be cached", - streamManager.getCachedStreams().containsKey(streamName)); - - StreamImpl s = (StreamImpl) streamManager.getCachedStreams().get(streamName); - // the stream should be set CLOSING - while (StreamStatus.CLOSING != s.getStatus() - && StreamStatus.CLOSED != s.getStatus()) { - TimeUnit.MILLISECONDS.sleep(20); - } - assertNotNull("Writer should be initialized", s.getWriter()); - assertNull("No exception should be thrown", s.getLastException()); - Future<Void> closeFuture 
= s.getCloseFuture(); - Await.result(closeFuture); - for (int i = 0; i < numWrites; i++) { - assertTrue("Write should not fail before closing", - futureList.get(i).isDefined()); - WriteResponse response = Await.result(futureList.get(i)); - assertTrue("Op should fail with " + StatusCode.WRITE_CANCELLED_EXCEPTION, - StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode() - || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() - || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode()); - } - - while (streamManager.getCachedStreams().containsKey(streamName)) { - TimeUnit.MILLISECONDS.sleep(20); - } - - assertFalse("Stream should be removed from cache", - streamManager.getCachedStreams().containsKey(streamName)); - assertFalse("Stream should be removed from acquired cache", - streamManager.getAcquiredStreams().containsKey(streamName)); - - localService.shutdown(); - } - - private DistributedLogServiceImpl createConfiguredLocalService() throws Exception { - DistributedLogConfiguration confLocal = newLocalConf(); - confLocal.setOutputBufferSize(0) - .setImmediateFlushEnabled(true) - .setPeriodicFlushFrequencyMilliSeconds(0); - return createService(serverConf, confLocal); - } - - private ByteBuffer getTestDataBuffer() { - return ByteBuffer.wrap("test-data".getBytes()); - } - - @Test(timeout = 60000) - public void testNonDurableWrite() throws Exception { - DistributedLogConfiguration confLocal = newLocalConf(); - confLocal.setOutputBufferSize(Integer.MAX_VALUE) - .setImmediateFlushEnabled(false) - .setPeriodicFlushFrequencyMilliSeconds(0) - .setDurableWriteEnabled(false); - ServerConfiguration serverConfLocal = new ServerConfiguration(); - serverConfLocal.addConfiguration(serverConf); - serverConfLocal.enableDurableWrite(false); - serverConfLocal.setServiceTimeoutMs(Integer.MAX_VALUE) - .setStreamProbationTimeoutMs(Integer.MAX_VALUE); - String streamName = testName.getMethodName(); - DistributedLogServiceImpl localService = - 
createService(serverConfLocal, confLocal); - StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager(); - - int numWrites = 10; - List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(); - for (int i = 0; i < numWrites; i++) { - futureList.add(localService.write(streamName, createRecord(i))); - } - assertTrue("Stream " + streamName + " should be cached", - streamManager.getCachedStreams().containsKey(streamName)); - List<WriteResponse> resultList = FutureUtils.result(Future.collect(futureList)); - for (WriteResponse wr : resultList) { - assertEquals(DLSN.InvalidDLSN, DLSN.deserialize(wr.getDlsn())); - } - - localService.shutdown(); - } - - @Test(timeout = 60000) - public void testWriteOpNoChecksum() throws Exception { - DistributedLogServiceImpl localService = createConfiguredLocalService(); - WriteContext ctx = new WriteContext(); - Future<WriteResponse> result = localService.writeWithContext("test", getTestDataBuffer(), ctx); - WriteResponse resp = Await.result(result); - assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); - localService.shutdown(); - } - - @Test(timeout = 60000) - public void testTruncateOpNoChecksum() throws Exception { - DistributedLogServiceImpl localService = createConfiguredLocalService(); - WriteContext ctx = new WriteContext(); - Future<WriteResponse> result = localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx); - WriteResponse resp = Await.result(result); - assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); - localService.shutdown(); - } - - @Test(timeout = 60000) - public void testStreamOpNoChecksum() throws Exception { - DistributedLogServiceImpl localService = createConfiguredLocalService(); - WriteContext ctx = new WriteContext(); - HeartbeatOptions option = new HeartbeatOptions(); - option.setSendHeartBeatToReader(true); - - // heartbeat to acquire the stream and then release the stream - Future<WriteResponse> result = 
localService.heartbeatWithOptions("test", ctx, option); - WriteResponse resp = Await.result(result); - assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); - result = localService.release("test", ctx); - resp = Await.result(result); - assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); - - // heartbeat to acquire the stream and then delete the stream - result = localService.heartbeatWithOptions("test", ctx, option); - resp = Await.result(result); - assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); - result = localService.delete("test", ctx); - resp = Await.result(result); - assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); - - // shutdown the local service - localService.shutdown(); - } - - @Test(timeout = 60000) - public void testWriteOpChecksumBadChecksum() throws Exception { - DistributedLogServiceImpl localService = createConfiguredLocalService(); - WriteContext ctx = new WriteContext().setCrc32(999); - Future<WriteResponse> result = localService.writeWithContext("test", getTestDataBuffer(), ctx); - WriteResponse resp = Await.result(result); - assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); - localService.shutdown(); - } - - @Test(timeout = 60000) - public void testWriteOpChecksumBadStream() throws Exception { - DistributedLogServiceImpl localService = createConfiguredLocalService(); - WriteContext ctx = new WriteContext().setCrc32( - ProtocolUtils.writeOpCRC32("test", getTestDataBuffer().array())); - Future<WriteResponse> result = localService.writeWithContext("test1", getTestDataBuffer(), ctx); - WriteResponse resp = Await.result(result); - assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); - localService.shutdown(); - } - - @Test(timeout = 60000) - public void testWriteOpChecksumBadData() throws Exception { - DistributedLogServiceImpl localService = createConfiguredLocalService(); - ByteBuffer buffer = getTestDataBuffer(); - WriteContext ctx = new WriteContext().setCrc32( - 
ProtocolUtils.writeOpCRC32("test", buffer.array())); - - // Overwrite 1 byte to corrupt data. - buffer.put(1, (byte) 0xab); - Future<WriteResponse> result = localService.writeWithContext("test", buffer, ctx); - WriteResponse resp = Await.result(result); - assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); - localService.shutdown(); - } - - @Test(timeout = 60000) - public void testStreamOpChecksumBadChecksum() throws Exception { - DistributedLogServiceImpl localService = createConfiguredLocalService(); - WriteContext ctx = new WriteContext().setCrc32(999); - Future<WriteResponse> result = localService.heartbeat("test", ctx); - WriteResponse resp = Await.result(result); - assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); - result = localService.release("test", ctx); - resp = Await.result(result); - assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); - result = localService.delete("test", ctx); - resp = Await.result(result); - assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); - localService.shutdown(); - } - - @Test(timeout = 60000) - public void testTruncateOpChecksumBadChecksum() throws Exception { - DistributedLogServiceImpl localService = createConfiguredLocalService(); - WriteContext ctx = new WriteContext().setCrc32(999); - Future<WriteResponse> result = localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx); - WriteResponse resp = Await.result(result); - assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); - localService.shutdown(); - } - - private WriteOp getWriteOp(String name, SettableFeature disabledFeature, Long checksum) { - return new WriteOp(name, - ByteBuffer.wrap("test".getBytes()), - new NullStatsLogger(), - new NullStatsLogger(), - new IdentityStreamPartitionConverter(), - new ServerConfiguration(), - (byte) 0, - checksum, - false, - disabledFeature, - DefaultAccessControlManager.INSTANCE); - } - - @Test(timeout = 60000) - public void 
testStreamOpBadChecksumWithChecksumDisabled() throws Exception { - String streamName = testName.getMethodName(); - - SettableFeature disabledFeature = new SettableFeature("", 0); - - WriteOp writeOp0 = getWriteOp(streamName, disabledFeature, 919191L); - WriteOp writeOp1 = getWriteOp(streamName, disabledFeature, 919191L); - - try { - writeOp0.preExecute(); - fail("should have thrown"); - } catch (Exception ex) { - } - - disabledFeature.set(1); - writeOp1.preExecute(); - } - - @Test(timeout = 60000) - public void testStreamOpGoodChecksumWithChecksumDisabled() throws Exception { - String streamName = testName.getMethodName(); - - SettableFeature disabledFeature = new SettableFeature("", 1); - WriteOp writeOp0 = getWriteOp( - streamName, - disabledFeature, - ProtocolUtils.writeOpCRC32(streamName, "test".getBytes())); - WriteOp writeOp1 = getWriteOp( - streamName, - disabledFeature, - ProtocolUtils.writeOpCRC32(streamName, "test".getBytes())); - - writeOp0.preExecute(); - disabledFeature.set(0); - writeOp1.preExecute(); - } - - @Test(timeout = 60000) - public void testCloseStreamsShouldFlush() throws Exception { - DistributedLogConfiguration confLocal = newLocalConf(); - confLocal.setOutputBufferSize(Integer.MAX_VALUE) - .setImmediateFlushEnabled(false) - .setPeriodicFlushFrequencyMilliSeconds(0); - - String streamNamePrefix = testName.getMethodName(); - DistributedLogServiceImpl localService = createService(serverConf, confLocal); - StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager(); - - int numStreams = 10; - int numWrites = 10; - List<Future<WriteResponse>> futureList = - Lists.newArrayListWithExpectedSize(numStreams * numWrites); - for (int i = 0; i < numStreams; i++) { - String streamName = streamNamePrefix + "-" + i; - HeartbeatOptions hbOptions = new HeartbeatOptions(); - hbOptions.setSendHeartBeatToReader(true); - // make sure the first log segment of each stream created - 
FutureUtils.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions)); - for (int j = 0; j < numWrites; j++) { - futureList.add(localService.write(streamName, createRecord(i * numWrites + j))); - } - } - - assertEquals("There should be " + numStreams + " streams in cache", - numStreams, streamManager.getCachedStreams().size()); - while (streamManager.getAcquiredStreams().size() < numStreams) { - TimeUnit.MILLISECONDS.sleep(20); - } - - Future<List<Void>> closeResult = localService.closeStreams(); - List<Void> closedStreams = Await.result(closeResult); - assertEquals("There should be " + numStreams + " streams closed", - numStreams, closedStreams.size()); - // all writes should be flushed - for (Future<WriteResponse> future : futureList) { - WriteResponse response = Await.result(future); - assertTrue("Op should succeed or be rejected : " + response.getHeader().getCode(), - StatusCode.SUCCESS == response.getHeader().getCode() - || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() - || StatusCode.STREAM_UNAVAILABLE == response.getHeader().getCode()); - } - assertTrue("There should be no streams in the cache", - streamManager.getCachedStreams().isEmpty()); - assertTrue("There should be no streams in the acquired cache", - streamManager.getAcquiredStreams().isEmpty()); - - localService.shutdown(); - } - - @Test(timeout = 60000) - public void testCloseStreamsShouldAbort() throws Exception { - DistributedLogConfiguration confLocal = newLocalConf(); - confLocal.setOutputBufferSize(Integer.MAX_VALUE) - .setImmediateFlushEnabled(false) - .setPeriodicFlushFrequencyMilliSeconds(0); - - String streamNamePrefix = testName.getMethodName(); - DistributedLogServiceImpl localService = createService(serverConf, confLocal); - StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager(); - - int numStreams = 10; - int numWrites = 10; - List<Future<WriteResponse>> futureList = - Lists.newArrayListWithExpectedSize(numStreams 
* numWrites); - for (int i = 0; i < numStreams; i++) { - String streamName = streamNamePrefix + "-" + i; - HeartbeatOptions hbOptions = new HeartbeatOptions(); - hbOptions.setSendHeartBeatToReader(true); - // make sure the first log segment of each stream created - FutureUtils.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions)); - for (int j = 0; j < numWrites; j++) { - futureList.add(localService.write(streamName, createRecord(i * numWrites + j))); - } - } - - assertEquals("There should be " + numStreams + " streams in cache", - numStreams, streamManager.getCachedStreams().size()); - while (streamManager.getAcquiredStreams().size() < numStreams) { - TimeUnit.MILLISECONDS.sleep(20); - } - - for (Stream s : streamManager.getAcquiredStreams().values()) { - StreamImpl stream = (StreamImpl) s; - stream.setStatus(StreamStatus.ERROR); - } - - Future<List<Void>> closeResult = localService.closeStreams(); - List<Void> closedStreams = Await.result(closeResult); - assertEquals("There should be " + numStreams + " streams closed", - numStreams, closedStreams.size()); - // all writes should be flushed - for (Future<WriteResponse> future : futureList) { - WriteResponse response = Await.result(future); - assertTrue("Op should fail with " + StatusCode.BK_TRANSMIT_ERROR + " or be rejected : " - + response.getHeader().getCode(), - StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode() - || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() - || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode()); - } - // acquired streams should all been removed after we close them - assertTrue("There should be no streams in the acquired cache", - streamManager.getAcquiredStreams().isEmpty()); - localService.shutdown(); - // cached streams wouldn't be removed immediately after streams are closed - // but they should be removed after we shutdown the service - assertTrue("There should be no streams in the cache after shutting down 
the service", - streamManager.getCachedStreams().isEmpty()); - } - - @Test(timeout = 60000) - public void testShutdown() throws Exception { - service.shutdown(); - StreamManagerImpl streamManager = (StreamManagerImpl) service.getStreamManager(); - WriteResponse response = - Await.result(service.write(testName.getMethodName(), createRecord(0L))); - assertEquals("Write should fail with " + StatusCode.SERVICE_UNAVAILABLE, - StatusCode.SERVICE_UNAVAILABLE, response.getHeader().getCode()); - assertTrue("There should be no streams created after shutdown", - streamManager.getCachedStreams().isEmpty()); - assertTrue("There should be no streams acquired after shutdown", - streamManager.getAcquiredStreams().isEmpty()); - } - - @Test(timeout = 60000) - public void testGetOwner() throws Exception { - ((LocalRoutingService) service.getRoutingService()) - .addHost("stream-0", service.getServiceAddress().getSocketAddress()) - .setAllowRetrySameHost(false); - - service.startPlacementPolicy(); - - WriteResponse response = FutureUtils.result(service.getOwner("stream-1", new WriteContext())); - assertEquals(StatusCode.FOUND, response.getHeader().getCode()); - assertEquals(service.getServiceAddress().toString(), - response.getHeader().getLocation()); - - // service cache "stream-2" - StreamImpl stream = (StreamImpl) service.getStreamManager().getOrCreateStream("stream-2", false); - // create write ops to stream-2 to make service acquire the stream - WriteOp op = createWriteOp(service, "stream-2", 0L); - stream.submit(op); - stream.start(); - WriteResponse wr = Await.result(op.result()); - assertEquals("Op should succeed", - StatusCode.SUCCESS, wr.getHeader().getCode()); - assertEquals("Service should acquire stream", - StreamStatus.INITIALIZED, stream.getStatus()); - assertNotNull(stream.getManager()); - assertNotNull(stream.getWriter()); - assertNull(stream.getLastException()); - - // the stream is acquired - response = FutureUtils.result(service.getOwner("stream-2", new 
WriteContext())); - assertEquals(StatusCode.FOUND, response.getHeader().getCode()); - assertEquals(service.getServiceAddress().toString(), - response.getHeader().getLocation()); - } - -}