Repository: incubator-distributedlog Updated Branches: refs/heads/master 5b55bee23 -> 74a33029c
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java index aa08a24..df336fe 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java @@ -33,15 +33,14 @@ import com.twitter.distributedlog.service.streamset.StreamPartitionConverter; import com.twitter.distributedlog.util.ConfUtils; import com.twitter.util.Future; import com.twitter.util.Promise; -import java.io.IOException; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; - import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -233,7 +232,7 @@ public class StreamManagerImpl implements StreamManager { } @Override - public Stream getOrCreateStream(String streamName) throws IOException { + public Stream getOrCreateStream(String streamName, boolean start) throws IOException { Stream stream = streams.get(streamName); if (null == stream) { closeLock.readLock().lock(); @@ -261,7 +260,9 @@ public class StreamManagerImpl implements StreamManager { numCached.getAndIncrement(); logger.info("Inserted mapping stream name {} -> stream {}", streamName, stream); stream.initialize(); - stream.start(); + if (start) { + stream.start(); + } } } finally { closeLock.readLock().unlock(); @@ -283,8 +284,10 @@ public class StreamManagerImpl implements StreamManager { @Override public void scheduleRemoval(final Stream stream, long delayMs) { - logger.info("Scheduling removal of stream {} from cache after {} sec.", - stream.getStreamName(), delayMs); + if (delayMs > 0) { + logger.info("Scheduling removal of stream {} from cache after {} sec.", + stream.getStreamName(), delayMs); + } schedule(new Runnable() { @Override public void run() { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java index 17fae4a..4195ed3 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java @@ -89,7 +89,8 @@ public class TestDistributedLogService extends TestDistributedLogBase { dlConf.addConfiguration(conf); dlConf.setLockTimeout(0) .setOutputBufferSize(0) - .setPeriodicFlushFrequencyMilliSeconds(10); + .setPeriodicFlushFrequencyMilliSeconds(10) + .setSchedulerShutdownTimeoutMs(100); serverConf = newLocalServerConf(); uri = createDLMURI("/" + testName.getMethodName()); ensureURICreated(uri); @@ -171,10 +172,11 @@ public class TestDistributedLogService extends TestDistributedLogBase { public void testAcquireStreams() throws Exception { String streamName = testName.getMethodName(); StreamImpl s0 = createUnstartedStream(service, streamName); - s0.suspendAcquiring(); - DistributedLogServiceImpl service1 = createService(serverConf, dlConf); + ServerConfiguration serverConf1 = new ServerConfiguration(); + serverConf1.addConfiguration(serverConf); + serverConf1.setServerPort(9999); + DistributedLogServiceImpl service1 = createService(serverConf1, dlConf); StreamImpl s1 = createUnstartedStream(service1, streamName); - s1.suspendAcquiring(); // create write ops WriteOp op0 = createWriteOp(service, streamName, 0L); @@ -190,7 +192,7 @@ public class TestDistributedLogService extends TestDistributedLogBase { 1, s1.numPendingOps()); // start acquiring s0 - s0.resumeAcquiring().start(); + s0.start(); WriteResponse wr0 = Await.result(op0.result()); assertEquals("Op 0 should succeed", StatusCode.SUCCESS, wr0.getHeader().getCode()); @@ -201,12 +203,12 @@ public class TestDistributedLogService extends TestDistributedLogBase { assertNull(s0.getLastException()); // start acquiring s1 - s1.resumeAcquiring().start(); + s1.start(); WriteResponse wr1 = Await.result(op1.result()); assertEquals("Op 1 should fail", StatusCode.FOUND, wr1.getHeader().getCode()); - assertEquals("Service 1 should be in BACKOFF state", - StreamStatus.BACKOFF, s1.getStatus()); + assertEquals("Service 1 should be in ERROR state", + StreamStatus.ERROR, s1.getStatus()); assertNotNull(s1.getManager()); assertNull(s1.getWriter()); assertNotNull(s1.getLastException()); @@ -727,7 +729,7 @@ public class TestDistributedLogService extends TestDistributedLogBase { for (Stream s : streamManager.getAcquiredStreams().values()) { StreamImpl stream = (StreamImpl) s; - stream.setStatus(StreamStatus.FAILED); + stream.setStatus(StreamStatus.ERROR); } Future<List<Void>> closeResult = localService.closeStreams(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a7f60b1..52414da 100644 --- a/pom.xml +++ b/pom.xml @@ -95,7 +95,7 @@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <zookeeper.version>3.5.1-alpha</zookeeper.version> - <bookkeeper.version>4.3.5-TWTTR-OSS</bookkeeper.version> + <bookkeeper.version>4.3.6-TWTTR-OSS</bookkeeper.version> <birdcage.sha>6.34.0</birdcage.sha> <scrooge.version>4.6.0</scrooge.version> <scrooge-maven-plugin.version>3.17.0</scrooge-maven-plugin.version>