Simplify the state transitions of a stream: * the stream is created in the INITIALIZING state * when the stream is started, it transitions from INITIALIZING to INITIALIZED * it serves stream operations only when it is INITIALIZED * it is moved to ERROR when it encounters exceptions * the stream is closed when a service operation times out or when it encounters any exception; it is first removed from the acquired mapping * the stream is then removed from the cached mapping, depending on the probation time.
RB_ID=848047 Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/f19e7564 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/f19e7564 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/f19e7564 Branch: refs/heads/merge/DL-98 Commit: f19e7564ff4a1ec1b5d6f2683db190d739df99bb Parents: 0a18f56 Author: Leigh Stewart <lstew...@twitter.com> Authored: Mon Dec 12 16:49:26 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Mon Dec 12 16:49:26 2016 -0800 ---------------------------------------------------------------------- .../service/DistributedLogServiceImpl.java | 27 +- .../service/stream/StreamImpl.java | 550 +++++++------------ .../service/stream/StreamManager.java | 5 +- .../service/stream/StreamManagerImpl.java | 15 +- .../service/TestDistributedLogService.java | 20 +- 5 files changed, 222 insertions(+), 395 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java index 751e972..3a9b904 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java @@ -90,7 +90,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.RejectedExecutionException; 
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -378,7 +377,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI // if it is closed, we would not acquire stream again. return null; } - writer = streamManager.getOrCreateStream(stream); + writer = streamManager.getOrCreateStream(stream, true); } finally { closeLock.readLock().unlock(); } @@ -631,26 +630,6 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI logger.info("Released KeepAlive Latch. Main thread will shut the service down."); } - @VisibleForTesting - java.util.concurrent.Future<?> schedule(Runnable runnable, long delayMs) { - closeLock.readLock().lock(); - try { - if (serverStatus != ServerStatus.WRITE_AND_ACCEPT) { - return null; - } else if (delayMs > 0) { - return scheduler.schedule(runnable, delayMs, TimeUnit.MILLISECONDS); - } else { - return scheduler.submit(runnable); - } - } catch (RejectedExecutionException ree) { - logger.error("Failed to schedule task {} in {} ms : ", - new Object[] { runnable, delayMs, ree }); - return null; - } finally { - closeLock.readLock().unlock(); - } - } - // Test methods. 
private DynamicDistributedLogConfiguration getDynConf(String streamName) { @@ -664,8 +643,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI } @VisibleForTesting - Stream newStream(String name) { - return streamFactory.create(name, getDynConf(name), streamManager); + Stream newStream(String name) throws IOException { + return streamManager.getOrCreateStream(name, false); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java index 1204d39..3d5b9e7 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java @@ -26,7 +26,6 @@ import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.DistributedLogManager; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.DLException; -import com.twitter.distributedlog.exceptions.InvalidStreamNameException; import com.twitter.distributedlog.exceptions.OverCapacityException; import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; import com.twitter.distributedlog.exceptions.StreamNotReadyException; @@ -70,24 +69,23 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.locks.ReentrantReadWriteLock; public class StreamImpl implements Stream { static final Logger logger = LoggerFactory.getLogger(StreamImpl.class); + /** + * The status of the stream. + * + * The status change of the stream should just go in one direction. If a stream hits + * any error, the stream should be put in error state. If a stream is in error state, + * it should be removed and not reused anymore. + */ public static enum StreamStatus { UNINITIALIZED(-1), INITIALIZING(0), INITIALIZED(1), - // if a stream is in failed state, it could be retried immediately. - // a stream will be put in failed state when encountered any stream exception. - FAILED(-2), - // if a stream is in backoff state, it would backoff for a while. - // a stream will be put in backoff state when failed to acquire the ownership. - BACKOFF(-3), CLOSING(-4), CLOSED(-5), // if a stream is in error state, it should be abort during closing. @@ -112,26 +110,15 @@ public class StreamImpl implements Stream { private final Partition partition; private DistributedLogManager manager; - // A write has been attempted since the last stream acquire. 
- private volatile boolean writeSinceLastAcquire = false; private volatile AsyncLogWriter writer; private volatile StreamStatus status; private volatile String owner; private volatile Throwable lastException; - private volatile boolean running = true; - private volatile boolean suspended = false; private volatile Queue<StreamOp> pendingOps = new ArrayDeque<StreamOp>(); private final Promise<Void> closePromise = new Promise<Void>(); private final Object txnLock = new Object(); private final TimeSequencer sequencer = new TimeSequencer(); - // last acquire time - private final Stopwatch lastAcquireWatch = Stopwatch.createUnstarted(); - // last acquire failure time - private final Stopwatch lastAcquireFailureWatch = Stopwatch.createUnstarted(); - private final long nextAcquireWaitTimeMs; - private ScheduledFuture<?> tryAcquireScheduledFuture = null; - private long scheduledAcquireDelayMs = 0L; private final StreamRequestLimiter limiter; private final DynamicDistributedLogConfiguration dynConf; private final DistributedLogConfiguration dlConfig; @@ -165,7 +152,7 @@ public class StreamImpl implements Stream { new ConcurrentHashMap<String, Counter>(); // Since we may create and discard streams at initialization if there's a race, - // must not do any expensive intialization here (particularly any locking or + // must not do any expensive initialization here (particularly any locking or // significant resource allocation etc.). 
StreamImpl(final String name, final Partition partition, @@ -189,7 +176,6 @@ public class StreamImpl implements Stream { this.partition = partition; this.status = StreamStatus.UNINITIALIZED; this.lastException = new IOException("Fail to write record to stream " + name); - this.nextAcquireWaitTimeMs = dlConfig.getZKSessionTimeoutMilliseconds() * 3 / 5; this.streamConfigProvider = streamConfigProvider; this.dlNamespace = dlNamespace; this.featureRateLimitDisabled = featureProvider.getFeature( @@ -275,54 +261,16 @@ public class StreamImpl implements Stream { return String.format("Stream:%s, %s, %s Status:%s", name, manager, writer, status); } - // schedule stream acquistion - private void tryAcquireStreamOnce() { - if (!running) { - return; - } - - boolean needAcquire = false; - boolean checkNextTime = false; - synchronized (this) { - switch (this.status) { - case INITIALIZING: - streamManager.notifyReleased(this); - needAcquire = true; - break; - case FAILED: - this.status = StreamStatus.INITIALIZING; - streamManager.notifyReleased(this); - needAcquire = true; - break; - case BACKOFF: - // We may end up here after timeout on streamLock. To avoid acquire on every timeout - // we should only try again if a write has been attempted since the last acquire - // attempt. If we end up here because the request handler woke us up, the flag will - // be set and we will try to acquire as intended. 
- if (writeSinceLastAcquire) { - this.status = StreamStatus.INITIALIZING; - streamManager.notifyReleased(this); - needAcquire = true; - } else { - checkNextTime = true; - } - break; - default: - break; - } - } - if (needAcquire) { - lastAcquireWatch.reset().start(); - acquireStream().addEventListener(new FutureEventListener<Boolean>() { + @Override + public void start() { + // acquire the stream + acquireStream().addEventListener(new FutureEventListener<Boolean>() { @Override public void onSuccess(Boolean success) { - synchronized (StreamImpl.this) { - scheduledAcquireDelayMs = 0L; - tryAcquireScheduledFuture = null; - } if (!success) { - // schedule acquire in nextAcquireWaitTimeMs - scheduleTryAcquireOnce(nextAcquireWaitTimeMs); + // failed to acquire the stream. set the stream in error status and close it. + setStreamInErrorStatus(); + requestClose("Failed to acquire the ownership"); } } @@ -330,65 +278,40 @@ public class StreamImpl implements Stream { public void onFailure(Throwable cause) { // unhandled exceptions logger.error("Stream {} threw unhandled exception : ", name, cause); + // failed to acquire the stream. set the stream in error status and close it. 
setStreamInErrorStatus(); requestClose("Unhandled exception"); } }); - } else if (StreamStatus.isUnavailable(status)) { - // if the stream is unavailable, stop the thread and close the stream - requestClose("Stream is unavailable anymore"); - } else if (StreamStatus.INITIALIZED != status && lastAcquireWatch.elapsed(TimeUnit.HOURS) > 2) { - // if the stream isn't in initialized state and no writes coming in, then close the stream - requestClose("Stream not used anymore"); - } else if (checkNextTime) { - synchronized (StreamImpl.this) { - scheduledAcquireDelayMs = 0L; - tryAcquireScheduledFuture = null; - } - // schedule acquire in nextAcquireWaitTimeMs - scheduleTryAcquireOnce(nextAcquireWaitTimeMs); - } } - private synchronized void scheduleTryAcquireOnce(long delayMs) { - if (null != tryAcquireScheduledFuture) { - if (delayMs <= 0) { - if (scheduledAcquireDelayMs <= 0L || - (scheduledAcquireDelayMs > 0L - && !tryAcquireScheduledFuture.cancel(false))) { - return; - } - // if the scheduled one could be cancelled, re-submit one - } else { - return; + // + // Stats Operations + // + + void countException(Throwable t, StatsLogger streamExceptionLogger) { + String exceptionName = null == t ? 
"null" : t.getClass().getName(); + Counter counter = exceptionCounters.get(exceptionName); + if (null == counter) { + counter = exceptionStatLogger.getCounter(exceptionName); + Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName, counter); + if (null != oldCounter) { + counter = oldCounter; } } - tryAcquireScheduledFuture = schedule(new Runnable() { - @Override - public void run() { - tryAcquireStreamOnce(); - } - }, delayMs); - scheduledAcquireDelayMs = delayMs; + counter.inc(); + streamExceptionLogger.getCounter(exceptionName).inc(); } - @Override - public void start() { - scheduleTryAcquireOnce(0); + boolean isCriticalException(Throwable cause) { + return !(cause instanceof OwnershipAcquireFailedException); } - ScheduledFuture<?> schedule(Runnable runnable, long delayMs) { - if (!running) { - return null; - } - try { - return scheduler.schedule(name, runnable, delayMs, TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException ree) { - logger.error("Failed to schedule task {} in {} ms : ", - new Object[] { runnable, delayMs, ree }); - return null; - } - } + // + // Service Timeout: + // - schedule a timeout function to handle operation timeouts: {@link #handleServiceTimeout(String)} + // - if the operation is completed within timeout period, cancel the timeout. + // void scheduleTimeout(final StreamOp op) { final Timeout timeout = requestTimer.newTimeout(new TimerTask() { @@ -418,12 +341,14 @@ public class StreamImpl implements Stream { * stream off the proxy for a period of time, hopefully long enough for the * issues to be resolved, or for whoop to kick in and kill the shard. 
*/ - synchronized void handleServiceTimeout(String reason) { - if (StreamStatus.isUnavailable(status)) { - return; + void handleServiceTimeout(String reason) { + synchronized (this) { + if (StreamStatus.isUnavailable(status)) { + return; + } + // Mark stream in error state + setStreamInErrorStatus(); } - // Mark stream in error state - setStreamInErrorStatus(); // Async close request, and schedule eviction when its done. Future<Void> closeFuture = requestClose(reason, false /* dont remove */); @@ -436,6 +361,10 @@ public class StreamImpl implements Stream { }); } + // + // Submit the operation to the stream. + // + /** * Execute the StreamOp. If reacquire is needed, this may initiate reacquire and queue the op for * execution once complete. @@ -445,9 +374,6 @@ public class StreamImpl implements Stream { */ @Override public void submit(StreamOp op) { - // Let stream acquire thread know a write has been attempted. - writeSinceLastAcquire = true; - try { limiter.apply(op); } catch (OverCapacityException ex) { @@ -460,36 +386,28 @@ public class StreamImpl implements Stream { scheduleTimeout(op); } - boolean notifyAcquireThread = false; boolean completeOpNow = false; boolean success = true; if (StreamStatus.isUnavailable(status)) { // Stream is closed, fail the op immediately op.fail(new StreamUnavailableException("Stream " + name + " is closed.")); return; - } if (StreamStatus.INITIALIZED == status && writer != null) { + } else if (StreamStatus.INITIALIZED == status && writer != null) { completeOpNow = true; success = true; } else { synchronized (this) { if (StreamStatus.isUnavailable(status)) { - // complete the write op as {@link #executeOp(op, success)} will handle closed case. 
- completeOpNow = true; - success = true; + // Stream is closed, fail the op immediately + op.fail(new StreamUnavailableException("Stream " + name + " is closed.")); + return; } if (StreamStatus.INITIALIZED == status) { completeOpNow = true; success = true; - } else if (StreamStatus.BACKOFF == status && - lastAcquireFailureWatch.elapsed(TimeUnit.MILLISECONDS) < nextAcquireWaitTimeMs) { - completeOpNow = true; - success = false; } else if (failFastOnStreamNotReady) { - notifyAcquireThread = true; - completeOpNow = false; - success = false; op.fail(new StreamNotReadyException("Stream " + name + " is not ready; status = " + status)); - } else { // closing & initializing - notifyAcquireThread = true; + return; + } else { // the stream is still initializing pendingOps.add(op); pendingOpsCounter.inc(); if (1 == pendingOps.size()) { @@ -500,14 +418,15 @@ public class StreamImpl implements Stream { } } } - if (notifyAcquireThread && !suspended) { - scheduleTryAcquireOnce(0L); - } if (completeOpNow) { executeOp(op, success); } } + // + // Execute operations and handle exceptions on operations + // + /** * Execute the <i>op</i> immediately. * @@ -516,20 +435,7 @@ public class StreamImpl implements Stream { * @param success * whether the operation is success or not. 
*/ - void executeOp(StreamOp op, boolean success) { - closeLock.readLock().lock(); - try { - if (StreamStatus.isUnavailable(status)) { - op.fail(new StreamUnavailableException("Stream " + name + " is closed.")); - return; - } - doExecuteOp(op, success); - } finally { - closeLock.readLock().unlock(); - } - } - - private void doExecuteOp(final StreamOp op, boolean success) { + void executeOp(final StreamOp op, boolean success) { final AsyncLogWriter writer; final Throwable lastException; synchronized (this) { @@ -552,7 +458,7 @@ public class StreamImpl implements Stream { case FOUND: assert(cause instanceof OwnershipAcquireFailedException); countAsException = false; - handleOwnershipAcquireFailedException(op, (OwnershipAcquireFailedException) cause); + handleExceptionOnStreamOp(op, cause); break; case ALREADY_CLOSED: assert(cause instanceof AlreadyClosedException); @@ -573,13 +479,14 @@ public class StreamImpl implements Stream { case OVER_CAPACITY: op.fail(cause); break; - // exceptions that *could* / *might* be recovered by creating a new writer + // the DL writer hits exception, simple set the stream to error status + // and fail the request default: - handleRecoverableDLException(op, cause); + handleExceptionOnStreamOp(op, cause); break; } } else { - handleUnknownException(op, cause); + handleExceptionOnStreamOp(op, cause); } if (countAsException) { countException(cause, streamExceptionStatLogger); @@ -587,88 +494,41 @@ public class StreamImpl implements Stream { } }); } else { - op.fail(lastException); - } - } - - /** - * Handle recoverable dl exception. 
- * - * @param op - * stream operation executing - * @param cause - * exception received when executing <i>op</i> - */ - private void handleRecoverableDLException(StreamOp op, final Throwable cause) { - AsyncLogWriter oldWriter = null; - boolean statusChanged = false; - synchronized (this) { - if (StreamStatus.INITIALIZED == status) { - oldWriter = setStreamStatus(StreamStatus.FAILED, StreamStatus.INITIALIZED, - null, null, cause); - statusChanged = true; + if (null != lastException) { + op.fail(lastException); + } else { + op.fail(new StreamUnavailableException("Stream " + name + " is closed.")); } } - if (statusChanged) { - Abortables.asyncAbort(oldWriter, false); - logger.error("Failed to write data into stream {} : ", name, cause); - scheduleTryAcquireOnce(0L); - } - op.fail(cause); } /** - * Handle unknown exception when executing <i>op</i>. + * Handle exception when executing <i>op</i>. * * @param op * stream operation executing * @param cause * exception received when executing <i>op</i> */ - private void handleUnknownException(StreamOp op, final Throwable cause) { + private void handleExceptionOnStreamOp(StreamOp op, final Throwable cause) { AsyncLogWriter oldWriter = null; boolean statusChanged = false; synchronized (this) { if (StreamStatus.INITIALIZED == status) { - oldWriter = setStreamStatus(StreamStatus.FAILED, StreamStatus.INITIALIZED, - null, null, cause); + oldWriter = setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZED, null, cause); statusChanged = true; } } if (statusChanged) { Abortables.asyncAbort(oldWriter, false); - logger.error("Failed to write data into stream {} : ", name, cause); - scheduleTryAcquireOnce(0L); - } - op.fail(cause); - } - - /** - * Handle losing ownership during executing <i>op</i>. 
- * - * @param op - * stream operation executing - * @param oafe - * the ownership exception received when executing <i>op</i> - */ - private void handleOwnershipAcquireFailedException(StreamOp op, final OwnershipAcquireFailedException oafe) { - logger.warn("Failed to write data into stream {} because stream is acquired by {} : {}", - new Object[]{name, oafe.getCurrentOwner(), oafe.getMessage()}); - AsyncLogWriter oldWriter = null; - boolean statusChanged = false; - synchronized (this) { - if (StreamStatus.INITIALIZED == status) { - oldWriter = - setStreamStatus(StreamStatus.BACKOFF, StreamStatus.INITIALIZED, - null, oafe.getCurrentOwner(), oafe); - statusChanged = true; + if (isCriticalException(cause)) { + logger.error("Failed to write data into stream {} : ", name, cause); + } else { + logger.warn("Failed to write data into stream {} : {}", name, cause.getMessage()); } + requestClose("Failed to write data into stream " + name + " : " + cause.getMessage()); } - if (statusChanged) { - Abortables.asyncAbort(oldWriter, false); - scheduleTryAcquireOnce(nextAcquireWaitTimeMs); - } - op.fail(oafe); + op.fail(cause); } /** @@ -680,129 +540,126 @@ public class StreamImpl implements Stream { fatalErrorHandler.notifyFatalError(); } - void countException(Throwable t, StatsLogger streamExceptionLogger) { - String exceptionName = null == t ? "null" : t.getClass().getName(); - Counter counter = exceptionCounters.get(exceptionName); - if (null == counter) { - counter = exceptionStatLogger.getCounter(exceptionName); - Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName, counter); - if (null != oldCounter) { - counter = oldCounter; - } - } - counter.inc(); - streamExceptionLogger.getCounter(exceptionName).inc(); - } + // + // Acquire streams + // Future<Boolean> acquireStream() { - // Reset this flag so the acquire thread knows whether re-acquire is needed. 
- writeSinceLastAcquire = false; - final Stopwatch stopwatch = Stopwatch.createStarted(); final Promise<Boolean> acquirePromise = new Promise<Boolean>(); manager.openAsyncLogWriter().addEventListener(FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() { @Override public void onSuccess(AsyncLogWriter w) { - synchronized (txnLock) { - sequencer.setLastId(w.getLastTxId()); - } - AsyncLogWriter oldWriter; - Queue<StreamOp> oldPendingOps; - boolean success; - synchronized (StreamImpl.this) { - oldWriter = setStreamStatus(StreamStatus.INITIALIZED, - StreamStatus.INITIALIZING, w, null, null); - oldPendingOps = pendingOps; - pendingOps = new ArrayDeque<StreamOp>(); - success = true; - } - // check if the stream is allowed to be acquired - if (!streamManager.allowAcquire(StreamImpl.this)) { - if (null != oldWriter) { - Abortables.asyncAbort(oldWriter, true); - } - int maxAcquiredPartitions = dynConf.getMaxAcquiredPartitionsPerProxy(); - StreamUnavailableException sue = new StreamUnavailableException("Stream " + partition.getStream() - + " is not allowed to acquire more than " + maxAcquiredPartitions + " partitions"); - countException(sue, exceptionStatLogger); - logger.error("Failed to acquire stream {} because it is unavailable : {}", - name, sue.getMessage()); - synchronized (this) { - oldWriter = setStreamStatus(StreamStatus.ERROR, - StreamStatus.INITIALIZED, null, null, sue); - // we don't switch the pending ops since they are already switched - // when setting the status to initialized - success = false; - } - } - processPendingRequestsAfterOpen(success, oldWriter, oldPendingOps); + onAcquireStreamSuccess(w, stopwatch, acquirePromise); } @Override public void onFailure(Throwable cause) { - AsyncLogWriter oldWriter; - Queue<StreamOp> oldPendingOps; - boolean success; - if (cause instanceof AlreadyClosedException) { - countException(cause, streamExceptionStatLogger); - handleAlreadyClosedException((AlreadyClosedException) cause); - return; 
- } else if (cause instanceof OwnershipAcquireFailedException) { - OwnershipAcquireFailedException oafe = (OwnershipAcquireFailedException) cause; - logger.warn("Failed to acquire stream ownership for {}, current owner is {} : {}", - new Object[]{name, oafe.getCurrentOwner(), oafe.getMessage()}); - synchronized (StreamImpl.this) { - oldWriter = setStreamStatus(StreamStatus.BACKOFF, - StreamStatus.INITIALIZING, null, oafe.getCurrentOwner(), oafe); - oldPendingOps = pendingOps; - pendingOps = new ArrayDeque<StreamOp>(); - success = false; - } - } else if (cause instanceof InvalidStreamNameException) { - InvalidStreamNameException isne = (InvalidStreamNameException) cause; - countException(isne, streamExceptionStatLogger); - logger.error("Failed to acquire stream {} due to its name is invalid", name); - synchronized (StreamImpl.this) { - oldWriter = setStreamStatus(StreamStatus.ERROR, - StreamStatus.INITIALIZING, null, null, isne); - oldPendingOps = pendingOps; - pendingOps = new ArrayDeque<StreamOp>(); - success = false; - } - } else { - countException(cause, streamExceptionStatLogger); - logger.error("Failed to initialize stream {} : ", name, cause); - synchronized (StreamImpl.this) { - oldWriter = setStreamStatus(StreamStatus.FAILED, - StreamStatus.INITIALIZING, null, null, cause); - oldPendingOps = pendingOps; - pendingOps = new ArrayDeque<StreamOp>(); - success = false; - } - } - processPendingRequestsAfterOpen(success, oldWriter, oldPendingOps); + onAcquireStreamFailure(cause, stopwatch, acquirePromise); } - void processPendingRequestsAfterOpen(boolean success, - AsyncLogWriter oldWriter, - Queue<StreamOp> oldPendingOps) { - if (success) { - streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - } else { - streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - } - for (StreamOp op : oldPendingOps) { - executeOp(op, success); - pendingOpsCounter.dec(); - } - Abortables.asyncAbort(oldWriter, true); - 
FutureUtils.setValue(acquirePromise, success); - } }, scheduler, getStreamName())); return acquirePromise; } + private void onAcquireStreamSuccess(AsyncLogWriter w, + Stopwatch stopwatch, + Promise<Boolean> acquirePromise) { + synchronized (txnLock) { + sequencer.setLastId(w.getLastTxId()); + } + AsyncLogWriter oldWriter; + Queue<StreamOp> oldPendingOps; + boolean success; + synchronized (StreamImpl.this) { + oldWriter = setStreamStatus(StreamStatus.INITIALIZED, + StreamStatus.INITIALIZING, w, null); + oldPendingOps = pendingOps; + pendingOps = new ArrayDeque<StreamOp>(); + success = true; + } + // check if the stream is allowed to be acquired + if (!streamManager.allowAcquire(StreamImpl.this)) { + if (null != oldWriter) { + Abortables.asyncAbort(oldWriter, true); + } + int maxAcquiredPartitions = dynConf.getMaxAcquiredPartitionsPerProxy(); + StreamUnavailableException sue = new StreamUnavailableException("Stream " + partition.getStream() + + " is not allowed to acquire more than " + maxAcquiredPartitions + " partitions"); + countException(sue, exceptionStatLogger); + logger.error("Failed to acquire stream {} because it is unavailable : {}", + name, sue.getMessage()); + synchronized (this) { + oldWriter = setStreamStatus(StreamStatus.ERROR, + StreamStatus.INITIALIZED, null, sue); + // we don't switch the pending ops since they are already switched + // when setting the status to initialized + success = false; + } + } + processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise); + } + + private void onAcquireStreamFailure(Throwable cause, + Stopwatch stopwatch, + Promise<Boolean> acquirePromise) { + AsyncLogWriter oldWriter; + Queue<StreamOp> oldPendingOps; + boolean success; + if (cause instanceof AlreadyClosedException) { + countException(cause, streamExceptionStatLogger); + handleAlreadyClosedException((AlreadyClosedException) cause); + return; + } else { + if (isCriticalException(cause)) { + countException(cause, 
streamExceptionStatLogger); + logger.error("Failed to acquire stream {} : ", name, cause); + } else { + logger.warn("Failed to acquire stream {} : {}", name, cause.getMessage()); + } + synchronized (StreamImpl.this) { + oldWriter = setStreamStatus(StreamStatus.ERROR, + StreamStatus.INITIALIZING, null, cause); + oldPendingOps = pendingOps; + pendingOps = new ArrayDeque<StreamOp>(); + success = false; + } + } + processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise); + } + + /** + * Process the pending request after acquired stream. + * + * @param success whether the acquisition succeed or not + * @param oldWriter the old writer to abort + * @param oldPendingOps the old pending ops to execute + * @param stopwatch stopwatch to measure the time spent on acquisition + * @param acquirePromise the promise to complete the acquire operation + */ + void processPendingRequestsAfterAcquire(boolean success, + AsyncLogWriter oldWriter, + Queue<StreamOp> oldPendingOps, + Stopwatch stopwatch, + Promise<Boolean> acquirePromise) { + if (success) { + streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + } else { + streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + } + for (StreamOp op : oldPendingOps) { + executeOp(op, success); + pendingOpsCounter.dec(); + } + Abortables.asyncAbort(oldWriter, true); + FutureUtils.setValue(acquirePromise, success); + } + + // + // Stream Status Changes + // + synchronized void setStreamInErrorStatus() { if (StreamStatus.CLOSING == status || StreamStatus.CLOSED == status) { return; @@ -819,8 +676,6 @@ public class StreamImpl implements Stream { * old status * @param writer * new log writer - * @param owner - * new owner * @param t * new exception * @return old writer if it exists @@ -828,7 +683,6 @@ public class StreamImpl implements Stream { synchronized AsyncLogWriter setStreamStatus(StreamStatus newStatus, StreamStatus oldStatus, AsyncLogWriter 
writer, - String owner, Throwable t) { if (oldStatus != this.status) { logger.info("Stream {} status already changed from {} -> {} when trying to change it to {}", @@ -836,6 +690,11 @@ public class StreamImpl implements Stream { return null; } + String owner = null; + if (t instanceof OwnershipAcquireFailedException) { + owner = ((OwnershipAcquireFailedException) t).getCurrentOwner(); + } + AsyncLogWriter oldWriter = this.writer; this.writer = writer; if (null != owner && owner.equals(clientId)) { @@ -852,10 +711,6 @@ public class StreamImpl implements Stream { } this.lastException = t; this.status = newStatus; - if (StreamStatus.BACKOFF == newStatus && null != owner) { - // start failure watch - this.lastAcquireFailureWatch.reset().start(); - } if (StreamStatus.INITIALIZED == newStatus) { streamManager.notifyAcquired(this); logger.info("Inserted acquired stream {} -> writer {}", name, this); @@ -866,12 +721,16 @@ public class StreamImpl implements Stream { return oldWriter; } + // + // Stream Close Functions + // + void close(DistributedLogManager dlm) { if (null != dlm) { try { dlm.close(); } catch (IOException ioe) { - logger.warn("Failed to close dlm for {} : ", ioe); + logger.warn("Failed to close dlm for {} : ", name, ioe); } } } @@ -902,12 +761,16 @@ public class StreamImpl implements Stream { // them. 
close(abort); if (uncache) { + final long probationTimeoutMs; + if (null != owner) { + probationTimeoutMs = 2 * dlConfig.getZKSessionTimeoutMilliseconds() / 3; + } else { + probationTimeoutMs = 0L; + } closePromise.onSuccess(new AbstractFunction1<Void, BoxedUnit>() { @Override public BoxedUnit apply(Void result) { - if (streamManager.notifyRemoved(StreamImpl.this)) { - logger.info("Removed cached stream {} after closed.", name); - } + streamManager.scheduleRemoval(StreamImpl.this, probationTimeoutMs); return BoxedUnit.UNIT; } }); @@ -949,14 +812,6 @@ public class StreamImpl implements Stream { closeLock.writeLock().unlock(); } logger.info("Closing stream {} ...", name); - running = false; - // stop any outstanding ownership acquire actions first - synchronized (this) { - if (null != tryAcquireScheduledFuture) { - tryAcquireScheduledFuture.cancel(true); - } - } - logger.info("Stopped threads of stream {}.", name); // Close the writers to release the locks before failing the requests Future<Void> closeWriterFuture; if (abort) { @@ -1016,19 +871,6 @@ public class StreamImpl implements Stream { // Test-only apis @VisibleForTesting - public StreamImpl suspendAcquiring() { - suspended = true; - return this; - } - - @VisibleForTesting - public StreamImpl resumeAcquiring() { - suspended = false; - scheduleTryAcquireOnce(0L); - return this; - } - - @VisibleForTesting public int numPendingOps() { Queue<StreamOp> queue = pendingOps; return null == queue ? 
0 : queue.size(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java index 972eb55..e171e46 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java @@ -43,10 +43,11 @@ public interface StreamManager { /** * Get a cached stream and create a new one if it doesnt exist. - * @param stream name + * @param streamName stream name + * @param start whether to start the stream after it is created. * @return future satisfied once close complete */ - Stream getOrCreateStream(String stream) throws IOException; + Stream getOrCreateStream(String streamName, boolean start) throws IOException; /** * Asynchronously create a new stream. 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java index aa08a24..df336fe 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java @@ -33,15 +33,14 @@ import com.twitter.distributedlog.service.streamset.StreamPartitionConverter; import com.twitter.distributedlog.util.ConfUtils; import com.twitter.util.Future; import com.twitter.util.Promise; -import java.io.IOException; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; - import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -233,7 +232,7 @@ public class StreamManagerImpl implements StreamManager { } @Override - public Stream getOrCreateStream(String streamName) throws IOException { + public Stream getOrCreateStream(String streamName, boolean start) throws IOException { Stream stream = streams.get(streamName); if (null == stream) { closeLock.readLock().lock(); @@ -261,7 +260,9 @@ public class StreamManagerImpl implements StreamManager { numCached.getAndIncrement(); logger.info("Inserted mapping stream name {} -> stream {}", streamName, stream); stream.initialize(); - stream.start(); + if (start) { + stream.start(); + } } } finally { closeLock.readLock().unlock(); @@ -283,8 +284,10 @@ public class StreamManagerImpl 
implements StreamManager { @Override public void scheduleRemoval(final Stream stream, long delayMs) { - logger.info("Scheduling removal of stream {} from cache after {} sec.", - stream.getStreamName(), delayMs); + if (delayMs > 0) { + logger.info("Scheduling removal of stream {} from cache after {} sec.", + stream.getStreamName(), delayMs); + } schedule(new Runnable() { @Override public void run() { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java index 17fae4a..4195ed3 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java @@ -89,7 +89,8 @@ public class TestDistributedLogService extends TestDistributedLogBase { dlConf.addConfiguration(conf); dlConf.setLockTimeout(0) .setOutputBufferSize(0) - .setPeriodicFlushFrequencyMilliSeconds(10); + .setPeriodicFlushFrequencyMilliSeconds(10) + .setSchedulerShutdownTimeoutMs(100); serverConf = newLocalServerConf(); uri = createDLMURI("/" + testName.getMethodName()); ensureURICreated(uri); @@ -171,10 +172,11 @@ public class TestDistributedLogService extends TestDistributedLogBase { public void testAcquireStreams() throws Exception { String streamName = testName.getMethodName(); StreamImpl s0 = createUnstartedStream(service, streamName); - s0.suspendAcquiring(); - DistributedLogServiceImpl service1 = createService(serverConf, dlConf); + ServerConfiguration serverConf1 = new ServerConfiguration(); + serverConf1.addConfiguration(serverConf); + 
serverConf1.setServerPort(9999); + DistributedLogServiceImpl service1 = createService(serverConf1, dlConf); StreamImpl s1 = createUnstartedStream(service1, streamName); - s1.suspendAcquiring(); // create write ops WriteOp op0 = createWriteOp(service, streamName, 0L); @@ -190,7 +192,7 @@ public class TestDistributedLogService extends TestDistributedLogBase { 1, s1.numPendingOps()); // start acquiring s0 - s0.resumeAcquiring().start(); + s0.start(); WriteResponse wr0 = Await.result(op0.result()); assertEquals("Op 0 should succeed", StatusCode.SUCCESS, wr0.getHeader().getCode()); @@ -201,12 +203,12 @@ public class TestDistributedLogService extends TestDistributedLogBase { assertNull(s0.getLastException()); // start acquiring s1 - s1.resumeAcquiring().start(); + s1.start(); WriteResponse wr1 = Await.result(op1.result()); assertEquals("Op 1 should fail", StatusCode.FOUND, wr1.getHeader().getCode()); - assertEquals("Service 1 should be in BACKOFF state", - StreamStatus.BACKOFF, s1.getStatus()); + assertEquals("Service 1 should be in ERROR state", + StreamStatus.ERROR, s1.getStatus()); assertNotNull(s1.getManager()); assertNull(s1.getWriter()); assertNotNull(s1.getLastException()); @@ -727,7 +729,7 @@ public class TestDistributedLogService extends TestDistributedLogBase { for (Stream s : streamManager.getAcquiredStreams().values()) { StreamImpl stream = (StreamImpl) s; - stream.setStatus(StreamStatus.FAILED); + stream.setStatus(StreamStatus.ERROR); } Future<List<Void>> closeResult = localService.closeStreams();