http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java new file mode 100644 index 0000000..c0c0972 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java @@ -0,0 +1,926 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; +import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.exceptions.AlreadyClosedException; +import org.apache.distributedlog.exceptions.DLException; +import org.apache.distributedlog.exceptions.OverCapacityException; +import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; +import org.apache.distributedlog.exceptions.StatusCode; +import org.apache.distributedlog.exceptions.StreamNotReadyException; +import org.apache.distributedlog.exceptions.StreamUnavailableException; +import org.apache.distributedlog.exceptions.UnexpectedException; +import org.apache.distributedlog.io.Abortables; +import org.apache.distributedlog.namespace.DistributedLogNamespace; +import org.apache.distributedlog.service.FatalErrorHandler; +import org.apache.distributedlog.service.ServerFeatureKeys; +import org.apache.distributedlog.service.config.ServerConfiguration; +import org.apache.distributedlog.service.config.StreamConfigProvider; +import org.apache.distributedlog.service.stream.limiter.StreamRequestLimiter; +import org.apache.distributedlog.service.streamset.Partition; +import org.apache.distributedlog.stats.BroadCastStatsLogger; +import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.util.OrderedScheduler; +import org.apache.distributedlog.util.TimeSequencer; +import org.apache.distributedlog.util.Utils; +import com.twitter.util.Duration; +import com.twitter.util.Function0; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Promise; +import com.twitter.util.TimeoutException; +import com.twitter.util.Timer; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.TimerTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; + +/** + * Implementation of {@link Stream}. + */ +public class StreamImpl implements Stream { + + private static final Logger logger = LoggerFactory.getLogger(StreamImpl.class); + + /** + * The status of the stream. + * + * <p>The status change of the stream should just go in one direction. If a stream hits + * any error, the stream should be put in error state. If a stream is in error state, + * it should be removed and not reused anymore. + */ + public enum StreamStatus { + UNINITIALIZED(-1), + INITIALIZING(0), + INITIALIZED(1), + CLOSING(-4), + CLOSED(-5), + // if a stream is in error state, it should be abort during closing. + ERROR(-6); + + final int code; + + StreamStatus(int code) { + this.code = code; + } + + int getCode() { + return code; + } + + public static boolean isUnavailable(StreamStatus status) { + return StreamStatus.ERROR == status || StreamStatus.CLOSING == status || StreamStatus.CLOSED == status; + } + } + + private final String name; + private final Partition partition; + private DistributedLogManager manager; + + private volatile AsyncLogWriter writer; + private volatile StreamStatus status; + private volatile String owner; + private volatile Throwable lastException; + private volatile Queue<StreamOp> pendingOps = new ArrayDeque<StreamOp>(); + + private final Promise<Void> closePromise = new Promise<Void>(); + private final Object txnLock = new Object(); + private final TimeSequencer sequencer = new TimeSequencer(); + private final StreamRequestLimiter limiter; + private final DynamicDistributedLogConfiguration dynConf; + private final DistributedLogConfiguration dlConfig; + private final DistributedLogNamespace dlNamespace; + private final String clientId; + private final OrderedScheduler scheduler; + private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock(); + private final Feature featureRateLimitDisabled; + private final StreamManager streamManager; + private final StreamConfigProvider streamConfigProvider; + private final FatalErrorHandler fatalErrorHandler; + private final long streamProbationTimeoutMs; + private final long serviceTimeoutMs; + private final long writerCloseTimeoutMs; + private final boolean failFastOnStreamNotReady; + private final HashedWheelTimer requestTimer; + private final Timer futureTimer; + + // Stats + private final StatsLogger streamLogger; + private final StatsLogger streamExceptionStatLogger; + private final StatsLogger limiterStatLogger; + private final Counter serviceTimeout; + private final OpStatsLogger streamAcquireStat; + private final OpStatsLogger writerCloseStatLogger; + private final Counter pendingOpsCounter; + private final Counter unexpectedExceptions; + private final Counter writerCloseTimeoutCounter; + private final StatsLogger exceptionStatLogger; + private final ConcurrentHashMap<String, Counter> exceptionCounters = + new ConcurrentHashMap<String, Counter>(); + private final Gauge<Number> streamStatusGauge; + + // Since we may create and discard streams at initialization if there's a race, + // must not do any expensive initialization here (particularly any locking or + // significant resource allocation etc.). + StreamImpl(final String name, + final Partition partition, + String clientId, + StreamManager streamManager, + StreamOpStats streamOpStats, + ServerConfiguration serverConfig, + DistributedLogConfiguration dlConfig, + DynamicDistributedLogConfiguration streamConf, + FeatureProvider featureProvider, + StreamConfigProvider streamConfigProvider, + DistributedLogNamespace dlNamespace, + OrderedScheduler scheduler, + FatalErrorHandler fatalErrorHandler, + HashedWheelTimer requestTimer, + Timer futureTimer) { + this.clientId = clientId; + this.dlConfig = dlConfig; + this.streamManager = streamManager; + this.name = name; + this.partition = partition; + this.status = StreamStatus.UNINITIALIZED; + this.lastException = new IOException("Fail to write record to stream " + name); + this.streamConfigProvider = streamConfigProvider; + this.dlNamespace = dlNamespace; + this.featureRateLimitDisabled = featureProvider.getFeature( + ServerFeatureKeys.SERVICE_RATE_LIMIT_DISABLED.name().toLowerCase()); + this.scheduler = scheduler; + this.serviceTimeoutMs = serverConfig.getServiceTimeoutMs(); + this.streamProbationTimeoutMs = serverConfig.getStreamProbationTimeoutMs(); + this.writerCloseTimeoutMs = serverConfig.getWriterCloseTimeoutMs(); + this.failFastOnStreamNotReady = dlConfig.getFailFastOnStreamNotReady(); + this.fatalErrorHandler = fatalErrorHandler; + this.dynConf = streamConf; + StatsLogger limiterStatsLogger = BroadCastStatsLogger.two( + streamOpStats.baseScope("stream_limiter"), + streamOpStats.streamRequestScope(partition, "limiter")); + this.limiter = new StreamRequestLimiter(name, dynConf, limiterStatsLogger, featureRateLimitDisabled); + this.requestTimer = requestTimer; + this.futureTimer = futureTimer; + + // Stats + this.streamLogger = streamOpStats.streamRequestStatsLogger(partition); + this.limiterStatLogger = streamOpStats.baseScope("request_limiter"); + this.streamExceptionStatLogger = streamLogger.scope("exceptions"); + this.serviceTimeout = streamOpStats.baseCounter("serviceTimeout"); + StatsLogger streamsStatsLogger = streamOpStats.baseScope("streams"); + this.streamAcquireStat = streamsStatsLogger.getOpStatsLogger("acquire"); + this.pendingOpsCounter = streamOpStats.baseCounter("pending_ops"); + this.unexpectedExceptions = streamOpStats.baseCounter("unexpected_exceptions"); + this.exceptionStatLogger = streamOpStats.requestScope("exceptions"); + this.writerCloseStatLogger = streamsStatsLogger.getOpStatsLogger("writer_close"); + this.writerCloseTimeoutCounter = streamsStatsLogger.getCounter("writer_close_timeouts"); + // Gauges + this.streamStatusGauge = new Gauge<Number>() { + @Override + public Number getDefaultValue() { + return StreamStatus.UNINITIALIZED.getCode(); + } + @Override + public Number getSample() { + return status.getCode(); + } + }; + } + + @Override + public String getOwner() { + return owner; + } + + @Override + public String getStreamName() { + return name; + } + + @Override + public DynamicDistributedLogConfiguration getStreamConfiguration() { + return dynConf; + } + + @Override + public Partition getPartition() { + return partition; + } + + private DistributedLogManager openLog(String name) throws IOException { + Optional<DistributedLogConfiguration> dlConf = Optional.<DistributedLogConfiguration>absent(); + Optional<DynamicDistributedLogConfiguration> dynDlConf = Optional.of(dynConf); + Optional<StatsLogger> perStreamStatsLogger = Optional.of(streamLogger); + return dlNamespace.openLog(name, dlConf, dynDlConf, perStreamStatsLogger); + } + + // Expensive initialization, only called once per stream. + @Override + public void initialize() throws IOException { + manager = openLog(name); + + // Better to avoid registering the gauge multiple times, so do this in init + // which only gets called once. + streamLogger.registerGauge("stream_status", this.streamStatusGauge); + + // Signal initialization is complete, should be last in this method. + status = StreamStatus.INITIALIZING; + } + + @Override + public String toString() { + return String.format("Stream:%s, %s, %s Status:%s", name, manager, writer, status); + } + + @Override + public void start() { + // acquire the stream + acquireStream().addEventListener(new FutureEventListener<Boolean>() { + @Override + public void onSuccess(Boolean success) { + if (!success) { + // failed to acquire the stream. set the stream in error status and close it. + setStreamInErrorStatus(); + requestClose("Failed to acquire the ownership"); + } + } + + @Override + public void onFailure(Throwable cause) { + // unhandled exceptions + logger.error("Stream {} threw unhandled exception : ", name, cause); + // failed to acquire the stream. set the stream in error status and close it. + setStreamInErrorStatus(); + requestClose("Unhandled exception"); + } + }); + } + + // + // Stats Operations + // + + void countException(Throwable t, StatsLogger streamExceptionLogger) { + String exceptionName = null == t ? "null" : t.getClass().getName(); + Counter counter = exceptionCounters.get(exceptionName); + if (null == counter) { + counter = exceptionStatLogger.getCounter(exceptionName); + Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName, counter); + if (null != oldCounter) { + counter = oldCounter; + } + } + counter.inc(); + streamExceptionLogger.getCounter(exceptionName).inc(); + } + + boolean isCriticalException(Throwable cause) { + return !(cause instanceof OwnershipAcquireFailedException); + } + + // + // Service Timeout: + // - schedule a timeout function to handle operation timeouts: {@link #handleServiceTimeout(String)} + // - if the operation is completed within timeout period, cancel the timeout. + // + + void scheduleTimeout(final StreamOp op) { + final Timeout timeout = requestTimer.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + if (!timeout.isCancelled()) { + serviceTimeout.inc(); + handleServiceTimeout("Operation " + op.getClass().getName() + " timeout"); + } + } + }, serviceTimeoutMs, TimeUnit.MILLISECONDS); + op.responseHeader().ensure(new Function0<BoxedUnit>() { + @Override + public BoxedUnit apply() { + timeout.cancel(); + return null; + } + }); + } + + /** + * Close the stream and schedule cache eviction at some point in the future. + * We delay this as a way to place the stream in a probationary state--cached + * in the proxy but unusable. + * This mechanism helps the cluster adapt to situations where a proxy has + * persistent connectivity/availability issues, because it keeps an affected + * stream off the proxy for a period of time, hopefully long enough for the + * issues to be resolved, or for whoop to kick in and kill the shard. + */ + void handleServiceTimeout(String reason) { + synchronized (this) { + if (StreamStatus.isUnavailable(status)) { + return; + } + // Mark stream in error state + setStreamInErrorStatus(); + } + + // Async close request, and schedule eviction when its done. + Future<Void> closeFuture = requestClose(reason, false /* dont remove */); + closeFuture.onSuccess(new AbstractFunction1<Void, BoxedUnit>() { + @Override + public BoxedUnit apply(Void result) { + streamManager.scheduleRemoval(StreamImpl.this, streamProbationTimeoutMs); + return BoxedUnit.UNIT; + } + }); + } + + // + // Submit the operation to the stream. + // + + /** + * Execute the StreamOp. If reacquire is needed, this may initiate reacquire and queue the op for + * execution once complete. + * + * @param op + * stream operation to execute. + */ + @Override + public void submit(StreamOp op) { + try { + limiter.apply(op); + } catch (OverCapacityException ex) { + op.fail(ex); + return; + } + + // Timeout stream op if requested. + if (serviceTimeoutMs > 0) { + scheduleTimeout(op); + } + + boolean completeOpNow = false; + boolean success = true; + if (StreamStatus.isUnavailable(status)) { + // Stream is closed, fail the op immediately + op.fail(new StreamUnavailableException("Stream " + name + " is closed.")); + return; + } else if (StreamStatus.INITIALIZED == status && writer != null) { + completeOpNow = true; + success = true; + } else { + synchronized (this) { + if (StreamStatus.isUnavailable(status)) { + // Stream is closed, fail the op immediately + op.fail(new StreamUnavailableException("Stream " + name + " is closed.")); + return; + } else if (StreamStatus.INITIALIZED == status) { + completeOpNow = true; + success = true; + } else if (failFastOnStreamNotReady) { + op.fail(new StreamNotReadyException("Stream " + name + " is not ready; status = " + status)); + return; + } else { // the stream is still initializing + pendingOps.add(op); + pendingOpsCounter.inc(); + if (1 == pendingOps.size()) { + if (op instanceof HeartbeatOp) { + ((HeartbeatOp) op).setWriteControlRecord(true); + } + } + } + } + } + if (completeOpNow) { + executeOp(op, success); + } + } + + // + // Execute operations and handle exceptions on operations + // + + /** + * Execute the <i>op</i> immediately. + * + * @param op + * stream operation to execute. + * @param success + * whether the operation is success or not. + */ + void executeOp(final StreamOp op, boolean success) { + final AsyncLogWriter writer; + final Throwable lastException; + synchronized (this) { + writer = this.writer; + lastException = this.lastException; + } + if (null != writer && success) { + op.execute(writer, sequencer, txnLock) + .addEventListener(new FutureEventListener<Void>() { + @Override + public void onSuccess(Void value) { + // nop + } + @Override + public void onFailure(Throwable cause) { + boolean countAsException = true; + if (cause instanceof DLException) { + final DLException dle = (DLException) cause; + switch (dle.getCode()) { + case StatusCode.FOUND: + assert(cause instanceof OwnershipAcquireFailedException); + countAsException = false; + handleExceptionOnStreamOp(op, cause); + break; + case StatusCode.ALREADY_CLOSED: + assert(cause instanceof AlreadyClosedException); + op.fail(cause); + handleAlreadyClosedException((AlreadyClosedException) cause); + break; + // exceptions that mostly from client (e.g. too large record) + case StatusCode.NOT_IMPLEMENTED: + case StatusCode.METADATA_EXCEPTION: + case StatusCode.LOG_EMPTY: + case StatusCode.LOG_NOT_FOUND: + case StatusCode.TRUNCATED_TRANSACTION: + case StatusCode.END_OF_STREAM: + case StatusCode.TRANSACTION_OUT_OF_ORDER: + case StatusCode.INVALID_STREAM_NAME: + case StatusCode.TOO_LARGE_RECORD: + case StatusCode.STREAM_NOT_READY: + case StatusCode.OVER_CAPACITY: + op.fail(cause); + break; + // the DL writer hits exception, simple set the stream to error status + // and fail the request + default: + handleExceptionOnStreamOp(op, cause); + break; + } + } else { + handleExceptionOnStreamOp(op, cause); + } + if (countAsException) { + countException(cause, streamExceptionStatLogger); + } + } + }); + } else { + if (null != lastException) { + op.fail(lastException); + } else { + op.fail(new StreamUnavailableException("Stream " + name + " is closed.")); + } + } + } + + /** + * Handle exception when executing <i>op</i>. + * + * @param op + * stream operation executing + * @param cause + * exception received when executing <i>op</i> + */ + private void handleExceptionOnStreamOp(StreamOp op, final Throwable cause) { + AsyncLogWriter oldWriter = null; + boolean statusChanged = false; + synchronized (this) { + if (StreamStatus.INITIALIZED == status) { + oldWriter = setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZED, null, cause); + statusChanged = true; + } + } + if (statusChanged) { + Abortables.asyncAbort(oldWriter, false); + if (isCriticalException(cause)) { + logger.error("Failed to write data into stream {} : ", name, cause); + } else { + logger.warn("Failed to write data into stream {} : {}", name, cause.getMessage()); + } + requestClose("Failed to write data into stream " + name + " : " + cause.getMessage()); + } + op.fail(cause); + } + + /** + * Handling already closed exception. + */ + private void handleAlreadyClosedException(AlreadyClosedException ace) { + unexpectedExceptions.inc(); + logger.error("Encountered unexpected exception when writing data into stream {} : ", name, ace); + fatalErrorHandler.notifyFatalError(); + } + + // + // Acquire streams + // + + Future<Boolean> acquireStream() { + final Stopwatch stopwatch = Stopwatch.createStarted(); + final Promise<Boolean> acquirePromise = new Promise<Boolean>(); + manager.openAsyncLogWriter().addEventListener( + FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() { + + @Override + public void onSuccess(AsyncLogWriter w) { + onAcquireStreamSuccess(w, stopwatch, acquirePromise); + } + + @Override + public void onFailure(Throwable cause) { + onAcquireStreamFailure(cause, stopwatch, acquirePromise); + } + + }, scheduler, getStreamName())); + return acquirePromise; + } + + private void onAcquireStreamSuccess(AsyncLogWriter w, + Stopwatch stopwatch, + Promise<Boolean> acquirePromise) { + synchronized (txnLock) { + sequencer.setLastId(w.getLastTxId()); + } + AsyncLogWriter oldWriter; + Queue<StreamOp> oldPendingOps; + boolean success; + synchronized (StreamImpl.this) { + oldWriter = setStreamStatus(StreamStatus.INITIALIZED, + StreamStatus.INITIALIZING, w, null); + oldPendingOps = pendingOps; + pendingOps = new ArrayDeque<StreamOp>(); + success = true; + } + // check if the stream is allowed to be acquired + if (!streamManager.allowAcquire(StreamImpl.this)) { + if (null != oldWriter) { + Abortables.asyncAbort(oldWriter, true); + } + int maxAcquiredPartitions = dynConf.getMaxAcquiredPartitionsPerProxy(); + StreamUnavailableException sue = new StreamUnavailableException("Stream " + partition.getStream() + + " is not allowed to acquire more than " + maxAcquiredPartitions + " partitions"); + countException(sue, exceptionStatLogger); + logger.error("Failed to acquire stream {} because it is unavailable : {}", + name, sue.getMessage()); + synchronized (this) { + oldWriter = setStreamStatus(StreamStatus.ERROR, + StreamStatus.INITIALIZED, null, sue); + // we don't switch the pending ops since they are already switched + // when setting the status to initialized + success = false; + } + } + processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise); + } + + private void onAcquireStreamFailure(Throwable cause, + Stopwatch stopwatch, + Promise<Boolean> acquirePromise) { + AsyncLogWriter oldWriter; + Queue<StreamOp> oldPendingOps; + boolean success; + if (cause instanceof AlreadyClosedException) { + countException(cause, streamExceptionStatLogger); + handleAlreadyClosedException((AlreadyClosedException) cause); + return; + } else { + if (isCriticalException(cause)) { + countException(cause, streamExceptionStatLogger); + logger.error("Failed to acquire stream {} : ", name, cause); + } else { + logger.warn("Failed to acquire stream {} : {}", name, cause.getMessage()); + } + synchronized (StreamImpl.this) { + oldWriter = setStreamStatus(StreamStatus.ERROR, + StreamStatus.INITIALIZING, null, cause); + oldPendingOps = pendingOps; + pendingOps = new ArrayDeque<StreamOp>(); + success = false; + } + } + processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise); + } + + /** + * Process the pending request after acquired stream. + * + * @param success whether the acquisition succeed or not + * @param oldWriter the old writer to abort + * @param oldPendingOps the old pending ops to execute + * @param stopwatch stopwatch to measure the time spent on acquisition + * @param acquirePromise the promise to complete the acquire operation + */ + void processPendingRequestsAfterAcquire(boolean success, + AsyncLogWriter oldWriter, + Queue<StreamOp> oldPendingOps, + Stopwatch stopwatch, + Promise<Boolean> acquirePromise) { + if (success) { + streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + } else { + streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + } + for (StreamOp op : oldPendingOps) { + executeOp(op, success); + pendingOpsCounter.dec(); + } + Abortables.asyncAbort(oldWriter, true); + FutureUtils.setValue(acquirePromise, success); + } + + // + // Stream Status Changes + // + + synchronized void setStreamInErrorStatus() { + if (StreamStatus.CLOSING == status || StreamStatus.CLOSED == status) { + return; + } + this.status = StreamStatus.ERROR; + } + + /** + * Update the stream status. The changes are only applied when there isn't status changed. + * + * @param newStatus + * new status + * @param oldStatus + * old status + * @param writer + * new log writer + * @param t + * new exception + * @return old writer if it exists + */ + synchronized AsyncLogWriter setStreamStatus(StreamStatus newStatus, + StreamStatus oldStatus, + AsyncLogWriter writer, + Throwable t) { + if (oldStatus != this.status) { + logger.info("Stream {} status already changed from {} -> {} when trying to change it to {}", + new Object[] { name, oldStatus, this.status, newStatus }); + return null; + } + + String owner = null; + if (t instanceof OwnershipAcquireFailedException) { + owner = ((OwnershipAcquireFailedException) t).getCurrentOwner(); + } + + AsyncLogWriter oldWriter = this.writer; + this.writer = writer; + if (null != owner && owner.equals(clientId)) { + unexpectedExceptions.inc(); + logger.error("I am waiting myself {} to release lock on stream {}, so have to shut myself down :", + new Object[] { owner, name, t }); + // I lost the ownership but left a lock over zookeeper + // I should not ask client to redirect to myself again as I can't handle it :( + // shutdown myself + fatalErrorHandler.notifyFatalError(); + this.owner = null; + } else { + this.owner = owner; + } + this.lastException = t; + this.status = newStatus; + if (StreamStatus.INITIALIZED == newStatus) { + streamManager.notifyAcquired(this); + logger.info("Inserted acquired stream {} -> writer {}", name, this); + } else { + streamManager.notifyReleased(this); + logger.info("Removed acquired stream {} -> writer {}", name, this); + } + return oldWriter; + } + + // + // Stream Close Functions + // + + void close(DistributedLogManager dlm) { + if (null != dlm) { + try { + dlm.close(); + } catch (IOException ioe) { + logger.warn("Failed to close dlm for {} : ", name, ioe); + } + } + } + + @Override + public Future<Void> requestClose(String reason) { + return requestClose(reason, true); + } + + Future<Void> requestClose(String reason, boolean uncache) { + final boolean abort; + closeLock.writeLock().lock(); + try { + if (StreamStatus.CLOSING == status + || StreamStatus.CLOSED == status) { + return closePromise; + } + logger.info("Request to close stream {} : {}", getStreamName(), reason); + // if the stream isn't closed from INITIALIZED state, we abort the stream instead of closing it. + abort = StreamStatus.INITIALIZED != status; + status = StreamStatus.CLOSING; + streamManager.notifyReleased(this); + } finally { + closeLock.writeLock().unlock(); + } + // we will fail the requests that are coming in between closing and closed only + // after the async writer is closed. so we could clear up the lock before redirect + // them. + close(abort, uncache); + return closePromise; + } + + @Override + public void delete() throws IOException { + if (null != writer) { + Utils.close(writer); + synchronized (this) { + writer = null; + lastException = new StreamUnavailableException("Stream was deleted"); + } + } + if (null == manager) { + throw new UnexpectedException("No stream " + name + " to delete"); + } + manager.delete(); + } + + /** + * Post action executed after closing. + */ + private void postClose(boolean uncache) { + closeManagerAndErrorOutPendingRequests(); + unregisterGauge(); + if (uncache) { + if (null != owner) { + long probationTimeoutMs = 2 * dlConfig.getZKSessionTimeoutMilliseconds() / 3; + streamManager.scheduleRemoval(this, probationTimeoutMs); + } else { + streamManager.notifyRemoved(this); + logger.info("Removed cached stream {}.", getStreamName()); + } + } + FutureUtils.setValue(closePromise, null); + } + + /** + * Shouldn't call close directly. The callers should call #requestClose instead + * + * @param shouldAbort shall we abort the stream instead of closing + */ + private Future<Void> close(boolean shouldAbort, final boolean uncache) { + boolean abort; + closeLock.writeLock().lock(); + try { + if (StreamStatus.CLOSED == status) { + return closePromise; + } + abort = shouldAbort || (StreamStatus.INITIALIZED != status && StreamStatus.CLOSING != status); + status = StreamStatus.CLOSED; + streamManager.notifyReleased(this); + } finally { + closeLock.writeLock().unlock(); + } + logger.info("Closing stream {} ...", name); + // Close the writers to release the locks before failing the requests + Future<Void> closeWriterFuture; + if (abort) { + closeWriterFuture = Abortables.asyncAbort(writer, true); + } else { + closeWriterFuture = Utils.asyncClose(writer, true); + } + // close the manager and error out pending requests after close writer + Duration closeWaitDuration; + if (writerCloseTimeoutMs <= 0) { + closeWaitDuration = Duration.Top(); + } else { + closeWaitDuration = Duration.fromMilliseconds(writerCloseTimeoutMs); + } + + FutureUtils.stats( + closeWriterFuture, + writerCloseStatLogger, + Stopwatch.createStarted() + ).masked().within(futureTimer, closeWaitDuration) + .addEventListener(FutureUtils.OrderedFutureEventListener.of( + new FutureEventListener<Void>() { + @Override + public void onSuccess(Void value) { + postClose(uncache); + } + @Override + public void onFailure(Throwable cause) { + if (cause instanceof TimeoutException) { + writerCloseTimeoutCounter.inc(); + } + postClose(uncache); + } + }, scheduler, name)); + return closePromise; + } + + private void closeManagerAndErrorOutPendingRequests() { + close(manager); + // Failed the pending requests. + Queue<StreamOp> oldPendingOps; + synchronized (this) { + oldPendingOps = pendingOps; + pendingOps = new ArrayDeque<StreamOp>(); + } + StreamUnavailableException closingException = + new StreamUnavailableException("Stream " + name + " is closed."); + for (StreamOp op : oldPendingOps) { + op.fail(closingException); + pendingOpsCounter.dec(); + } + limiter.close(); + logger.info("Closed stream {}.", name); + } + + /** + * clean up the gauge to help GC. + */ + private void unregisterGauge(){ + streamLogger.unregisterGauge("stream_status", this.streamStatusGauge); + } + + // Test-only apis + + @VisibleForTesting + public int numPendingOps() { + Queue<StreamOp> queue = pendingOps; + return null == queue ? 0 : queue.size(); + } + + @VisibleForTesting + public StreamStatus getStatus() { + return status; + } + + @VisibleForTesting + public void setStatus(StreamStatus status) { + this.status = status; + } + + @VisibleForTesting + public AsyncLogWriter getWriter() { + return writer; + } + + @VisibleForTesting + public DistributedLogManager getManager() { + return manager; + } + + @VisibleForTesting + public Throwable getLastException() { + return lastException; + } + + @VisibleForTesting + public Future<Void> getCloseFuture() { + return closePromise; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java new file mode 100644 index 0000000..d86c538 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import com.google.common.base.Optional; +import com.twitter.util.Future; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Manage lifecycle of streams. + * + * <p>StreamManager is responsible for creating, destroying, and keeping track of Stream objects. + * + * <p>Stream objects, which are managed by StreamManager and created by StreamFactory, are essentially the + * per stream request handlers, responsible fo dispatching ex. write requests to an underlying AsyncLogWriter, + * managing stream lock, interpreting exceptions, error conditions, and etc. + */ +public interface StreamManager { + + /** + * Get a cached stream, returning null if it doesnt exist. + * @param stream name + * @return the cached stream + */ + Stream getStream(String stream); + + /** + * Get a cached stream and create a new one if it doesnt exist. + * @param streamName stream name + * @param start whether to start the stream after it is created. + * @return future satisfied once close complete + */ + Stream getOrCreateStream(String streamName, boolean start) throws IOException; + + /** + * Asynchronously create a new stream. + * @param stream + * @return Future satisfied once the stream is created + */ + Future<Void> createStreamAsync(String stream); + + /** + * Is acquiring stream allowed? + * + * @param stream + * stream instance + * @return true if it is allowed to acquire this stream, otherwise false. + */ + boolean allowAcquire(Stream stream); + + /** + * Notify the manager that a stream was acquired. + * @param stream being acquired + */ + void notifyAcquired(Stream stream); + + /** + * Notify the manager that a stream was released. + * @param stream being released + */ + void notifyReleased(Stream stream); + + /** + * Notify the manager that a stream was completely removed. + * @param stream being uncached + * @return whether the stream existed or not + */ + boolean notifyRemoved(Stream stream); + + /** + * Asynchronous delete method. + * @param streamName stream name + * @return future satisfied once delete complete + */ + Future<Void> deleteAndRemoveAsync(String streamName); + + /** + * Asynchronous close and uncache method. + * @param streamName stream name + * @return future satisfied once close and uncache complete + */ + Future<Void> closeAndRemoveAsync(String streamName); + + /** + * Close and uncache after delayMs. + * @param stream to remove + */ + void scheduleRemoval(Stream stream, long delayMs); + + /** + * Close all stream. + * @return future satisfied all streams closed + */ + Future<List<Void>> closeStreams(); + + /** + * Return map with stream ownership info. + * @param regex for filtering streams + * @return map containing ownership info + */ + Map<String, String> getStreamOwnershipMap(Optional<String> regex); + + /** + * Number of acquired streams. + * @return number of acquired streams + */ + int numAcquired(); + + /** + * Number of cached streams. + * @return number of cached streams + */ + int numCached(); + + /** + * Is the stream denoted by streamName in the acquired state. + * @return true if the stream is in the acquired state + */ + boolean isAcquired(String streamName); + + /** + * Close manager and disallow further activity. + */ + void close(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java new file mode 100644 index 0000000..5d54738 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java @@ -0,0 +1,413 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.RateLimiter; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.exceptions.ServiceUnavailableException; +import org.apache.distributedlog.exceptions.StreamUnavailableException; +import org.apache.distributedlog.exceptions.UnexpectedException; +import org.apache.distributedlog.namespace.DistributedLogNamespace; +import org.apache.distributedlog.service.config.StreamConfigProvider; +import org.apache.distributedlog.service.streamset.Partition; +import org.apache.distributedlog.service.streamset.PartitionMap; +import org.apache.distributedlog.service.streamset.StreamPartitionConverter; +import org.apache.distributedlog.util.ConfUtils; +import com.twitter.util.Future; +import com.twitter.util.Promise; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * StreamManagerImpl is the default implementation responsible for creating, destroying, and keeping track + * of Streams. + * + * <p>StreamFactory, supplied to StreamManagerImpl in the constructor below, is reposible simply for creating + * a stream object in isolation from the rest of the system. We pass a StreamFactory in instead of simply + * creating StreamImpl's ourselves in order to inject dependencies without bloating the StreamManagerImpl + * constructor. + */ +public class StreamManagerImpl implements StreamManager { + + private static final Logger logger = LoggerFactory.getLogger(StreamManagerImpl.class); + + private final ConcurrentHashMap<String, Stream> streams = + new ConcurrentHashMap<String, Stream>(); + private final AtomicInteger numCached = new AtomicInteger(0); + + private final ConcurrentHashMap<String, Stream> acquiredStreams = + new ConcurrentHashMap<String, Stream>(); + private final AtomicInteger numAcquired = new AtomicInteger(0); + + // + // Partitions + // + private final StreamPartitionConverter partitionConverter; + private final PartitionMap cachedPartitions = new PartitionMap(); + private final PartitionMap acquiredPartitions = new PartitionMap(); + + final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock(); + private final ScheduledExecutorService executorService; + private final DistributedLogConfiguration dlConfig; + private final StreamConfigProvider streamConfigProvider; + private final String clientId; + private boolean closed = false; + private final StreamFactory streamFactory; + private final DistributedLogNamespace dlNamespace; + + public StreamManagerImpl(String clientId, + DistributedLogConfiguration dlConfig, + ScheduledExecutorService executorService, + StreamFactory streamFactory, + StreamPartitionConverter partitionConverter, + StreamConfigProvider streamConfigProvider, + DistributedLogNamespace dlNamespace) { + this.clientId = clientId; + this.executorService = executorService; + this.streamFactory = streamFactory; + this.partitionConverter = partitionConverter; + this.dlConfig = dlConfig; + this.streamConfigProvider = streamConfigProvider; + this.dlNamespace = dlNamespace; + } + + private DynamicDistributedLogConfiguration getDynConf(String streamName) { + Optional<DynamicDistributedLogConfiguration> dynDlConf = + streamConfigProvider.getDynamicStreamConfig(streamName); + if (dynDlConf.isPresent()) { + return dynDlConf.get(); + } else { + return ConfUtils.getConstDynConf(dlConfig); + } + } + + @Override + public boolean allowAcquire(Stream stream) { + return acquiredPartitions.addPartition( + stream.getPartition(), + stream.getStreamConfiguration().getMaxAcquiredPartitionsPerProxy()); + } + + /** + * Must be enqueued to an executor to avoid deadlocks (close and execute-op both + * try to acquire the same read-write lock). + */ + @Override + public Future<Void> deleteAndRemoveAsync(final String stream) { + final Promise<Void> result = new Promise<Void>(); + java.util.concurrent.Future<?> scheduleFuture = schedule(new Runnable() { + @Override + public void run() { + result.become(doDeleteAndRemoveAsync(stream)); + } + }, 0); + if (null == scheduleFuture) { + return Future.exception( + new ServiceUnavailableException("Couldn't schedule a delete task.")); + } + return result; + } + + /** + * Must be enqueued to an executor to avoid deadlocks (close and execute-op both + * try to acquire the same read-write lock). + */ + @Override + public Future<Void> closeAndRemoveAsync(final String streamName) { + final Promise<Void> releasePromise = new Promise<Void>(); + java.util.concurrent.Future<?> scheduleFuture = schedule(new Runnable() { + @Override + public void run() { + releasePromise.become(doCloseAndRemoveAsync(streamName)); + } + }, 0); + if (null == scheduleFuture) { + return Future.exception( + new ServiceUnavailableException("Couldn't schedule a release task.")); + } + return releasePromise; + } + + @Override + public Future<Void> createStreamAsync(final String stream) { + final Promise<Void> createPromise = new Promise<Void>(); + java.util.concurrent.Future<?> scheduleFuture = schedule(new Runnable() { + @Override + public void run() { + try { + dlNamespace.createLog(stream); + createPromise.setValue(null); + } catch (Exception e) { + createPromise.setException(e); + } + } + }, 0); + if (null == scheduleFuture) { + return Future.exception( + new ServiceUnavailableException("Couldn't schedule a create task.")); + } + return createPromise; + } + + @Override + public void notifyReleased(Stream stream) { + acquiredPartitions.removePartition(stream.getPartition()); + if (acquiredStreams.remove(stream.getStreamName(), stream)) { + numAcquired.getAndDecrement(); + } + } + + @Override + public void notifyAcquired(Stream stream) { + if (null == acquiredStreams.put(stream.getStreamName(), stream)) { + numAcquired.getAndIncrement(); + } + } + + @Override + public boolean notifyRemoved(Stream stream) { + cachedPartitions.removePartition(stream.getPartition()); + if (streams.remove(stream.getStreamName(), stream)) { + numCached.getAndDecrement(); + return true; + } + return false; + } + + @Override + public Map<String, String> getStreamOwnershipMap(Optional<String> regex) { + Map<String, String> ownershipMap = new HashMap<String, String>(); + for (Map.Entry<String, Stream> entry : acquiredStreams.entrySet()) { + String name = entry.getKey(); + if (regex.isPresent() && !name.matches(regex.get())) { + continue; + } + Stream stream = entry.getValue(); + if (null == stream) { + continue; + } + String owner = stream.getOwner(); + if (null == owner) { + ownershipMap.put(name, clientId); + } + } + return ownershipMap; + } + + @Override + public Stream getStream(String stream) { + return streams.get(stream); + } + + @Override + public Stream getOrCreateStream(String streamName, boolean start) throws IOException { + Stream stream = streams.get(streamName); + if (null == stream) { + closeLock.readLock().lock(); + try { + if (closed) { + return null; + } + DynamicDistributedLogConfiguration dynConf = getDynConf(streamName); + int maxCachedPartitions = dynConf.getMaxCachedPartitionsPerProxy(); + + // get partition from the stream name + Partition partition = partitionConverter.convert(streamName); + + // add partition to cached map + if (!cachedPartitions.addPartition(partition, maxCachedPartitions)) { + throw new StreamUnavailableException("Stream " + streamName + + " is not allowed to cache more than " + maxCachedPartitions + " partitions"); + } + + stream = newStream(streamName, dynConf); + Stream oldWriter = streams.putIfAbsent(streamName, stream); + if (null != oldWriter) { + stream = oldWriter; + } else { + numCached.getAndIncrement(); + logger.info("Inserted mapping stream name {} -> stream {}", streamName, stream); + stream.initialize(); + if (start) { + stream.start(); + } + } + } finally { + closeLock.readLock().unlock(); + } + } + return stream; + } + + @Override + public Future<List<Void>> closeStreams() { + int numAcquired = acquiredStreams.size(); + int numCached = streams.size(); + logger.info("Closing all acquired streams : acquired = {}, cached = {}.", + numAcquired, numCached); + Set<Stream> streamsToClose = new HashSet<Stream>(); + streamsToClose.addAll(streams.values()); + return closeStreams(streamsToClose, Optional.<RateLimiter>absent()); + } + + @Override + public void scheduleRemoval(final Stream stream, long delayMs) { + if (delayMs > 0) { + logger.info("Scheduling removal of stream {} from cache after {} sec.", + stream.getStreamName(), delayMs); + } + schedule(new Runnable() { + @Override + public void run() { + if (notifyRemoved(stream)) { + logger.info("Removed cached stream {} after probation.", stream.getStreamName()); + } else { + logger.info("Cached stream {} already removed.", stream.getStreamName()); + } + } + }, delayMs); + } + + @Override + public int numAcquired() { + return numAcquired.get(); + } + + @Override + public int numCached() { + return numCached.get(); + } + + @Override + public boolean isAcquired(String streamName) { + return acquiredStreams.containsKey(streamName); + } + + @Override + public void close() { + closeLock.writeLock().lock(); + try { + if (closed) { + return; + } + closed = true; + } finally { + closeLock.writeLock().unlock(); + } + } + + private Future<List<Void>> closeStreams(Set<Stream> streamsToClose, Optional<RateLimiter> rateLimiter) { + if (streamsToClose.isEmpty()) { + logger.info("No streams to close."); + List<Void> emptyList = new ArrayList<Void>(); + return Future.value(emptyList); + } + List<Future<Void>> futures = new ArrayList<Future<Void>>(streamsToClose.size()); + for (Stream stream : streamsToClose) { + if (rateLimiter.isPresent()) { + rateLimiter.get().acquire(); + } + futures.add(stream.requestClose("Close Streams")); + } + return Future.collect(futures); + } + + private Stream newStream(String name, DynamicDistributedLogConfiguration streamConf) { + return streamFactory.create(name, streamConf, this); + } + + public Future<Void> doCloseAndRemoveAsync(final String streamName) { + Stream stream = streams.get(streamName); + if (null == stream) { + logger.info("No stream {} to release.", streamName); + return Future.value(null); + } else { + return stream.requestClose("release ownership"); + } + } + + /** + * Dont schedule if we're closed - closeLock is acquired to close, so if we acquire the + * lock and discover we're not closed, we won't schedule. + */ + private java.util.concurrent.Future<?> schedule(Runnable runnable, long delayMs) { + closeLock.readLock().lock(); + try { + if (closed) { + return null; + } else if (delayMs > 0) { + return executorService.schedule(runnable, delayMs, TimeUnit.MILLISECONDS); + } else { + return executorService.submit(runnable); + } + } catch (RejectedExecutionException ree) { + logger.error("Failed to schedule task {} in {} ms : ", + new Object[] { runnable, delayMs, ree }); + return null; + } finally { + closeLock.readLock().unlock(); + } + } + + private Future<Void> doDeleteAndRemoveAsync(final String streamName) { + Stream stream = streams.get(streamName); + if (null == stream) { + logger.warn("No stream {} to delete.", streamName); + return Future.exception(new UnexpectedException("No stream " + streamName + " to delete.")); + } else { + Future<Void> result; + logger.info("Deleting stream {}, {}", streamName, stream); + try { + stream.delete(); + result = stream.requestClose("Stream Deleted"); + } catch (IOException e) { + logger.error("Failed on removing stream {} : ", streamName, e); + result = Future.exception(e); + } + return result; + } + } + + @VisibleForTesting + public ConcurrentHashMap<String, Stream> getCachedStreams() { + return streams; + } + + @VisibleForTesting + public ConcurrentHashMap<String, Stream> getAcquiredStreams() { + return acquiredStreams; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java new file mode 100644 index 0000000..d0b8de4 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import com.google.common.base.Stopwatch; +import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.exceptions.DLException; +import org.apache.distributedlog.thrift.service.ResponseHeader; +import org.apache.distributedlog.util.Sequencer; +import com.twitter.util.Future; + +/** + * An operation applied to a stream. + */ +public interface StreamOp { + /** + * Execute a stream op with the supplied writer. + * + * @param writer active writer for applying the change + * @param sequencer sequencer used for generating transaction id for stream operations + * @param txnLock transaction lock to guarantee ordering of transaction id + * @return a future satisfied when the operation completes execution + */ + Future<Void> execute(AsyncLogWriter writer, + Sequencer sequencer, + Object txnLock); + + /** + * Invoked before the stream op is executed. + */ + void preExecute() throws DLException; + + /** + * Return the response header (containing the status code etc.). + * + * @return A future containing the response header or the exception + * encountered by the op if it failed. + */ + Future<ResponseHeader> responseHeader(); + + /** + * Abort the operation with the givem exception. + */ + void fail(Throwable t); + + /** + * Return the stream name. + */ + String streamName(); + + /** + * Stopwatch gives the start time of the operation. + */ + Stopwatch stopwatch(); + + /** + * Compute checksum from arguments. + */ + Long computeChecksum(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java new file mode 100644 index 0000000..f3fc610 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import org.apache.distributedlog.service.streamset.Partition; +import org.apache.distributedlog.stats.BroadCastStatsLogger; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; + +/** + * Encapsulate stream op stats construction to make it easier to access stream + * op stats consistently from different scopes. + */ +public class StreamOpStats { + private final StatsLogger baseStatsLogger; + private final StatsLogger requestStatsLogger; + private final StatsLogger recordsStatsLogger; + private final StatsLogger requestDeniedStatsLogger; + private final StatsLogger streamStatsLogger; + + public StreamOpStats(StatsLogger statsLogger, + StatsLogger perStreamStatsLogger) { + this.baseStatsLogger = statsLogger; + this.requestStatsLogger = statsLogger.scope("request"); + this.recordsStatsLogger = statsLogger.scope("records"); + this.requestDeniedStatsLogger = statsLogger.scope("denied"); + this.streamStatsLogger = perStreamStatsLogger; + } + + public StatsLogger baseStatsLogger(String opName) { + return baseStatsLogger; + } + + public Counter baseCounter(String opName) { + return baseStatsLogger.getCounter(opName); + } + + public StatsLogger baseScope(String opName) { + return baseStatsLogger.scope(opName); + } + + public OpStatsLogger requestLatencyStat(String opName) { + return requestStatsLogger.getOpStatsLogger(opName); + } + + public StatsLogger requestScope(String scopeName) { + return requestStatsLogger.scope(scopeName); + } + + public Counter scopedRequestCounter(String opName, String counterName) { + return requestScope(opName).getCounter(counterName); + } + + public Counter requestCounter(String counterName) { + return requestStatsLogger.getCounter(counterName); + } + + public Counter requestPendingCounter(String counterName) { + return requestCounter(counterName); + } + + public Counter requestDeniedCounter(String counterName) { + return requestDeniedStatsLogger.getCounter(counterName); + } + + public Counter recordsCounter(String counterName) { + return recordsStatsLogger.getCounter(counterName); + } + + public StatsLogger streamRequestStatsLogger(Partition partition) { + return BroadCastStatsLogger.masterslave( + streamStatsLogger.scope(partition.getStream()).scope("partition") + .scope(partition.getPaddedId()), streamStatsLogger.scope(partition.getStream()) + .scope("aggregate")); + } + + public StatsLogger streamRequestScope(Partition partition, String scopeName) { + return streamRequestStatsLogger(partition).scope(scopeName); + } + + public OpStatsLogger streamRequestLatencyStat(Partition partition, String opName) { + return streamRequestStatsLogger(partition).getOpStatsLogger(opName); + } + + public Counter streamRequestCounter(Partition partition, String opName, String counterName) { + return streamRequestScope(partition, opName).getCounter(counterName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java new file mode 100644 index 0000000..0036a5c --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.acl.AccessControlManager; +import org.apache.distributedlog.exceptions.DLException; +import org.apache.distributedlog.exceptions.RequestDeniedException; +import org.apache.distributedlog.protocol.util.ProtocolUtils; +import org.apache.distributedlog.service.ResponseUtils; +import org.apache.distributedlog.thrift.service.WriteResponse; +import org.apache.distributedlog.util.Sequencer; +import com.twitter.util.Future; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.StatsLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.AbstractFunction1; + +/** + * Operation to truncate a log stream. + */ +public class TruncateOp extends AbstractWriteOp { + + private static final Logger logger = LoggerFactory.getLogger(TruncateOp.class); + + private final Counter deniedTruncateCounter; + private final DLSN dlsn; + private final AccessControlManager accessControlManager; + + public TruncateOp(String stream, + DLSN dlsn, + StatsLogger statsLogger, + StatsLogger perStreamStatsLogger, + Long checksum, + Feature checksumDisabledFeature, + AccessControlManager accessControlManager) { + super(stream, requestStat(statsLogger, "truncate"), checksum, checksumDisabledFeature); + StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger); + this.deniedTruncateCounter = streamOpStats.requestDeniedCounter("truncate"); + this.accessControlManager = accessControlManager; + this.dlsn = dlsn; + } + + @Override + public Long computeChecksum() { + return ProtocolUtils.truncateOpCRC32(stream, dlsn); + } + + @Override + protected Future<WriteResponse> executeOp(AsyncLogWriter writer, + Sequencer sequencer, + Object txnLock) { + if (!stream.equals(writer.getStreamName())) { + logger.error("Truncate: Stream Name Mismatch in the Stream Map {}, {}", stream, writer.getStreamName()); + return Future.exception(new IllegalStateException("The stream mapping is incorrect, fail the request")); + } + return writer.truncate(dlsn).map(new AbstractFunction1<Boolean, WriteResponse>() { + @Override + public WriteResponse apply(Boolean v1) { + return ResponseUtils.writeSuccess(); + } + }); + } + + @Override + public void preExecute() throws DLException { + if (!accessControlManager.allowTruncate(stream)) { + deniedTruncateCounter.inc(); + throw new RequestDeniedException(stream, "truncate"); + } + super.preExecute(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java new file mode 100644 index 0000000..2e7ffb8 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.LogRecord; +import org.apache.distributedlog.acl.AccessControlManager; +import org.apache.distributedlog.exceptions.DLException; +import org.apache.distributedlog.exceptions.RequestDeniedException; +import org.apache.distributedlog.protocol.util.ProtocolUtils; +import org.apache.distributedlog.service.ResponseUtils; +import org.apache.distributedlog.service.config.ServerConfiguration; +import org.apache.distributedlog.service.streamset.Partition; +import org.apache.distributedlog.service.streamset.StreamPartitionConverter; +import org.apache.distributedlog.thrift.service.ResponseHeader; +import org.apache.distributedlog.thrift.service.StatusCode; +import org.apache.distributedlog.thrift.service.WriteResponse; +import org.apache.distributedlog.util.Sequencer; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.AbstractFunction1; + +/** + * Operation to write a single record to a log stream. + */ +public class WriteOp extends AbstractWriteOp implements WriteOpWithPayload { + + private static final Logger logger = LoggerFactory.getLogger(WriteOp.class); + + private final byte[] payload; + private final boolean isRecordSet; + + // Stats + private final Counter deniedWriteCounter; + private final Counter successRecordCounter; + private final Counter failureRecordCounter; + private final Counter redirectRecordCounter; + private final OpStatsLogger latencyStat; + private final Counter bytes; + private final Counter writeBytes; + + private final byte dlsnVersion; + private final AccessControlManager accessControlManager; + + public WriteOp(String stream, + ByteBuffer data, + StatsLogger statsLogger, + StatsLogger perStreamStatsLogger, + StreamPartitionConverter streamPartitionConverter, + ServerConfiguration conf, + byte dlsnVersion, + Long checksum, + boolean isRecordSet, + Feature checksumDisabledFeature, + AccessControlManager accessControlManager) { + super(stream, requestStat(statsLogger, "write"), checksum, checksumDisabledFeature); + payload = new byte[data.remaining()]; + data.get(payload); + this.isRecordSet = isRecordSet; + + final Partition partition = streamPartitionConverter.convert(stream); + StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger); + this.successRecordCounter = streamOpStats.recordsCounter("success"); + this.failureRecordCounter = streamOpStats.recordsCounter("failure"); + this.redirectRecordCounter = streamOpStats.recordsCounter("redirect"); + this.deniedWriteCounter = streamOpStats.requestDeniedCounter("write"); + this.writeBytes = streamOpStats.scopedRequestCounter("write", "bytes"); + this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "write"); + this.bytes = streamOpStats.streamRequestCounter(partition, "write", "bytes"); + + this.dlsnVersion = dlsnVersion; + this.accessControlManager = accessControlManager; + + final long size = getPayloadSize(); + result().addEventListener(new FutureEventListener<WriteResponse>() { + @Override + public void onSuccess(WriteResponse response) { + if (response.getHeader().getCode() == StatusCode.SUCCESS) { + latencyStat.registerSuccessfulEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); + bytes.add(size); + writeBytes.add(size); + } else { + latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); + } + } + @Override + public void onFailure(Throwable cause) { + latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); + } + }); + } + + @Override + public long getPayloadSize() { + return payload.length; + } + + @Override + public Long computeChecksum() { + return ProtocolUtils.writeOpCRC32(stream, payload); + } + + @Override + public void preExecute() throws DLException { + if (!accessControlManager.allowWrite(stream)) { + deniedWriteCounter.inc(); + throw new RequestDeniedException(stream, "write"); + } + super.preExecute(); + } + + @Override + protected Future<WriteResponse> executeOp(AsyncLogWriter writer, + Sequencer sequencer, + Object txnLock) { + if (!stream.equals(writer.getStreamName())) { + logger.error("Write: Stream Name Mismatch in the Stream Map {}, {}", stream, writer.getStreamName()); + return Future.exception(new IllegalStateException("The stream mapping is incorrect, fail the request")); + } + + long txnId; + Future<DLSN> writeResult; + synchronized (txnLock) { + txnId = sequencer.nextId(); + LogRecord record = new LogRecord(txnId, payload); + if (isRecordSet) { + record.setRecordSet(); + } + writeResult = writer.write(record); + } + return writeResult.map(new AbstractFunction1<DLSN, WriteResponse>() { + @Override + public WriteResponse apply(DLSN value) { + successRecordCounter.inc(); + return ResponseUtils.writeSuccess().setDlsn(value.serialize(dlsnVersion)); + } + }); + } + + @Override + protected void fail(ResponseHeader header) { + if (StatusCode.FOUND == header.getCode()) { + redirectRecordCounter.inc(); + } else { + failureRecordCounter.inc(); + } + super.fail(header); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java new file mode 100644 index 0000000..e411b420 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +/** + * A write operation with payload. + */ +public interface WriteOpWithPayload { + + // Return the payload size in bytes + long getPayloadSize(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java new file mode 100644 index 0000000..fcaee35 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream.admin; + +import org.apache.distributedlog.exceptions.DLException; +import com.twitter.util.Future; + +/** + * Admin operation interface. + */ +public interface AdminOp<RespT> { + + /** + * Invoked before the stream op is executed. + */ + void preExecute() throws DLException; + + /** + * Execute the operation. + * + * @return the future represents the response of the operation + */ + Future<RespT> execute(); + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java new file mode 100644 index 0000000..89a2566 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream.admin; + +import static org.apache.distributedlog.service.stream.AbstractStreamOp.requestStat; + +import org.apache.distributedlog.service.ResponseUtils; +import org.apache.distributedlog.service.stream.StreamManager; +import org.apache.distributedlog.thrift.service.WriteResponse; +import com.twitter.util.Future; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.StatsLogger; +import scala.runtime.AbstractFunction1; + +/** + * Operation to create log stream. + */ +public class CreateOp extends StreamAdminOp { + + public CreateOp(String stream, + StatsLogger statsLogger, + StreamManager streamManager, + Long checksum, + Feature checksumEnabledFeature) { + super(stream, + streamManager, + requestStat(statsLogger, "create"), + checksum, + checksumEnabledFeature); + } + + @Override + protected Future<WriteResponse> executeOp() { + Future<Void> result = streamManager.createStreamAsync(stream); + return result.map(new AbstractFunction1<Void, WriteResponse>() { + @Override + public WriteResponse apply(Void value) { + return ResponseUtils.writeSuccess(); + } + }); + } +}