http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java deleted file mode 100644 index bf7a1ad..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java +++ /dev/null @@ -1,925 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.service.stream; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Stopwatch; -import org.apache.distributedlog.AsyncLogWriter; -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.DistributedLogManager; -import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.exceptions.AlreadyClosedException; -import org.apache.distributedlog.exceptions.DLException; -import org.apache.distributedlog.exceptions.OverCapacityException; -import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; -import org.apache.distributedlog.exceptions.StreamNotReadyException; -import org.apache.distributedlog.exceptions.StreamUnavailableException; -import org.apache.distributedlog.exceptions.UnexpectedException; -import org.apache.distributedlog.io.Abortables; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.service.FatalErrorHandler; -import org.apache.distributedlog.service.ServerFeatureKeys; -import org.apache.distributedlog.service.config.ServerConfiguration; -import org.apache.distributedlog.service.config.StreamConfigProvider; -import org.apache.distributedlog.service.stream.limiter.StreamRequestLimiter; -import org.apache.distributedlog.service.streamset.Partition; -import org.apache.distributedlog.stats.BroadCastStatsLogger; -import org.apache.distributedlog.util.FutureUtils; -import org.apache.distributedlog.util.OrderedScheduler; -import org.apache.distributedlog.util.TimeSequencer; -import org.apache.distributedlog.util.Utils; -import com.twitter.util.Duration; -import com.twitter.util.Function0; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import com.twitter.util.TimeoutException; -import com.twitter.util.Timer; -import 
java.io.IOException; -import java.util.ArrayDeque; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.TimerTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -/** - * Implementation of {@link Stream}. - */ -public class StreamImpl implements Stream { - - private static final Logger logger = LoggerFactory.getLogger(StreamImpl.class); - - /** - * The status of the stream. - * - * <p>The status change of the stream should just go in one direction. If a stream hits - * any error, the stream should be put in error state. If a stream is in error state, - * it should be removed and not reused anymore. - */ - public enum StreamStatus { - UNINITIALIZED(-1), - INITIALIZING(0), - INITIALIZED(1), - CLOSING(-4), - CLOSED(-5), - // if a stream is in error state, it should be abort during closing. 
- ERROR(-6); - - final int code; - - StreamStatus(int code) { - this.code = code; - } - - int getCode() { - return code; - } - - public static boolean isUnavailable(StreamStatus status) { - return StreamStatus.ERROR == status || StreamStatus.CLOSING == status || StreamStatus.CLOSED == status; - } - } - - private final String name; - private final Partition partition; - private DistributedLogManager manager; - - private volatile AsyncLogWriter writer; - private volatile StreamStatus status; - private volatile String owner; - private volatile Throwable lastException; - private volatile Queue<StreamOp> pendingOps = new ArrayDeque<StreamOp>(); - - private final Promise<Void> closePromise = new Promise<Void>(); - private final Object txnLock = new Object(); - private final TimeSequencer sequencer = new TimeSequencer(); - private final StreamRequestLimiter limiter; - private final DynamicDistributedLogConfiguration dynConf; - private final DistributedLogConfiguration dlConfig; - private final DistributedLogNamespace dlNamespace; - private final String clientId; - private final OrderedScheduler scheduler; - private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock(); - private final Feature featureRateLimitDisabled; - private final StreamManager streamManager; - private final StreamConfigProvider streamConfigProvider; - private final FatalErrorHandler fatalErrorHandler; - private final long streamProbationTimeoutMs; - private final long serviceTimeoutMs; - private final long writerCloseTimeoutMs; - private final boolean failFastOnStreamNotReady; - private final HashedWheelTimer requestTimer; - private final Timer futureTimer; - - // Stats - private final StatsLogger streamLogger; - private final StatsLogger streamExceptionStatLogger; - private final StatsLogger limiterStatLogger; - private final Counter serviceTimeout; - private final OpStatsLogger streamAcquireStat; - private final OpStatsLogger writerCloseStatLogger; - private final Counter 
pendingOpsCounter; - private final Counter unexpectedExceptions; - private final Counter writerCloseTimeoutCounter; - private final StatsLogger exceptionStatLogger; - private final ConcurrentHashMap<String, Counter> exceptionCounters = - new ConcurrentHashMap<String, Counter>(); - private final Gauge<Number> streamStatusGauge; - - // Since we may create and discard streams at initialization if there's a race, - // must not do any expensive initialization here (particularly any locking or - // significant resource allocation etc.). - StreamImpl(final String name, - final Partition partition, - String clientId, - StreamManager streamManager, - StreamOpStats streamOpStats, - ServerConfiguration serverConfig, - DistributedLogConfiguration dlConfig, - DynamicDistributedLogConfiguration streamConf, - FeatureProvider featureProvider, - StreamConfigProvider streamConfigProvider, - DistributedLogNamespace dlNamespace, - OrderedScheduler scheduler, - FatalErrorHandler fatalErrorHandler, - HashedWheelTimer requestTimer, - Timer futureTimer) { - this.clientId = clientId; - this.dlConfig = dlConfig; - this.streamManager = streamManager; - this.name = name; - this.partition = partition; - this.status = StreamStatus.UNINITIALIZED; - this.lastException = new IOException("Fail to write record to stream " + name); - this.streamConfigProvider = streamConfigProvider; - this.dlNamespace = dlNamespace; - this.featureRateLimitDisabled = featureProvider.getFeature( - ServerFeatureKeys.SERVICE_RATE_LIMIT_DISABLED.name().toLowerCase()); - this.scheduler = scheduler; - this.serviceTimeoutMs = serverConfig.getServiceTimeoutMs(); - this.streamProbationTimeoutMs = serverConfig.getStreamProbationTimeoutMs(); - this.writerCloseTimeoutMs = serverConfig.getWriterCloseTimeoutMs(); - this.failFastOnStreamNotReady = dlConfig.getFailFastOnStreamNotReady(); - this.fatalErrorHandler = fatalErrorHandler; - this.dynConf = streamConf; - StatsLogger limiterStatsLogger = BroadCastStatsLogger.two( - 
streamOpStats.baseScope("stream_limiter"), - streamOpStats.streamRequestScope(partition, "limiter")); - this.limiter = new StreamRequestLimiter(name, dynConf, limiterStatsLogger, featureRateLimitDisabled); - this.requestTimer = requestTimer; - this.futureTimer = futureTimer; - - // Stats - this.streamLogger = streamOpStats.streamRequestStatsLogger(partition); - this.limiterStatLogger = streamOpStats.baseScope("request_limiter"); - this.streamExceptionStatLogger = streamLogger.scope("exceptions"); - this.serviceTimeout = streamOpStats.baseCounter("serviceTimeout"); - StatsLogger streamsStatsLogger = streamOpStats.baseScope("streams"); - this.streamAcquireStat = streamsStatsLogger.getOpStatsLogger("acquire"); - this.pendingOpsCounter = streamOpStats.baseCounter("pending_ops"); - this.unexpectedExceptions = streamOpStats.baseCounter("unexpected_exceptions"); - this.exceptionStatLogger = streamOpStats.requestScope("exceptions"); - this.writerCloseStatLogger = streamsStatsLogger.getOpStatsLogger("writer_close"); - this.writerCloseTimeoutCounter = streamsStatsLogger.getCounter("writer_close_timeouts"); - // Gauges - this.streamStatusGauge = new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return StreamStatus.UNINITIALIZED.getCode(); - } - @Override - public Number getSample() { - return status.getCode(); - } - }; - } - - @Override - public String getOwner() { - return owner; - } - - @Override - public String getStreamName() { - return name; - } - - @Override - public DynamicDistributedLogConfiguration getStreamConfiguration() { - return dynConf; - } - - @Override - public Partition getPartition() { - return partition; - } - - private DistributedLogManager openLog(String name) throws IOException { - Optional<DistributedLogConfiguration> dlConf = Optional.<DistributedLogConfiguration>absent(); - Optional<DynamicDistributedLogConfiguration> dynDlConf = Optional.of(dynConf); - Optional<StatsLogger> perStreamStatsLogger = Optional.of(streamLogger); - 
return dlNamespace.openLog(name, dlConf, dynDlConf, perStreamStatsLogger); - } - - // Expensive initialization, only called once per stream. - @Override - public void initialize() throws IOException { - manager = openLog(name); - - // Better to avoid registering the gauge multiple times, so do this in init - // which only gets called once. - streamLogger.registerGauge("stream_status", this.streamStatusGauge); - - // Signal initialization is complete, should be last in this method. - status = StreamStatus.INITIALIZING; - } - - @Override - public String toString() { - return String.format("Stream:%s, %s, %s Status:%s", name, manager, writer, status); - } - - @Override - public void start() { - // acquire the stream - acquireStream().addEventListener(new FutureEventListener<Boolean>() { - @Override - public void onSuccess(Boolean success) { - if (!success) { - // failed to acquire the stream. set the stream in error status and close it. - setStreamInErrorStatus(); - requestClose("Failed to acquire the ownership"); - } - } - - @Override - public void onFailure(Throwable cause) { - // unhandled exceptions - logger.error("Stream {} threw unhandled exception : ", name, cause); - // failed to acquire the stream. set the stream in error status and close it. - setStreamInErrorStatus(); - requestClose("Unhandled exception"); - } - }); - } - - // - // Stats Operations - // - - void countException(Throwable t, StatsLogger streamExceptionLogger) { - String exceptionName = null == t ? 
"null" : t.getClass().getName(); - Counter counter = exceptionCounters.get(exceptionName); - if (null == counter) { - counter = exceptionStatLogger.getCounter(exceptionName); - Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName, counter); - if (null != oldCounter) { - counter = oldCounter; - } - } - counter.inc(); - streamExceptionLogger.getCounter(exceptionName).inc(); - } - - boolean isCriticalException(Throwable cause) { - return !(cause instanceof OwnershipAcquireFailedException); - } - - // - // Service Timeout: - // - schedule a timeout function to handle operation timeouts: {@link #handleServiceTimeout(String)} - // - if the operation is completed within timeout period, cancel the timeout. - // - - void scheduleTimeout(final StreamOp op) { - final Timeout timeout = requestTimer.newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - if (!timeout.isCancelled()) { - serviceTimeout.inc(); - handleServiceTimeout("Operation " + op.getClass().getName() + " timeout"); - } - } - }, serviceTimeoutMs, TimeUnit.MILLISECONDS); - op.responseHeader().ensure(new Function0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - timeout.cancel(); - return null; - } - }); - } - - /** - * Close the stream and schedule cache eviction at some point in the future. - * We delay this as a way to place the stream in a probationary state--cached - * in the proxy but unusable. - * This mechanism helps the cluster adapt to situations where a proxy has - * persistent connectivity/availability issues, because it keeps an affected - * stream off the proxy for a period of time, hopefully long enough for the - * issues to be resolved, or for whoop to kick in and kill the shard. - */ - void handleServiceTimeout(String reason) { - synchronized (this) { - if (StreamStatus.isUnavailable(status)) { - return; - } - // Mark stream in error state - setStreamInErrorStatus(); - } - - // Async close request, and schedule eviction when its done. 
- Future<Void> closeFuture = requestClose(reason, false /* dont remove */); - closeFuture.onSuccess(new AbstractFunction1<Void, BoxedUnit>() { - @Override - public BoxedUnit apply(Void result) { - streamManager.scheduleRemoval(StreamImpl.this, streamProbationTimeoutMs); - return BoxedUnit.UNIT; - } - }); - } - - // - // Submit the operation to the stream. - // - - /** - * Execute the StreamOp. If reacquire is needed, this may initiate reacquire and queue the op for - * execution once complete. - * - * @param op - * stream operation to execute. - */ - @Override - public void submit(StreamOp op) { - try { - limiter.apply(op); - } catch (OverCapacityException ex) { - op.fail(ex); - return; - } - - // Timeout stream op if requested. - if (serviceTimeoutMs > 0) { - scheduleTimeout(op); - } - - boolean completeOpNow = false; - boolean success = true; - if (StreamStatus.isUnavailable(status)) { - // Stream is closed, fail the op immediately - op.fail(new StreamUnavailableException("Stream " + name + " is closed.")); - return; - } else if (StreamStatus.INITIALIZED == status && writer != null) { - completeOpNow = true; - success = true; - } else { - synchronized (this) { - if (StreamStatus.isUnavailable(status)) { - // Stream is closed, fail the op immediately - op.fail(new StreamUnavailableException("Stream " + name + " is closed.")); - return; - } else if (StreamStatus.INITIALIZED == status) { - completeOpNow = true; - success = true; - } else if (failFastOnStreamNotReady) { - op.fail(new StreamNotReadyException("Stream " + name + " is not ready; status = " + status)); - return; - } else { // the stream is still initializing - pendingOps.add(op); - pendingOpsCounter.inc(); - if (1 == pendingOps.size()) { - if (op instanceof HeartbeatOp) { - ((HeartbeatOp) op).setWriteControlRecord(true); - } - } - } - } - } - if (completeOpNow) { - executeOp(op, success); - } - } - - // - // Execute operations and handle exceptions on operations - // - - /** - * Execute the <i>op</i> 
immediately. - * - * @param op - * stream operation to execute. - * @param success - * whether the operation is success or not. - */ - void executeOp(final StreamOp op, boolean success) { - final AsyncLogWriter writer; - final Throwable lastException; - synchronized (this) { - writer = this.writer; - lastException = this.lastException; - } - if (null != writer && success) { - op.execute(writer, sequencer, txnLock) - .addEventListener(new FutureEventListener<Void>() { - @Override - public void onSuccess(Void value) { - // nop - } - @Override - public void onFailure(Throwable cause) { - boolean countAsException = true; - if (cause instanceof DLException) { - final DLException dle = (DLException) cause; - switch (dle.getCode()) { - case FOUND: - assert(cause instanceof OwnershipAcquireFailedException); - countAsException = false; - handleExceptionOnStreamOp(op, cause); - break; - case ALREADY_CLOSED: - assert(cause instanceof AlreadyClosedException); - op.fail(cause); - handleAlreadyClosedException((AlreadyClosedException) cause); - break; - // exceptions that mostly from client (e.g. too large record) - case NOT_IMPLEMENTED: - case METADATA_EXCEPTION: - case LOG_EMPTY: - case LOG_NOT_FOUND: - case TRUNCATED_TRANSACTION: - case END_OF_STREAM: - case TRANSACTION_OUT_OF_ORDER: - case INVALID_STREAM_NAME: - case TOO_LARGE_RECORD: - case STREAM_NOT_READY: - case OVER_CAPACITY: - op.fail(cause); - break; - // the DL writer hits exception, simple set the stream to error status - // and fail the request - default: - handleExceptionOnStreamOp(op, cause); - break; - } - } else { - handleExceptionOnStreamOp(op, cause); - } - if (countAsException) { - countException(cause, streamExceptionStatLogger); - } - } - }); - } else { - if (null != lastException) { - op.fail(lastException); - } else { - op.fail(new StreamUnavailableException("Stream " + name + " is closed.")); - } - } - } - - /** - * Handle exception when executing <i>op</i>. 
- * - * @param op - * stream operation executing - * @param cause - * exception received when executing <i>op</i> - */ - private void handleExceptionOnStreamOp(StreamOp op, final Throwable cause) { - AsyncLogWriter oldWriter = null; - boolean statusChanged = false; - synchronized (this) { - if (StreamStatus.INITIALIZED == status) { - oldWriter = setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZED, null, cause); - statusChanged = true; - } - } - if (statusChanged) { - Abortables.asyncAbort(oldWriter, false); - if (isCriticalException(cause)) { - logger.error("Failed to write data into stream {} : ", name, cause); - } else { - logger.warn("Failed to write data into stream {} : {}", name, cause.getMessage()); - } - requestClose("Failed to write data into stream " + name + " : " + cause.getMessage()); - } - op.fail(cause); - } - - /** - * Handling already closed exception. - */ - private void handleAlreadyClosedException(AlreadyClosedException ace) { - unexpectedExceptions.inc(); - logger.error("Encountered unexpected exception when writing data into stream {} : ", name, ace); - fatalErrorHandler.notifyFatalError(); - } - - // - // Acquire streams - // - - Future<Boolean> acquireStream() { - final Stopwatch stopwatch = Stopwatch.createStarted(); - final Promise<Boolean> acquirePromise = new Promise<Boolean>(); - manager.openAsyncLogWriter().addEventListener( - FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() { - - @Override - public void onSuccess(AsyncLogWriter w) { - onAcquireStreamSuccess(w, stopwatch, acquirePromise); - } - - @Override - public void onFailure(Throwable cause) { - onAcquireStreamFailure(cause, stopwatch, acquirePromise); - } - - }, scheduler, getStreamName())); - return acquirePromise; - } - - private void onAcquireStreamSuccess(AsyncLogWriter w, - Stopwatch stopwatch, - Promise<Boolean> acquirePromise) { - synchronized (txnLock) { - sequencer.setLastId(w.getLastTxId()); - } - AsyncLogWriter oldWriter; - 
Queue<StreamOp> oldPendingOps; - boolean success; - synchronized (StreamImpl.this) { - oldWriter = setStreamStatus(StreamStatus.INITIALIZED, - StreamStatus.INITIALIZING, w, null); - oldPendingOps = pendingOps; - pendingOps = new ArrayDeque<StreamOp>(); - success = true; - } - // check if the stream is allowed to be acquired - if (!streamManager.allowAcquire(StreamImpl.this)) { - if (null != oldWriter) { - Abortables.asyncAbort(oldWriter, true); - } - int maxAcquiredPartitions = dynConf.getMaxAcquiredPartitionsPerProxy(); - StreamUnavailableException sue = new StreamUnavailableException("Stream " + partition.getStream() - + " is not allowed to acquire more than " + maxAcquiredPartitions + " partitions"); - countException(sue, exceptionStatLogger); - logger.error("Failed to acquire stream {} because it is unavailable : {}", - name, sue.getMessage()); - synchronized (this) { - oldWriter = setStreamStatus(StreamStatus.ERROR, - StreamStatus.INITIALIZED, null, sue); - // we don't switch the pending ops since they are already switched - // when setting the status to initialized - success = false; - } - } - processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise); - } - - private void onAcquireStreamFailure(Throwable cause, - Stopwatch stopwatch, - Promise<Boolean> acquirePromise) { - AsyncLogWriter oldWriter; - Queue<StreamOp> oldPendingOps; - boolean success; - if (cause instanceof AlreadyClosedException) { - countException(cause, streamExceptionStatLogger); - handleAlreadyClosedException((AlreadyClosedException) cause); - return; - } else { - if (isCriticalException(cause)) { - countException(cause, streamExceptionStatLogger); - logger.error("Failed to acquire stream {} : ", name, cause); - } else { - logger.warn("Failed to acquire stream {} : {}", name, cause.getMessage()); - } - synchronized (StreamImpl.this) { - oldWriter = setStreamStatus(StreamStatus.ERROR, - StreamStatus.INITIALIZING, null, cause); - oldPendingOps = 
pendingOps; - pendingOps = new ArrayDeque<StreamOp>(); - success = false; - } - } - processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise); - } - - /** - * Process the pending request after acquired stream. - * - * @param success whether the acquisition succeed or not - * @param oldWriter the old writer to abort - * @param oldPendingOps the old pending ops to execute - * @param stopwatch stopwatch to measure the time spent on acquisition - * @param acquirePromise the promise to complete the acquire operation - */ - void processPendingRequestsAfterAcquire(boolean success, - AsyncLogWriter oldWriter, - Queue<StreamOp> oldPendingOps, - Stopwatch stopwatch, - Promise<Boolean> acquirePromise) { - if (success) { - streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - } else { - streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - } - for (StreamOp op : oldPendingOps) { - executeOp(op, success); - pendingOpsCounter.dec(); - } - Abortables.asyncAbort(oldWriter, true); - FutureUtils.setValue(acquirePromise, success); - } - - // - // Stream Status Changes - // - - synchronized void setStreamInErrorStatus() { - if (StreamStatus.CLOSING == status || StreamStatus.CLOSED == status) { - return; - } - this.status = StreamStatus.ERROR; - } - - /** - * Update the stream status. The changes are only applied when there isn't status changed. 
- * - * @param newStatus - * new status - * @param oldStatus - * old status - * @param writer - * new log writer - * @param t - * new exception - * @return old writer if it exists - */ - synchronized AsyncLogWriter setStreamStatus(StreamStatus newStatus, - StreamStatus oldStatus, - AsyncLogWriter writer, - Throwable t) { - if (oldStatus != this.status) { - logger.info("Stream {} status already changed from {} -> {} when trying to change it to {}", - new Object[] { name, oldStatus, this.status, newStatus }); - return null; - } - - String owner = null; - if (t instanceof OwnershipAcquireFailedException) { - owner = ((OwnershipAcquireFailedException) t).getCurrentOwner(); - } - - AsyncLogWriter oldWriter = this.writer; - this.writer = writer; - if (null != owner && owner.equals(clientId)) { - unexpectedExceptions.inc(); - logger.error("I am waiting myself {} to release lock on stream {}, so have to shut myself down :", - new Object[] { owner, name, t }); - // I lost the ownership but left a lock over zookeeper - // I should not ask client to redirect to myself again as I can't handle it :( - // shutdown myself - fatalErrorHandler.notifyFatalError(); - this.owner = null; - } else { - this.owner = owner; - } - this.lastException = t; - this.status = newStatus; - if (StreamStatus.INITIALIZED == newStatus) { - streamManager.notifyAcquired(this); - logger.info("Inserted acquired stream {} -> writer {}", name, this); - } else { - streamManager.notifyReleased(this); - logger.info("Removed acquired stream {} -> writer {}", name, this); - } - return oldWriter; - } - - // - // Stream Close Functions - // - - void close(DistributedLogManager dlm) { - if (null != dlm) { - try { - dlm.close(); - } catch (IOException ioe) { - logger.warn("Failed to close dlm for {} : ", name, ioe); - } - } - } - - @Override - public Future<Void> requestClose(String reason) { - return requestClose(reason, true); - } - - Future<Void> requestClose(String reason, boolean uncache) { - final boolean 
abort; - closeLock.writeLock().lock(); - try { - if (StreamStatus.CLOSING == status - || StreamStatus.CLOSED == status) { - return closePromise; - } - logger.info("Request to close stream {} : {}", getStreamName(), reason); - // if the stream isn't closed from INITIALIZED state, we abort the stream instead of closing it. - abort = StreamStatus.INITIALIZED != status; - status = StreamStatus.CLOSING; - streamManager.notifyReleased(this); - } finally { - closeLock.writeLock().unlock(); - } - // we will fail the requests that are coming in between closing and closed only - // after the async writer is closed. so we could clear up the lock before redirect - // them. - close(abort, uncache); - return closePromise; - } - - @Override - public void delete() throws IOException { - if (null != writer) { - Utils.close(writer); - synchronized (this) { - writer = null; - lastException = new StreamUnavailableException("Stream was deleted"); - } - } - if (null == manager) { - throw new UnexpectedException("No stream " + name + " to delete"); - } - manager.delete(); - } - - /** - * Post action executed after closing. - */ - private void postClose(boolean uncache) { - closeManagerAndErrorOutPendingRequests(); - unregisterGauge(); - if (uncache) { - if (null != owner) { - long probationTimeoutMs = 2 * dlConfig.getZKSessionTimeoutMilliseconds() / 3; - streamManager.scheduleRemoval(this, probationTimeoutMs); - } else { - streamManager.notifyRemoved(this); - logger.info("Removed cached stream {}.", getStreamName()); - } - } - FutureUtils.setValue(closePromise, null); - } - - /** - * Shouldn't call close directly. 
The callers should call #requestClose instead - * - * @param shouldAbort shall we abort the stream instead of closing - */ - private Future<Void> close(boolean shouldAbort, final boolean uncache) { - boolean abort; - closeLock.writeLock().lock(); - try { - if (StreamStatus.CLOSED == status) { - return closePromise; - } - abort = shouldAbort || (StreamStatus.INITIALIZED != status && StreamStatus.CLOSING != status); - status = StreamStatus.CLOSED; - streamManager.notifyReleased(this); - } finally { - closeLock.writeLock().unlock(); - } - logger.info("Closing stream {} ...", name); - // Close the writers to release the locks before failing the requests - Future<Void> closeWriterFuture; - if (abort) { - closeWriterFuture = Abortables.asyncAbort(writer, true); - } else { - closeWriterFuture = Utils.asyncClose(writer, true); - } - // close the manager and error out pending requests after close writer - Duration closeWaitDuration; - if (writerCloseTimeoutMs <= 0) { - closeWaitDuration = Duration.Top(); - } else { - closeWaitDuration = Duration.fromMilliseconds(writerCloseTimeoutMs); - } - - FutureUtils.stats( - closeWriterFuture, - writerCloseStatLogger, - Stopwatch.createStarted() - ).masked().within(futureTimer, closeWaitDuration) - .addEventListener(FutureUtils.OrderedFutureEventListener.of( - new FutureEventListener<Void>() { - @Override - public void onSuccess(Void value) { - postClose(uncache); - } - @Override - public void onFailure(Throwable cause) { - if (cause instanceof TimeoutException) { - writerCloseTimeoutCounter.inc(); - } - postClose(uncache); - } - }, scheduler, name)); - return closePromise; - } - - private void closeManagerAndErrorOutPendingRequests() { - close(manager); - // Failed the pending requests. 
- Queue<StreamOp> oldPendingOps; - synchronized (this) { - oldPendingOps = pendingOps; - pendingOps = new ArrayDeque<StreamOp>(); - } - StreamUnavailableException closingException = - new StreamUnavailableException("Stream " + name + " is closed."); - for (StreamOp op : oldPendingOps) { - op.fail(closingException); - pendingOpsCounter.dec(); - } - limiter.close(); - logger.info("Closed stream {}.", name); - } - - /** - * clean up the gauge to help GC. - */ - private void unregisterGauge(){ - streamLogger.unregisterGauge("stream_status", this.streamStatusGauge); - } - - // Test-only apis - - @VisibleForTesting - public int numPendingOps() { - Queue<StreamOp> queue = pendingOps; - return null == queue ? 0 : queue.size(); - } - - @VisibleForTesting - public StreamStatus getStatus() { - return status; - } - - @VisibleForTesting - public void setStatus(StreamStatus status) { - this.status = status; - } - - @VisibleForTesting - public AsyncLogWriter getWriter() { - return writer; - } - - @VisibleForTesting - public DistributedLogManager getManager() { - return manager; - } - - @VisibleForTesting - public Throwable getLastException() { - return lastException; - } - - @VisibleForTesting - public Future<Void> getCloseFuture() { - return closePromise; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java deleted file mode 100644 index d86c538..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream; - -import com.google.common.base.Optional; -import com.twitter.util.Future; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * Manage lifecycle of streams. - * - * <p>StreamManager is responsible for creating, destroying, and keeping track of Stream objects. - * - * <p>Stream objects, which are managed by StreamManager and created by StreamFactory, are essentially the - * per stream request handlers, responsible for dispatching e.g. 
write requests to an underlying AsyncLogWriter, - * managing stream lock, interpreting exceptions, error conditions, etc. - */ -public interface StreamManager { - - /** - * Get a cached stream, returning null if it doesn't exist. - * @param stream stream name - * @return the cached stream - */ - Stream getStream(String stream); - - /** - * Get a cached stream and create a new one if it doesn't exist. - * @param streamName stream name - * @param start whether to start the stream after it is created. - * @return the cached stream, or the newly created one - */ - Stream getOrCreateStream(String streamName, boolean start) throws IOException; - - /** - * Asynchronously create a new stream. - * @param stream stream name - * @return Future satisfied once the stream is created - */ - Future<Void> createStreamAsync(String stream); - - /** - * Is acquiring stream allowed? - * - * @param stream - * stream instance - * @return true if it is allowed to acquire this stream, otherwise false. - */ - boolean allowAcquire(Stream stream); - - /** - * Notify the manager that a stream was acquired. - * @param stream being acquired - */ - void notifyAcquired(Stream stream); - - /** - * Notify the manager that a stream was released. - * @param stream being released - */ - void notifyReleased(Stream stream); - - /** - * Notify the manager that a stream was completely removed. - * @param stream being uncached - * @return whether the stream existed or not - */ - boolean notifyRemoved(Stream stream); - - /** - * Asynchronous delete method. - * @param streamName stream name - * @return future satisfied once delete complete - */ - Future<Void> deleteAndRemoveAsync(String streamName); - - /** - * Asynchronous close and uncache method. - * @param streamName stream name - * @return future satisfied once close and uncache complete - */ - Future<Void> closeAndRemoveAsync(String streamName); - - /** - * Close and uncache after delayMs. 
- * @param stream to remove - * @param delayMs delay before removal, in milliseconds - */ - void scheduleRemoval(Stream stream, long delayMs); - - /** - * Close all streams. - * @return future satisfied once all streams are closed - */ - Future<List<Void>> closeStreams(); - - /** - * Return map with stream ownership info. - * @param regex for filtering streams - * @return map containing ownership info - */ - Map<String, String> getStreamOwnershipMap(Optional<String> regex); - - /** - * Number of acquired streams. - * @return number of acquired streams - */ - int numAcquired(); - - /** - * Number of cached streams. - * @return number of cached streams - */ - int numCached(); - - /** - * Is the stream denoted by streamName in the acquired state. - * @param streamName stream name - * @return true if the stream is in the acquired state - */ - boolean isAcquired(String streamName); - - /** - * Close manager and disallow further activity. - */ - void close(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java deleted file mode 100644 index 5d54738..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java +++ /dev/null @@ -1,413 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.RateLimiter; -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.exceptions.ServiceUnavailableException; -import org.apache.distributedlog.exceptions.StreamUnavailableException; -import org.apache.distributedlog.exceptions.UnexpectedException; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.service.config.StreamConfigProvider; -import org.apache.distributedlog.service.streamset.Partition; -import org.apache.distributedlog.service.streamset.PartitionMap; -import org.apache.distributedlog.service.streamset.StreamPartitionConverter; -import org.apache.distributedlog.util.ConfUtils; -import com.twitter.util.Future; -import com.twitter.util.Promise; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.slf4j.Logger; -import 
org.slf4j.LoggerFactory; - -/** - * StreamManagerImpl is the default implementation responsible for creating, destroying, and keeping track - * of Streams. - * - * <p>StreamFactory, supplied to StreamManagerImpl in the constructor below, is responsible simply for creating - * a stream object in isolation from the rest of the system. We pass a StreamFactory in instead of simply - * creating StreamImpl's ourselves in order to inject dependencies without bloating the StreamManagerImpl - * constructor. - */ -public class StreamManagerImpl implements StreamManager { - - private static final Logger logger = LoggerFactory.getLogger(StreamManagerImpl.class); - - private final ConcurrentHashMap<String, Stream> streams = - new ConcurrentHashMap<String, Stream>(); - private final AtomicInteger numCached = new AtomicInteger(0); - - private final ConcurrentHashMap<String, Stream> acquiredStreams = - new ConcurrentHashMap<String, Stream>(); - private final AtomicInteger numAcquired = new AtomicInteger(0); - - // - // Partitions - // - private final StreamPartitionConverter partitionConverter; - private final PartitionMap cachedPartitions = new PartitionMap(); - private final PartitionMap acquiredPartitions = new PartitionMap(); - - final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock(); - private final ScheduledExecutorService executorService; - private final DistributedLogConfiguration dlConfig; - private final StreamConfigProvider streamConfigProvider; - private final String clientId; - private boolean closed = false; - private final StreamFactory streamFactory; - private final DistributedLogNamespace dlNamespace; - - public StreamManagerImpl(String clientId, - DistributedLogConfiguration dlConfig, - ScheduledExecutorService executorService, - StreamFactory streamFactory, - StreamPartitionConverter partitionConverter, - StreamConfigProvider streamConfigProvider, - DistributedLogNamespace dlNamespace) { - this.clientId = clientId; - this.executorService = 
executorService; - this.streamFactory = streamFactory; - this.partitionConverter = partitionConverter; - this.dlConfig = dlConfig; - this.streamConfigProvider = streamConfigProvider; - this.dlNamespace = dlNamespace; - } - - private DynamicDistributedLogConfiguration getDynConf(String streamName) { - Optional<DynamicDistributedLogConfiguration> dynDlConf = - streamConfigProvider.getDynamicStreamConfig(streamName); - if (dynDlConf.isPresent()) { - return dynDlConf.get(); - } else { - return ConfUtils.getConstDynConf(dlConfig); - } - } - - @Override - public boolean allowAcquire(Stream stream) { - return acquiredPartitions.addPartition( - stream.getPartition(), - stream.getStreamConfiguration().getMaxAcquiredPartitionsPerProxy()); - } - - /** - * Must be enqueued to an executor to avoid deadlocks (close and execute-op both - * try to acquire the same read-write lock). - */ - @Override - public Future<Void> deleteAndRemoveAsync(final String stream) { - final Promise<Void> result = new Promise<Void>(); - java.util.concurrent.Future<?> scheduleFuture = schedule(new Runnable() { - @Override - public void run() { - result.become(doDeleteAndRemoveAsync(stream)); - } - }, 0); - if (null == scheduleFuture) { - return Future.exception( - new ServiceUnavailableException("Couldn't schedule a delete task.")); - } - return result; - } - - /** - * Must be enqueued to an executor to avoid deadlocks (close and execute-op both - * try to acquire the same read-write lock). 
- */ - @Override - public Future<Void> closeAndRemoveAsync(final String streamName) { - final Promise<Void> releasePromise = new Promise<Void>(); - java.util.concurrent.Future<?> scheduleFuture = schedule(new Runnable() { - @Override - public void run() { - releasePromise.become(doCloseAndRemoveAsync(streamName)); - } - }, 0); - if (null == scheduleFuture) { - return Future.exception( - new ServiceUnavailableException("Couldn't schedule a release task.")); - } - return releasePromise; - } - - @Override - public Future<Void> createStreamAsync(final String stream) { - final Promise<Void> createPromise = new Promise<Void>(); - java.util.concurrent.Future<?> scheduleFuture = schedule(new Runnable() { - @Override - public void run() { - try { - dlNamespace.createLog(stream); - createPromise.setValue(null); - } catch (Exception e) { - createPromise.setException(e); - } - } - }, 0); - if (null == scheduleFuture) { - return Future.exception( - new ServiceUnavailableException("Couldn't schedule a create task.")); - } - return createPromise; - } - - @Override - public void notifyReleased(Stream stream) { - acquiredPartitions.removePartition(stream.getPartition()); - if (acquiredStreams.remove(stream.getStreamName(), stream)) { - numAcquired.getAndDecrement(); - } - } - - @Override - public void notifyAcquired(Stream stream) { - if (null == acquiredStreams.put(stream.getStreamName(), stream)) { - numAcquired.getAndIncrement(); - } - } - - @Override - public boolean notifyRemoved(Stream stream) { - cachedPartitions.removePartition(stream.getPartition()); - if (streams.remove(stream.getStreamName(), stream)) { - numCached.getAndDecrement(); - return true; - } - return false; - } - - @Override - public Map<String, String> getStreamOwnershipMap(Optional<String> regex) { - Map<String, String> ownershipMap = new HashMap<String, String>(); - for (Map.Entry<String, Stream> entry : acquiredStreams.entrySet()) { - String name = entry.getKey(); - if (regex.isPresent() && 
!name.matches(regex.get())) { - continue; - } - Stream stream = entry.getValue(); - if (null == stream) { - continue; - } - String owner = stream.getOwner(); - if (null == owner) { - ownershipMap.put(name, clientId); - } - } - return ownershipMap; - } - - @Override - public Stream getStream(String stream) { - return streams.get(stream); - } - - /** - * {@inheritDoc} - * - * <p>Returns {@code null} if the stream is not cached yet and this manager has already been closed. - * A newly inserted stream is initialized (and started when {@code start} is true) before it is returned. - */ - @Override - public Stream getOrCreateStream(String streamName, boolean start) throws IOException { - Stream stream = streams.get(streamName); - if (null == stream) { - closeLock.readLock().lock(); - try { - if (closed) { - return null; - } - DynamicDistributedLogConfiguration dynConf = getDynConf(streamName); - int maxCachedPartitions = dynConf.getMaxCachedPartitionsPerProxy(); - - // get partition from the stream name - Partition partition = partitionConverter.convert(streamName); - - // add partition to cached map - if (!cachedPartitions.addPartition(partition, maxCachedPartitions)) { - throw new StreamUnavailableException("Stream " + streamName - + " is not allowed to cache more than " + maxCachedPartitions + " partitions"); - } - - stream = newStream(streamName, dynConf); - Stream oldWriter = streams.putIfAbsent(streamName, stream); - if (null != oldWriter) { - stream = oldWriter; - } else { - numCached.getAndIncrement(); - logger.info("Inserted mapping stream name {} -> stream {}", streamName, stream); - stream.initialize(); - if (start) { - stream.start(); - } - } - } finally { - closeLock.readLock().unlock(); - } - } - return stream; - } - - /** - * {@inheritDoc} - * - * <p>Requests close of every cached stream (not only the acquired ones), without a rate limiter. 
- */ - @Override - public Future<List<Void>> closeStreams() { - int numAcquired = acquiredStreams.size(); - int numCached = streams.size(); - logger.info("Closing all acquired streams : acquired = {}, cached = {}.", - numAcquired, numCached); - Set<Stream> streamsToClose = new HashSet<Stream>(); - streamsToClose.addAll(streams.values()); - return closeStreams(streamsToClose, Optional.<RateLimiter>absent()); - } - - @Override - public void scheduleRemoval(final Stream stream, long 
delayMs) { - if (delayMs > 0) { - logger.info("Scheduling removal of stream {} from cache after {} ms.", - stream.getStreamName(), delayMs); - } - schedule(new Runnable() { - @Override - public void run() { - if (notifyRemoved(stream)) { - logger.info("Removed cached stream {} after probation.", stream.getStreamName()); - } else { - logger.info("Cached stream {} already removed.", stream.getStreamName()); - } - } - }, delayMs); - } - - @Override - public int numAcquired() { - return numAcquired.get(); - } - - @Override - public int numCached() { - return numCached.get(); - } - - @Override - public boolean isAcquired(String streamName) { - return acquiredStreams.containsKey(streamName); - } - - @Override - public void close() { - closeLock.writeLock().lock(); - try { - if (closed) { - return; - } - closed = true; - } finally { - closeLock.writeLock().unlock(); - } - } - - private Future<List<Void>> closeStreams(Set<Stream> streamsToClose, Optional<RateLimiter> rateLimiter) { - if (streamsToClose.isEmpty()) { - logger.info("No streams to close."); - List<Void> emptyList = new ArrayList<Void>(); - return Future.value(emptyList); - } - List<Future<Void>> futures = new ArrayList<Future<Void>>(streamsToClose.size()); - for (Stream stream : streamsToClose) { - if (rateLimiter.isPresent()) { - rateLimiter.get().acquire(); - } - futures.add(stream.requestClose("Close Streams")); - } - return Future.collect(futures); - } - - private Stream newStream(String name, DynamicDistributedLogConfiguration streamConf) { - return streamFactory.create(name, streamConf, this); - } - - /** - * Request close of the cached stream named {@code streamName}. - * - * @param streamName stream name - * @return the future of the stream's close request, or {@code Future.value(null)} when no such stream is cached - */ - public Future<Void> doCloseAndRemoveAsync(final String streamName) { - Stream stream = streams.get(streamName); - if (null == stream) { - logger.info("No stream {} to release.", streamName); - return Future.value(null); - } else { - return stream.requestClose("release ownership"); - } - } - - /** - * Don't schedule if we're closed - close() takes the write side of closeLock to set the closed flag, so while - * we hold the read side the flag cannot change: if we 
discover we're closed, we won't schedule. - * - * @return the executor's future, or {@code null} if we are closed or the executor rejected the task - */ - private java.util.concurrent.Future<?> schedule(Runnable runnable, long delayMs) { - closeLock.readLock().lock(); - try { - if (closed) { - return null; - } else if (delayMs > 0) { - return executorService.schedule(runnable, delayMs, TimeUnit.MILLISECONDS); - } else { - return executorService.submit(runnable); - } - } catch (RejectedExecutionException ree) { - logger.error("Failed to schedule task {} in {} ms : ", - new Object[] { runnable, delayMs, ree }); - return null; - } finally { - closeLock.readLock().unlock(); - } - } - - /** - * Delete the cached stream named {@code streamName}, then request its close. - * - * @return the close future; a failed future carrying an UnexpectedException when the stream is not cached, - * or carrying the IOException thrown by the delete - */ - private Future<Void> doDeleteAndRemoveAsync(final String streamName) { - Stream stream = streams.get(streamName); - if (null == stream) { - logger.warn("No stream {} to delete.", streamName); - return Future.exception(new UnexpectedException("No stream " + streamName + " to delete.")); - } else { - Future<Void> result; - logger.info("Deleting stream {}, {}", streamName, stream); - try { - stream.delete(); - result = stream.requestClose("Stream Deleted"); - } catch (IOException e) { - logger.error("Failed on removing stream {} : ", streamName, e); - result = Future.exception(e); - } - return result; - } - } - - @VisibleForTesting - public ConcurrentHashMap<String, Stream> getCachedStreams() { - return streams; - } - - @VisibleForTesting - public ConcurrentHashMap<String, Stream> getAcquiredStreams() { - return acquiredStreams; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java deleted file mode 100644 index d0b8de4..0000000 --- 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream; - -import com.google.common.base.Stopwatch; -import org.apache.distributedlog.AsyncLogWriter; -import org.apache.distributedlog.exceptions.DLException; -import org.apache.distributedlog.thrift.service.ResponseHeader; -import org.apache.distributedlog.util.Sequencer; -import com.twitter.util.Future; - -/** - * An operation applied to a stream. - */ -public interface StreamOp { - /** - * Execute a stream op with the supplied writer. - * - * @param writer active writer for applying the change - * @param sequencer sequencer used for generating transaction id for stream operations - * @param txnLock transaction lock to guarantee ordering of transaction id - * @return a future satisfied when the operation completes execution - */ - Future<Void> execute(AsyncLogWriter writer, - Sequencer sequencer, - Object txnLock); - - /** - * Invoked before the stream op is executed. - * - * @throws DLException if the op is rejected before execution, e.g. denied by access control - */ - void preExecute() throws DLException; - - /** - * Return the response header (containing the status code etc.). 
- * - * @return A future containing the response header or the exception - * encountered by the op if it failed. - */ - Future<ResponseHeader> responseHeader(); - - /** - * Abort the operation with the given exception. - * - * @param t exception to fail the operation with - */ - void fail(Throwable t); - - /** - * Return the stream name. - */ - String streamName(); - - /** - * Stopwatch gives the start time of the operation. - */ - Stopwatch stopwatch(); - - /** - * Compute checksum from arguments. - */ - Long computeChecksum(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java deleted file mode 100644 index f3fc610..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.service.stream; - -import org.apache.distributedlog.service.streamset.Partition; -import org.apache.distributedlog.stats.BroadCastStatsLogger; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; - -/** - * Encapsulate stream op stats construction to make it easier to access stream - * op stats consistently from different scopes. - */ -public class StreamOpStats { - private final StatsLogger baseStatsLogger; - private final StatsLogger requestStatsLogger; - private final StatsLogger recordsStatsLogger; - private final StatsLogger requestDeniedStatsLogger; - private final StatsLogger streamStatsLogger; - - /** - * Derive the "request", "records" and "denied" scopes from {@code statsLogger}; - * {@code perStreamStatsLogger} is kept as the root of the per-stream scopes. - */ - public StreamOpStats(StatsLogger statsLogger, - StatsLogger perStreamStatsLogger) { - this.baseStatsLogger = statsLogger; - this.requestStatsLogger = statsLogger.scope("request"); - this.recordsStatsLogger = statsLogger.scope("records"); - this.requestDeniedStatsLogger = statsLogger.scope("denied"); - this.streamStatsLogger = perStreamStatsLogger; - } - - /** - * Return the un-scoped base stats logger; {@code opName} is ignored. - */ - public StatsLogger baseStatsLogger(String opName) { - return baseStatsLogger; - } - - public Counter baseCounter(String opName) { - return baseStatsLogger.getCounter(opName); - } - - public StatsLogger baseScope(String opName) { - return baseStatsLogger.scope(opName); - } - - public OpStatsLogger requestLatencyStat(String opName) { - return requestStatsLogger.getOpStatsLogger(opName); - } - - public StatsLogger requestScope(String scopeName) { - return requestStatsLogger.scope(scopeName); - } - - /** - * Return the counter {@code counterName} under the "request" sub-scope named {@code opName}. - */ - public Counter scopedRequestCounter(String opName, String counterName) { - return requestScope(opName).getCounter(counterName); - } - - public Counter requestCounter(String counterName) { - return requestStatsLogger.getCounter(counterName); - } - - /** - * Same counter as {@link #requestCounter(String)}: this simply delegates to it. - */ - public Counter requestPendingCounter(String counterName) { - return requestCounter(counterName); - } - - public Counter 
requestDeniedCounter(String counterName) { - return requestDeniedStatsLogger.getCounter(counterName); - } - - public Counter recordsCounter(String counterName) { - return recordsStatsLogger.getCounter(counterName); - } - - /** - * Combine, via {@code BroadCastStatsLogger.masterslave}, the per-partition scope - * (stream name, then "partition", then the padded partition id; passed first) with the - * per-stream "aggregate" scope (passed second). - */ - public StatsLogger streamRequestStatsLogger(Partition partition) { - return BroadCastStatsLogger.masterslave( - streamStatsLogger.scope(partition.getStream()).scope("partition") - .scope(partition.getPaddedId()), streamStatsLogger.scope(partition.getStream()) - .scope("aggregate")); - } - - public StatsLogger streamRequestScope(Partition partition, String scopeName) { - return streamRequestStatsLogger(partition).scope(scopeName); - } - - public OpStatsLogger streamRequestLatencyStat(Partition partition, String opName) { - return streamRequestStatsLogger(partition).getOpStatsLogger(opName); - } - - public Counter streamRequestCounter(Partition partition, String opName, String counterName) { - return streamRequestScope(partition, opName).getCounter(counterName); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java deleted file mode 100644 index 7a38d14..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream; - -import org.apache.distributedlog.AsyncLogWriter; -import org.apache.distributedlog.DLSN; -import org.apache.distributedlog.acl.AccessControlManager; -import org.apache.distributedlog.exceptions.DLException; -import org.apache.distributedlog.exceptions.RequestDeniedException; -import org.apache.distributedlog.service.ResponseUtils; -import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.ProtocolUtils; -import org.apache.distributedlog.util.Sequencer; -import com.twitter.util.Future; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; - -/** - * Operation to truncate a log stream. 
- */ -public class TruncateOp extends AbstractWriteOp { - - private static final Logger logger = LoggerFactory.getLogger(TruncateOp.class); - - private final Counter deniedTruncateCounter; - private final DLSN dlsn; - private final AccessControlManager accessControlManager; - - public TruncateOp(String stream, - DLSN dlsn, - StatsLogger statsLogger, - StatsLogger perStreamStatsLogger, - Long checksum, - Feature checksumDisabledFeature, - AccessControlManager accessControlManager) { - super(stream, requestStat(statsLogger, "truncate"), checksum, checksumDisabledFeature); - StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger); - this.deniedTruncateCounter = streamOpStats.requestDeniedCounter("truncate"); - this.accessControlManager = accessControlManager; - this.dlsn = dlsn; - } - - @Override - public Long computeChecksum() { - return ProtocolUtils.truncateOpCRC32(stream, dlsn); - } - - /** - * Truncate the stream at {@code dlsn} through the supplied writer. - * - * <p>Returns a failed future carrying an IllegalStateException when the writer's stream name differs - * from this op's stream. The Boolean produced by the truncate is ignored: a success response is - * returned whenever the truncate future succeeds. {@code sequencer} and {@code txnLock} are unused here. - */ - @Override - protected Future<WriteResponse> executeOp(AsyncLogWriter writer, - Sequencer sequencer, - Object txnLock) { - if (!stream.equals(writer.getStreamName())) { - logger.error("Truncate: Stream Name Mismatch in the Stream Map {}, {}", stream, writer.getStreamName()); - return Future.exception(new IllegalStateException("The stream mapping is incorrect, fail the request")); - } - return writer.truncate(dlsn).map(new AbstractFunction1<Boolean, WriteResponse>() { - @Override - public WriteResponse apply(Boolean v1) { - return ResponseUtils.writeSuccess(); - } - }); - } - - /** - * Reject the op with a RequestDeniedException (after incrementing the denied-truncate counter) when the - * access control manager does not allow truncating this stream; otherwise run the superclass check. - */ - @Override - public void preExecute() throws DLException { - if (!accessControlManager.allowTruncate(stream)) { - deniedTruncateCounter.inc(); - throw new RequestDeniedException(stream, "truncate"); - } - super.preExecute(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java ---------------------------------------------------------------------- diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java deleted file mode 100644 index c4bdcc2..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.service.stream; - -import org.apache.distributedlog.AsyncLogWriter; -import org.apache.distributedlog.DLSN; -import org.apache.distributedlog.LogRecord; -import org.apache.distributedlog.acl.AccessControlManager; -import org.apache.distributedlog.exceptions.DLException; -import org.apache.distributedlog.exceptions.RequestDeniedException; -import org.apache.distributedlog.service.ResponseUtils; -import org.apache.distributedlog.service.config.ServerConfiguration; -import org.apache.distributedlog.service.streamset.Partition; -import org.apache.distributedlog.service.streamset.StreamPartitionConverter; -import org.apache.distributedlog.thrift.service.ResponseHeader; -import org.apache.distributedlog.thrift.service.StatusCode; -import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.ProtocolUtils; -import org.apache.distributedlog.util.Sequencer; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import java.nio.ByteBuffer; -import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; - -/** - * Operation to write a single record to a log stream. 
- */ -public class WriteOp extends AbstractWriteOp implements WriteOpWithPayload { - - private static final Logger logger = LoggerFactory.getLogger(WriteOp.class); - - private final byte[] payload; - private final boolean isRecordSet; - - // Stats - private final Counter deniedWriteCounter; - private final Counter successRecordCounter; - private final Counter failureRecordCounter; - private final Counter redirectRecordCounter; - private final OpStatsLogger latencyStat; - private final Counter bytes; - private final Counter writeBytes; - - private final byte dlsnVersion; - private final AccessControlManager accessControlManager; - - public WriteOp(String stream, - ByteBuffer data, - StatsLogger statsLogger, - StatsLogger perStreamStatsLogger, - StreamPartitionConverter streamPartitionConverter, - ServerConfiguration conf, - byte dlsnVersion, - Long checksum, - boolean isRecordSet, - Feature checksumDisabledFeature, - AccessControlManager accessControlManager) { - super(stream, requestStat(statsLogger, "write"), checksum, checksumDisabledFeature); - payload = new byte[data.remaining()]; - data.get(payload); - this.isRecordSet = isRecordSet; - - final Partition partition = streamPartitionConverter.convert(stream); - StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger); - this.successRecordCounter = streamOpStats.recordsCounter("success"); - this.failureRecordCounter = streamOpStats.recordsCounter("failure"); - this.redirectRecordCounter = streamOpStats.recordsCounter("redirect"); - this.deniedWriteCounter = streamOpStats.requestDeniedCounter("write"); - this.writeBytes = streamOpStats.scopedRequestCounter("write", "bytes"); - this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "write"); - this.bytes = streamOpStats.streamRequestCounter(partition, "write", "bytes"); - - this.dlsnVersion = dlsnVersion; - this.accessControlManager = accessControlManager; - - final long size = getPayloadSize(); - result().addEventListener(new 
FutureEventListener<WriteResponse>() { - @Override - public void onSuccess(WriteResponse response) { - if (response.getHeader().getCode() == StatusCode.SUCCESS) { - latencyStat.registerSuccessfulEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); - bytes.add(size); - writeBytes.add(size); - } else { - latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); - } - } - @Override - public void onFailure(Throwable cause) { - latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); - } - }); - } - - @Override - public long getPayloadSize() { - return payload.length; - } - - @Override - public Long computeChecksum() { - return ProtocolUtils.writeOpCRC32(stream, payload); - } - - @Override - public void preExecute() throws DLException { - if (!accessControlManager.allowWrite(stream)) { - deniedWriteCounter.inc(); - throw new RequestDeniedException(stream, "write"); - } - super.preExecute(); - } - - @Override - protected Future<WriteResponse> executeOp(AsyncLogWriter writer, - Sequencer sequencer, - Object txnLock) { - if (!stream.equals(writer.getStreamName())) { - logger.error("Write: Stream Name Mismatch in the Stream Map {}, {}", stream, writer.getStreamName()); - return Future.exception(new IllegalStateException("The stream mapping is incorrect, fail the request")); - } - - long txnId; - Future<DLSN> writeResult; - synchronized (txnLock) { - txnId = sequencer.nextId(); - LogRecord record = new LogRecord(txnId, payload); - if (isRecordSet) { - record.setRecordSet(); - } - writeResult = writer.write(record); - } - return writeResult.map(new AbstractFunction1<DLSN, WriteResponse>() { - @Override - public WriteResponse apply(DLSN value) { - successRecordCounter.inc(); - return ResponseUtils.writeSuccess().setDlsn(value.serialize(dlsnVersion)); - } - }); - } - - @Override - protected void fail(ResponseHeader header) { - if (StatusCode.FOUND == header.getCode()) { - redirectRecordCounter.inc(); - } else { - failureRecordCounter.inc(); 
- } - super.fail(header); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java deleted file mode 100644 index e411b420..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream; - -/** - * A write operation with payload. 
- */ -public interface WriteOpWithPayload { - - // Return the payload size in bytes - long getPayloadSize(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java deleted file mode 100644 index fcaee35..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream.admin; - -import org.apache.distributedlog.exceptions.DLException; -import com.twitter.util.Future; - -/** - * Admin operation interface. - */ -public interface AdminOp<RespT> { - - /** - * Invoked before the stream op is executed. - */ - void preExecute() throws DLException; - - /** - * Execute the operation. 
- * - * @return the future represents the response of the operation - */ - Future<RespT> execute(); - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java deleted file mode 100644 index 89a2566..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.service.stream.admin; - -import static org.apache.distributedlog.service.stream.AbstractStreamOp.requestStat; - -import org.apache.distributedlog.service.ResponseUtils; -import org.apache.distributedlog.service.stream.StreamManager; -import org.apache.distributedlog.thrift.service.WriteResponse; -import com.twitter.util.Future; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.stats.StatsLogger; -import scala.runtime.AbstractFunction1; - -/** - * Operation to create log stream. - */ -public class CreateOp extends StreamAdminOp { - - public CreateOp(String stream, - StatsLogger statsLogger, - StreamManager streamManager, - Long checksum, - Feature checksumEnabledFeature) { - super(stream, - streamManager, - requestStat(statsLogger, "create"), - checksum, - checksumEnabledFeature); - } - - @Override - protected Future<WriteResponse> executeOp() { - Future<Void> result = streamManager.createStreamAsync(stream); - return result.map(new AbstractFunction1<Void, WriteResponse>() { - @Override - public WriteResponse apply(Void value) { - return ResponseUtils.writeSuccess(); - } - }); - } -}