DL-97: Remove unused methods in BKLogHandler. Merges the code change that removes unused methods in BKLogHandler.
Author: Sijie Guo <si...@apache.org> Author: Sijie Guo <sij...@twitter.com> Author: Leigh Stewart <lstew...@twitter.com> Author: Jordan Bull <jb...@twitter.com> Author: Dave Rusek <dave.ru...@gmail.com> Author: Dave Rusek <dru...@twitter.com> Reviewers: Leigh Stewart <lstew...@apache.org> Closes #69 from sijie/merge/DL-97 Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/74a33029 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/74a33029 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/74a33029 Branch: refs/heads/master Commit: 74a33029cc79f3a770ad6c016a42ff4ff7ad71c2 Parents: 5b55bee Author: Sijie Guo <si...@apache.org> Authored: Wed Dec 21 00:09:57 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Wed Dec 21 00:09:57 2016 -0800 ---------------------------------------------------------------------- .../distributedlog/BKAsyncLogReaderDLSN.java | 15 +- .../BKDistributedLogNamespace.java | 20 +- .../twitter/distributedlog/BKLogHandler.java | 140 +---- .../distributedlog/BKLogSegmentWriter.java | 12 +- .../distributedlog/BookKeeperClient.java | 48 +- .../DistributedLogConfiguration.java | 8 +- .../impl/ZKLogSegmentMetadataStore.java | 94 +++- .../stats/BroadCastStatsLogger.java | 22 + .../impl/TestZKLogSegmentMetadataStore.java | 22 +- .../service/DistributedLogServiceImpl.java | 27 +- .../service/stream/StreamImpl.java | 550 +++++++------------ .../service/stream/StreamManager.java | 5 +- .../service/stream/StreamManagerImpl.java | 15 +- .../service/TestDistributedLogService.java | 20 +- pom.xml | 2 +- 15 files changed, 370 insertions(+), 630 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java 
---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java index 7d3d53d..b1a9273 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java @@ -49,7 +49,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Function1; @@ -73,7 +72,7 @@ import scala.runtime.AbstractFunction1; * <li> `async_reader`/idle_reader_error: counter. the number idle reader errors. * </ul> */ -class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNotifier, AsyncLogReader, Runnable, AsyncNotification { +class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotification { static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReaderDLSN.class); private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION = @@ -86,7 +85,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti protected final BKDistributedLogManager bkDistributedLogManager; protected final BKLogReadHandler bkLedgerManager; - private Watcher sessionExpireWatcher = null; private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(); private final ScheduledExecutorService executorService; private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>(); @@ -218,7 +216,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti 
this.executorService = executorService; this.bkLedgerManager = bkDistributedLogManager.createReadHandler(subscriberId, lockStateExecutor, this, deserializeRecordSet, true); - sessionExpireWatcher = this.bkLedgerManager.registerExpirationHandler(this); LOG.debug("Starting async reader at {}", startDLSN); this.startDLSN = startDLSN; this.scheduleDelayStopwatch = Stopwatch.createUnstarted(); @@ -255,14 +252,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary(); } - @Override - public void notifySessionExpired() { - // ZK Session notification is an indication to check if this has resulted in a fatal error - // of the underlying reader, in itself this reader doesnt error out unless the underlying - // reader has hit an error - scheduleBackgroundRead(); - } - private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() { if (idleErrorThresholdMillis < Integer.MAX_VALUE) { // Dont run the task more than once every seconds (for sanity) @@ -494,8 +483,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti cancelAllPendingReads(exception); - bkLedgerManager.unregister(sessionExpireWatcher); - FutureUtils.ignore(bkLedgerManager.asyncClose()).proxyTo(closePromise); return closePromise; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java index 0b522d0..2df1046 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java @@ -75,6 
+75,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.InetAddress; import java.net.URI; import java.util.Collection; import java.util.HashMap; @@ -252,6 +253,14 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { } } + private static String getHostIpLockClientId() { + try { + return InetAddress.getLocalHost().toString(); + } catch(Exception ex) { + return DistributedLogConstants.UNKNOWN_CLIENT_ID; + } + } + private final String clientId; private final int regionId; private final DistributedLogConfiguration conf; @@ -326,9 +335,13 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { this.featureProvider = featureProvider; this.statsLogger = statsLogger; this.perLogStatsLogger = perLogStatsLogger; - this.clientId = clientId; this.regionId = regionId; this.bkdlConfig = bkdlConfig; + if (clientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) { + this.clientId = getHostIpLockClientId(); + } else { + this.clientId = clientId; + } // Build resources StatsLogger schedulerStatsLogger = statsLogger.scope("factory").scope("thread_pool"); @@ -622,13 +635,10 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { DistributedLogConfiguration conf, String zkServers, StatsLogger statsLogger) { - RetryPolicy retryPolicy = null; - if (conf.getZKNumRetries() > 0) { - retryPolicy = new BoundExponentialBackoffRetryPolicy( + RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy( conf.getBKClientZKRetryBackoffStartMillis(), conf.getBKClientZKRetryBackoffMaxMillis(), conf.getBKClientZKNumRetries()); - } ZooKeeperClientBuilder builder = ZooKeeperClientBuilder.newBuilder() .name(zkcName) .sessionTimeoutMs(conf.getBKClientZKSessionTimeoutMilliSeconds()) http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java 
---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java index a6ec318..460de11 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java @@ -32,9 +32,7 @@ import com.twitter.distributedlog.io.AsyncCloseable; import com.twitter.distributedlog.logsegment.LogSegmentCache; import com.twitter.distributedlog.logsegment.LogSegmentFilter; import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; -import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.distributedlog.util.Utils; import com.twitter.util.Function; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; @@ -56,7 +54,6 @@ import scala.runtime.AbstractFunction0; import scala.runtime.BoxedUnit; import java.io.IOException; -import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -104,8 +101,6 @@ import java.util.concurrent.atomic.AtomicReference; public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbortable { static final Logger LOG = LoggerFactory.getLogger(BKLogHandler.class); - private static final int LAYOUT_VERSION = -1; - protected final ZKLogMetadata logMetadata; protected final DistributedLogConfiguration conf; protected final ZooKeeperClient zooKeeperClient; @@ -274,12 +269,7 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor LOG.debug("Using ZK Path {}", logMetadata.getLogRootPath()); this.bookKeeperClient = bkcBuilder.build(); this.metadataStore = metadataStore; - - if (lockClientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) { - this.lockClientId = getHostIpLockClientId(); 
- } else { - this.lockClientId = lockClientId; - } + this.lockClientId = lockClientId; this.getChildrenWatcher = this.zooKeeperClient.getWatcherManager() .registerChildWatcher(logMetadata.getLogSegmentsPath(), this); @@ -316,14 +306,6 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor return lockClientId; } - private String getHostIpLockClientId() { - try { - return InetAddress.getLocalHost().toString(); - } catch(Exception ex) { - return DistributedLogConstants.UNKNOWN_CLIENT_ID; - } - } - protected void registerListener(LogSegmentListener listener) { listeners.add(listener); } @@ -472,57 +454,6 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor } } - public LogRecordWithDLSN getLastLogRecord(boolean recover, boolean includeEndOfStream) throws IOException { - checkLogStreamExists(); - List<LogSegmentMetadata> ledgerList = getFullLedgerListDesc(true, true); - - for (LogSegmentMetadata metadata: ledgerList) { - LogRecordWithDLSN record = recoverLastRecordInLedger(metadata, recover, false, includeEndOfStream); - - if (null != record) { - assert(!record.isControl()); - LOG.debug("{} getLastLogRecord Returned {}", getFullyQualifiedName(), record); - return record; - } - } - - throw new LogEmptyException("Log " + getFullyQualifiedName() + " has no records"); - } - - public long getLastTxId(boolean recover, - boolean includeEndOfStream) throws IOException { - checkLogStreamExists(); - return getLastLogRecord(recover, includeEndOfStream).getTransactionId(); - } - - public DLSN getLastDLSN(boolean recover, - boolean includeEndOfStream) throws IOException { - checkLogStreamExists(); - return getLastLogRecord(recover, includeEndOfStream).getDlsn(); - } - - public long getLogRecordCount() throws IOException { - try { - checkLogStreamExists(); - } catch (LogNotFoundException exc) { - return 0; - } - - List<LogSegmentMetadata> ledgerList = getFullLedgerList(true, false); - long count = 0; - for 
(LogSegmentMetadata l : ledgerList) { - if (l.isInProgress()) { - LogRecord record = recoverLastRecordInLedger(l, false, false, false); - if (null != record) { - count += record.getLastPositionWithinLogSegment(); - } - } else { - count += l.getRecordCount(); - } - } - return count; - } - private Future<LogRecordWithDLSN> asyncReadFirstUserRecord(LogSegmentMetadata ledger, DLSN beginDLSN) { final LedgerHandleCache handleCache = LedgerHandleCache.newBuilder().bkc(bookKeeperClient).conf(conf).build(); @@ -634,15 +565,6 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor return sum; } - public long getFirstTxId() throws IOException { - checkLogStreamExists(); - List<LogSegmentMetadata> ledgerList = getFullLedgerList(true, true); - - // The ledger list should at least have one element - // First TxId is populated even for in progress ledgers - return ledgerList.get(0).getFirstTxId(); - } - Future<Void> checkLogStreamExistsAsync() { final Promise<Void> promise = new Promise<Void>(); try { @@ -685,54 +607,11 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor return promise; } - private void checkLogStreamExists() throws IOException { - try { - if (null == Utils.sync(zooKeeperClient, logMetadata.getLogSegmentsPath()) - .exists(logMetadata.getLogSegmentsPath(), false)) { - throw new LogNotFoundException("Log " + getFullyQualifiedName() + " doesn't exist"); - } - } catch (InterruptedException ie) { - LOG.error("Interrupted while reading {}", logMetadata.getLogSegmentsPath(), ie); - throw new DLInterruptedException("Interrupted while checking " - + logMetadata.getLogSegmentsPath(), ie); - } catch (KeeperException ke) { - LOG.error("Error checking existence for {} : ", logMetadata.getLogSegmentsPath(), ke); - throw new ZKException("Error checking existence for " + getFullyQualifiedName() + " : ", ke); - } - } - @Override public Future<Void> asyncAbort() { return asyncClose(); } - /** - * Find the id of the last 
edit log transaction written to a edit log - * ledger. - */ - protected Pair<Long, DLSN> readLastTxIdInLedger(LogSegmentMetadata l) throws IOException { - LogRecordWithDLSN record = recoverLastRecordInLedger(l, false, false, true); - - if (null == record) { - return Pair.of(DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID, DLSN.InvalidDLSN); - } - else { - return Pair.of(record.getTransactionId(), record.getDlsn()); - } - } - - /** - * Find the id of the last edit log transaction written to a edit log - * ledger. - */ - protected LogRecordWithDLSN recoverLastRecordInLedger(LogSegmentMetadata l, - boolean fence, - boolean includeControl, - boolean includeEndOfStream) - throws IOException { - return FutureUtils.result(asyncReadLastRecord(l, fence, includeControl, includeEndOfStream)); - } - public Future<LogRecordWithDLSN> asyncReadLastUserRecord(final LogSegmentMetadata l) { return asyncReadLastRecord(l, false, false, false); } @@ -1293,21 +1172,4 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor } } - // ZooKeeper Watchers - - Watcher registerExpirationHandler(final ZooKeeperClient.ZooKeeperSessionExpireNotifier onExpired) { - if (conf.getZKNumRetries() > 0) { - return new Watcher() { - @Override - public void process(WatchedEvent event) { - // nop - } - }; - } - return zooKeeperClient.registerExpirationHandler(onExpired); - } - - boolean unregister(Watcher watcher) { - return zooKeeperClient.unregister(watcher); - } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java index 1b52951..8276125 100644 --- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java @@ -156,8 +156,10 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz // stats private final StatsLogger envelopeStatsLogger; + private final StatsLogger transmitOutstandingLogger; private final Counter transmitDataSuccesses; private final Counter transmitDataMisses; + private final Gauge<Number> transmitOutstandingGauge; private final OpStatsLogger transmitDataPacketSize; private final Counter transmitControlSuccesses; private final Counter pFlushSuccesses; @@ -255,8 +257,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz pendingWrites = segWriterStatsLogger.getCounter("pending"); // outstanding transmit requests - StatsLogger transmitOutstandingLogger = perLogStatsLogger.scope("transmit").scope("outstanding"); - transmitOutstandingLogger.registerGauge("requests", new Gauge<Number>() { + transmitOutstandingLogger = perLogStatsLogger.scope("transmit").scope("outstanding"); + transmitOutstandingGauge = new Gauge<Number>() { @Override public Number getDefaultValue() { return 0; @@ -265,7 +267,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz public Number getSample() { return outstandingTransmits.get(); } - }); + }; + transmitOutstandingLogger.registerGauge("requests", transmitOutstandingGauge); outstandingTransmits = new AtomicInteger(0); this.fullyQualifiedLogSegment = streamName + ":" + logSegmentName; @@ -531,6 +534,9 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz private void closeInternal(final boolean abort, final AtomicReference<Throwable> throwExc, final Promise<Void> closePromise) { + // remove stats + this.transmitOutstandingLogger.unregisterGauge("requests", transmitOutstandingGauge); + // Cancel the periodic keep alive schedule first if (null 
!= periodicKeepAliveSchedule) { if (!periodicKeepAliveSchedule.cancel(false)) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java index fd22b8f..c39ae4c 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java @@ -17,6 +17,7 @@ */ package com.twitter.distributedlog; +import com.google.common.base.Optional; import com.twitter.distributedlog.ZooKeeperClient.Credentials; import com.twitter.distributedlog.ZooKeeperClient.DigestCredentials; import com.twitter.distributedlog.exceptions.AlreadyClosedException; @@ -41,16 +42,12 @@ import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; import org.apache.bookkeeper.zookeeper.RetryPolicy; import org.apache.commons.configuration.ConfigurationException; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.util.HashedWheelTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; - -import com.google.common.base.Optional; import static com.google.common.base.Charsets.UTF_8; @@ -62,7 +59,7 @@ import static com.google.common.base.Charsets.UTF_8; * <li> bookkeeper operation stats are exposed under current scope by {@link BookKeeper} * </ul> */ -public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireNotifier { +public class BookKeeperClient { static final Logger LOG = 
LoggerFactory.getLogger(BookKeeperClient.class); // Parameters to build bookkeeper client @@ -83,14 +80,10 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN // feature provider private final Optional<FeatureProvider> featureProvider; - private Watcher sessionExpireWatcher = null; - private AtomicBoolean zkSessionExpired = new AtomicBoolean(false); - @SuppressWarnings("deprecation") private synchronized void commonInitialization( DistributedLogConfiguration conf, String ledgersPath, - ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer requestTimer, - boolean registerExpirationHandler) + ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer requestTimer) throws IOException, InterruptedException, KeeperException { ClientConfiguration bkConfig = new ClientConfiguration(); bkConfig.setAddEntryTimeout(conf.getBKClientWriteTimeout()); @@ -124,10 +117,6 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN .requestTimer(requestTimer) .featureProvider(featureProvider.orNull()) .build(); - - if (registerExpirationHandler) { - sessionExpireWatcher = this.zkc.registerExpirationHandler(this); - } } BookKeeperClient(DistributedLogConfiguration conf, @@ -159,16 +148,11 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN if (null != this.bkc) { return; } - boolean registerExpirationHandler; if (null == this.zkc) { int zkSessionTimeout = conf.getBKClientZKSessionTimeoutMilliSeconds(); - RetryPolicy retryPolicy = null; - if (conf.getBKClientZKNumRetries() > 0) { - retryPolicy = new BoundExponentialBackoffRetryPolicy( + RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy( conf.getBKClientZKRetryBackoffStartMillis(), conf.getBKClientZKRetryBackoffMaxMillis(), conf.getBKClientZKNumRetries()); - } - Credentials credentials = Credentials.NONE; if (conf.getZkAclId() != null) { credentials = new 
DigestCredentials(conf.getZkAclId(), conf.getZkAclId()); @@ -178,10 +162,9 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN retryPolicy, statsLogger.scope("bkc_zkc"), conf.getZKClientNumberRetryThreads(), conf.getBKClientZKRequestRateLimit(), credentials); } - registerExpirationHandler = conf.getBKClientZKNumRetries() <= 0; try { - commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer, registerExpirationHandler); + commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer); } catch (InterruptedException e) { throw new DLInterruptedException("Interrupted on creating bookkeeper client " + name + " : ", e); } catch (KeeperException e) { @@ -190,18 +173,18 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN if (ownZK) { LOG.info("BookKeeper Client created {} with its own ZK Client : ledgersPath = {}, numRetries = {}, " + - "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}, registerExpirationHandler = {}", + "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}", new Object[] { name, ledgersPath, conf.getBKClientZKNumRetries(), conf.getBKClientZKSessionTimeoutMilliSeconds(), conf.getBKClientZKRetryBackoffStartMillis(), conf.getBKClientZKRetryBackoffMaxMillis(), - conf.getBkDNSResolverOverrides(), registerExpirationHandler }); + conf.getBkDNSResolverOverrides() }); } else { LOG.info("BookKeeper Client created {} with shared zookeeper client : ledgersPath = {}, numRetries = {}, " + - "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}, registerExpirationHandler = {}", + "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}", new Object[] { name, ledgersPath, conf.getZKNumRetries(), conf.getZKSessionTimeoutMilliseconds(), conf.getZKRetryBackoffStartMillis(), conf.getZKRetryBackoffMaxMillis(), - conf.getBkDNSResolverOverrides(), registerExpirationHandler }); + 
conf.getBkDNSResolverOverrides() }); } } @@ -284,9 +267,6 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN } } if (null != zkc) { - if (null != sessionExpireWatcher) { - zkc.unregister(sessionExpireWatcher); - } if (ownZK) { zkc.close(); } @@ -294,20 +274,10 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN closed = true; } - @Override - public void notifySessionExpired() { - zkSessionExpired.set(true); - } - public synchronized void checkClosedOrInError() throws AlreadyClosedException { if (closed) { LOG.error("BookKeeper Client {} is already closed", name); throw new AlreadyClosedException("BookKeeper Client " + name + " is already closed"); } - - if (zkSessionExpired.get()) { - LOG.error("BookKeeper Client {}'s Zookeeper session has expired", name); - throw new AlreadyClosedException("BookKeeper Client " + name + "'s Zookeeper session has expired"); - } } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java index c2057df..5d0e59a 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java @@ -809,12 +809,16 @@ public class DistributedLogConfiguration extends CompositeConfiguration { * Get num of retries for zookeeper client that used by bookkeeper client. * <p>Retries only happen on retryable failures like session expired, * session moved. for permanent failures, the request will fail immediately. - * The default value is 3. + * The default value is 3. 
Setting it to zero or negative will retry infinitely. * * @return num of retries of zookeeper client used by bookkeeper client. */ public int getBKClientZKNumRetries() { - return this.getInt(BKDL_BKCLIENT_ZK_NUM_RETRIES, BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT); + int zkNumRetries = this.getInt(BKDL_BKCLIENT_ZK_NUM_RETRIES, BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT); + if (zkNumRetries <= 0) { + return Integer.MAX_VALUE; + } + return zkNumRetries; } /** http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java index c0796a1..cb53b23 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java @@ -17,6 +17,7 @@ */ package com.twitter.distributedlog.impl; +import com.google.common.collect.ImmutableList; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.LogSegmentMetadata; import com.twitter.distributedlog.ZooKeeperClient; @@ -45,10 +46,14 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.runtime.AbstractFunction1; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -64,7 +69,9 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch private static 
final Logger logger = LoggerFactory.getLogger(ZKLogSegmentMetadataStore.class); - private static class ReadLogSegmentsTask implements Runnable, FutureEventListener<List<String>> { + private static final List<String> EMPTY_LIST = ImmutableList.of(); + + private static class ReadLogSegmentsTask implements Runnable, FutureEventListener<Versioned<List<String>>> { private final String logSegmentsPath; private final ZKLogSegmentMetadataStore store; @@ -78,15 +85,16 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch } @Override - public void onSuccess(final List<String> segments) { + public void onSuccess(final Versioned<List<String>> segments) { // reset the back off after a successful operation currentZKBackOffMs = store.minZKBackoffMs; - final Set<LogSegmentNamesListener> listenerSet = store.listeners.get(logSegmentsPath); + final Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet = + store.listeners.get(logSegmentsPath); if (null != listenerSet) { store.submitTask(logSegmentsPath, new Runnable() { @Override public void run() { - for (LogSegmentNamesListener listener : listenerSet) { + for (VersionedLogSegmentNamesListener listener : listenerSet.values()) { listener.onSegmentsUpdated(segments); } } @@ -120,6 +128,48 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch } } + /** + * A log segment names listener that keeps tracking the version of list of log segments that it has been notified. + * It only notify the newer log segments. 
+ */ + static class VersionedLogSegmentNamesListener { + + private final LogSegmentNamesListener listener; + private Versioned<List<String>> lastNotifiedLogSegments; + + VersionedLogSegmentNamesListener(LogSegmentNamesListener listener) { + this.listener = listener; + this.lastNotifiedLogSegments = new Versioned<List<String>>(EMPTY_LIST, Version.NEW); + } + + synchronized void onSegmentsUpdated(Versioned<List<String>> logSegments) { + if (lastNotifiedLogSegments.getVersion() == Version.NEW || + lastNotifiedLogSegments.getVersion().compare(logSegments.getVersion()) == Version.Occurred.BEFORE) { + lastNotifiedLogSegments = logSegments; + listener.onSegmentsUpdated(logSegments.getValue()); + } + } + + @Override + public int hashCode() { + return listener.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof VersionedLogSegmentNamesListener)) { + return false; + } + VersionedLogSegmentNamesListener other = (VersionedLogSegmentNamesListener) obj; + return listener.equals(other.listener); + } + + @Override + public String toString() { + return listener.toString(); + } + } + final DistributedLogConfiguration conf; // settings final int minZKBackoffMs; @@ -128,7 +178,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch final ZooKeeperClient zkc; // log segment listeners - final ConcurrentMap<String, Set<LogSegmentNamesListener>> listeners; + final ConcurrentMap<String, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener>> listeners; // scheduler final OrderedScheduler scheduler; final ReentrantReadWriteLock closeLock; @@ -139,7 +189,8 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch OrderedScheduler scheduler) { this.conf = conf; this.zkc = zkc; - this.listeners = new ConcurrentHashMap<String, Set<LogSegmentNamesListener>>(); + this.listeners = + new ConcurrentHashMap<String, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener>>(); this.scheduler = 
scheduler; this.closeLock = new ReentrantReadWriteLock(); // settings @@ -275,11 +326,16 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch @Override public Future<List<String>> getLogSegmentNames(String logSegmentsPath) { - return getLogSegmentNames(logSegmentsPath, null); + return getLogSegmentNames(logSegmentsPath, null).map(new AbstractFunction1<Versioned<List<String>>, List<String>>() { + @Override + public List<String> apply(Versioned<List<String>> list) { + return list.getValue(); + } + }); } - Future<List<String>> getLogSegmentNames(String logSegmentsPath, Watcher watcher) { - Promise<List<String>> result = new Promise<List<String>>(); + Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath, Watcher watcher) { + Promise<Versioned<List<String>>> result = new Promise<Versioned<List<String>>>(); try { zkc.get().getChildren(logSegmentsPath, watcher, this, result); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { @@ -293,9 +349,11 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch @Override @SuppressWarnings("unchecked") public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - Promise<List<String>> result = ((Promise<List<String>>) ctx); + Promise<Versioned<List<String>>> result = ((Promise<Versioned<List<String>>>) ctx); if (KeeperException.Code.OK.intValue() == rc) { - result.setValue(children); + /** cversion: the number of changes to the children of this znode **/ + ZkVersion zkVersion = new ZkVersion(stat.getCversion()); + result.setValue(new Versioned(children, zkVersion)); } else { result.setException(KeeperException.create(KeeperException.Code.get(rc))); } @@ -312,10 +370,13 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch if (closed) { return; } - Set<LogSegmentNamesListener> listenerSet = listeners.get(logSegmentsPath); + Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> 
listenerSet = + listeners.get(logSegmentsPath); if (null == listenerSet) { - Set<LogSegmentNamesListener> newListenerSet = new HashSet<LogSegmentNamesListener>(); - Set<LogSegmentNamesListener> oldListenerSet = listeners.putIfAbsent(logSegmentsPath, newListenerSet); + Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> newListenerSet = + new HashMap<LogSegmentNamesListener, VersionedLogSegmentNamesListener>(); + Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> oldListenerSet = + listeners.putIfAbsent(logSegmentsPath, newListenerSet); if (null != oldListenerSet) { listenerSet = oldListenerSet; } else { @@ -323,7 +384,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch } } synchronized (listenerSet) { - listenerSet.add(listener); + listenerSet.put(listener, new VersionedLogSegmentNamesListener(listener)); if (!listeners.containsKey(logSegmentsPath)) { // listener set has been removed, add it back listeners.put(logSegmentsPath, listenerSet); @@ -343,7 +404,8 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch if (closed) { return; } - Set<LogSegmentNamesListener> listenerSet = listeners.get(logSegmentsPath); + Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet = + listeners.get(logSegmentsPath); if (null == listenerSet) { return; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java index e29cc47..10a7011 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java +++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java @@ -133,9 +133,26 @@ public class BroadCastStatsLogger { } @Override + public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) { + // no-op + } + + @Override public StatsLogger scope(final String scope) { return new Two(first.scope(scope), second.scope(scope)); } + + @Override + public void removeScope(String scope, StatsLogger statsLogger) { + if (!(statsLogger instanceof Two)) { + return; + } + + Two another = (Two) statsLogger; + + first.removeScope(scope, another.first); + second.removeScope(scope, another.second); + } } /** @@ -165,6 +182,11 @@ public class BroadCastStatsLogger { } @Override + public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) { + first.unregisterGauge(statName, gauge); + } + + @Override public StatsLogger scope(String scope) { return new MasterSlave(first.scope(scope), second.scope(scope)); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java index e4c774b..f8fd3eb 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java @@ -367,7 +367,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { lsmStore.registerLogSegmentListener(rootPath, listener); assertEquals(1, lsmStore.listeners.size()); assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath)); - assertTrue("Should contain 
listener", lsmStore.listeners.get(rootPath).contains(listener)); + assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener)); while (numNotifications.get() < 1) { TimeUnit.MILLISECONDS.sleep(10); } @@ -429,7 +429,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { lsmStore.registerLogSegmentListener(rootPath, listener); assertEquals(1, lsmStore.listeners.size()); assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath)); - assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).contains(listener)); + assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener)); while (numNotifications.get() < 1) { TimeUnit.MILLISECONDS.sleep(10); } @@ -496,7 +496,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { lsmStore.registerLogSegmentListener(rootPath, listener); assertEquals(1, lsmStore.listeners.size()); assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath)); - assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).contains(listener)); + assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener)); while (numNotifications.get() < 1) { TimeUnit.MILLISECONDS.sleep(10); } @@ -510,16 +510,6 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { ZooKeeperClientUtils.expireSession(zkc, DLUtils.getZKServersFromDLUri(uri), conf.getZKSessionTimeoutMilliseconds()); - while (numNotifications.get() < 2) { - TimeUnit.MILLISECONDS.sleep(10); - } - assertEquals("Should receive second segment list update", - 2, numNotifications.get()); - List<String> secondSegmentList = segmentLists.get(1); - Collections.sort(secondSegmentList); - assertEquals("List of segments should be same", - children, secondSegmentList); - logger.info("Create another {} segments.", numSegments); // create another log segment, it should trigger segment list 
updated @@ -532,12 +522,12 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { List<String> newChildren = zkc.get().getChildren(rootPath, false); Collections.sort(newChildren); logger.info("All log segments become {}", newChildren); - while (numNotifications.get() < 3) { + while (numNotifications.get() < 2) { TimeUnit.MILLISECONDS.sleep(10); } assertEquals("Should receive third segment list update", - 3, numNotifications.get()); - List<String> thirdSegmentList = segmentLists.get(2); + 2, numNotifications.get()); + List<String> thirdSegmentList = segmentLists.get(1); Collections.sort(thirdSegmentList); assertEquals("List of segments should be updated", 2 * numSegments, thirdSegmentList.size()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java index 751e972..3a9b904 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java @@ -90,7 +90,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -378,7 +377,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI // if it is closed, we would not acquire stream again. 
return null; } - writer = streamManager.getOrCreateStream(stream); + writer = streamManager.getOrCreateStream(stream, true); } finally { closeLock.readLock().unlock(); } @@ -631,26 +630,6 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI logger.info("Released KeepAlive Latch. Main thread will shut the service down."); } - @VisibleForTesting - java.util.concurrent.Future<?> schedule(Runnable runnable, long delayMs) { - closeLock.readLock().lock(); - try { - if (serverStatus != ServerStatus.WRITE_AND_ACCEPT) { - return null; - } else if (delayMs > 0) { - return scheduler.schedule(runnable, delayMs, TimeUnit.MILLISECONDS); - } else { - return scheduler.submit(runnable); - } - } catch (RejectedExecutionException ree) { - logger.error("Failed to schedule task {} in {} ms : ", - new Object[] { runnable, delayMs, ree }); - return null; - } finally { - closeLock.readLock().unlock(); - } - } - // Test methods. private DynamicDistributedLogConfiguration getDynConf(String streamName) { @@ -664,8 +643,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI } @VisibleForTesting - Stream newStream(String name) { - return streamFactory.create(name, getDynConf(name), streamManager); + Stream newStream(String name) throws IOException { + return streamManager.getOrCreateStream(name, false); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java index 1204d39..3d5b9e7 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java +++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java @@ -26,7 +26,6 @@ import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.DistributedLogManager; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.DLException; -import com.twitter.distributedlog.exceptions.InvalidStreamNameException; import com.twitter.distributedlog.exceptions.OverCapacityException; import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; import com.twitter.distributedlog.exceptions.StreamNotReadyException; @@ -70,24 +69,23 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; public class StreamImpl implements Stream { static final Logger logger = LoggerFactory.getLogger(StreamImpl.class); + /** + * The status of the stream. + * + * The status change of the stream should just go in one direction. If a stream hits + * any error, the stream should be put in error state. If a stream is in error state, + * it should be removed and not reused anymore. + */ public static enum StreamStatus { UNINITIALIZED(-1), INITIALIZING(0), INITIALIZED(1), - // if a stream is in failed state, it could be retried immediately. - // a stream will be put in failed state when encountered any stream exception. - FAILED(-2), - // if a stream is in backoff state, it would backoff for a while. - // a stream will be put in backoff state when failed to acquire the ownership. - BACKOFF(-3), CLOSING(-4), CLOSED(-5), // if a stream is in error state, it should be abort during closing. 
@@ -112,26 +110,15 @@ public class StreamImpl implements Stream { private final Partition partition; private DistributedLogManager manager; - // A write has been attempted since the last stream acquire. - private volatile boolean writeSinceLastAcquire = false; private volatile AsyncLogWriter writer; private volatile StreamStatus status; private volatile String owner; private volatile Throwable lastException; - private volatile boolean running = true; - private volatile boolean suspended = false; private volatile Queue<StreamOp> pendingOps = new ArrayDeque<StreamOp>(); private final Promise<Void> closePromise = new Promise<Void>(); private final Object txnLock = new Object(); private final TimeSequencer sequencer = new TimeSequencer(); - // last acquire time - private final Stopwatch lastAcquireWatch = Stopwatch.createUnstarted(); - // last acquire failure time - private final Stopwatch lastAcquireFailureWatch = Stopwatch.createUnstarted(); - private final long nextAcquireWaitTimeMs; - private ScheduledFuture<?> tryAcquireScheduledFuture = null; - private long scheduledAcquireDelayMs = 0L; private final StreamRequestLimiter limiter; private final DynamicDistributedLogConfiguration dynConf; private final DistributedLogConfiguration dlConfig; @@ -165,7 +152,7 @@ public class StreamImpl implements Stream { new ConcurrentHashMap<String, Counter>(); // Since we may create and discard streams at initialization if there's a race, - // must not do any expensive intialization here (particularly any locking or + // must not do any expensive initialization here (particularly any locking or // significant resource allocation etc.). 
StreamImpl(final String name, final Partition partition, @@ -189,7 +176,6 @@ public class StreamImpl implements Stream { this.partition = partition; this.status = StreamStatus.UNINITIALIZED; this.lastException = new IOException("Fail to write record to stream " + name); - this.nextAcquireWaitTimeMs = dlConfig.getZKSessionTimeoutMilliseconds() * 3 / 5; this.streamConfigProvider = streamConfigProvider; this.dlNamespace = dlNamespace; this.featureRateLimitDisabled = featureProvider.getFeature( @@ -275,54 +261,16 @@ public class StreamImpl implements Stream { return String.format("Stream:%s, %s, %s Status:%s", name, manager, writer, status); } - // schedule stream acquistion - private void tryAcquireStreamOnce() { - if (!running) { - return; - } - - boolean needAcquire = false; - boolean checkNextTime = false; - synchronized (this) { - switch (this.status) { - case INITIALIZING: - streamManager.notifyReleased(this); - needAcquire = true; - break; - case FAILED: - this.status = StreamStatus.INITIALIZING; - streamManager.notifyReleased(this); - needAcquire = true; - break; - case BACKOFF: - // We may end up here after timeout on streamLock. To avoid acquire on every timeout - // we should only try again if a write has been attempted since the last acquire - // attempt. If we end up here because the request handler woke us up, the flag will - // be set and we will try to acquire as intended. 
- if (writeSinceLastAcquire) { - this.status = StreamStatus.INITIALIZING; - streamManager.notifyReleased(this); - needAcquire = true; - } else { - checkNextTime = true; - } - break; - default: - break; - } - } - if (needAcquire) { - lastAcquireWatch.reset().start(); - acquireStream().addEventListener(new FutureEventListener<Boolean>() { + @Override + public void start() { + // acquire the stream + acquireStream().addEventListener(new FutureEventListener<Boolean>() { @Override public void onSuccess(Boolean success) { - synchronized (StreamImpl.this) { - scheduledAcquireDelayMs = 0L; - tryAcquireScheduledFuture = null; - } if (!success) { - // schedule acquire in nextAcquireWaitTimeMs - scheduleTryAcquireOnce(nextAcquireWaitTimeMs); + // failed to acquire the stream. set the stream in error status and close it. + setStreamInErrorStatus(); + requestClose("Failed to acquire the ownership"); } } @@ -330,65 +278,40 @@ public class StreamImpl implements Stream { public void onFailure(Throwable cause) { // unhandled exceptions logger.error("Stream {} threw unhandled exception : ", name, cause); + // failed to acquire the stream. set the stream in error status and close it. 
setStreamInErrorStatus(); requestClose("Unhandled exception"); } }); - } else if (StreamStatus.isUnavailable(status)) { - // if the stream is unavailable, stop the thread and close the stream - requestClose("Stream is unavailable anymore"); - } else if (StreamStatus.INITIALIZED != status && lastAcquireWatch.elapsed(TimeUnit.HOURS) > 2) { - // if the stream isn't in initialized state and no writes coming in, then close the stream - requestClose("Stream not used anymore"); - } else if (checkNextTime) { - synchronized (StreamImpl.this) { - scheduledAcquireDelayMs = 0L; - tryAcquireScheduledFuture = null; - } - // schedule acquire in nextAcquireWaitTimeMs - scheduleTryAcquireOnce(nextAcquireWaitTimeMs); - } } - private synchronized void scheduleTryAcquireOnce(long delayMs) { - if (null != tryAcquireScheduledFuture) { - if (delayMs <= 0) { - if (scheduledAcquireDelayMs <= 0L || - (scheduledAcquireDelayMs > 0L - && !tryAcquireScheduledFuture.cancel(false))) { - return; - } - // if the scheduled one could be cancelled, re-submit one - } else { - return; + // + // Stats Operations + // + + void countException(Throwable t, StatsLogger streamExceptionLogger) { + String exceptionName = null == t ? 
"null" : t.getClass().getName(); + Counter counter = exceptionCounters.get(exceptionName); + if (null == counter) { + counter = exceptionStatLogger.getCounter(exceptionName); + Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName, counter); + if (null != oldCounter) { + counter = oldCounter; } } - tryAcquireScheduledFuture = schedule(new Runnable() { - @Override - public void run() { - tryAcquireStreamOnce(); - } - }, delayMs); - scheduledAcquireDelayMs = delayMs; + counter.inc(); + streamExceptionLogger.getCounter(exceptionName).inc(); } - @Override - public void start() { - scheduleTryAcquireOnce(0); + boolean isCriticalException(Throwable cause) { + return !(cause instanceof OwnershipAcquireFailedException); } - ScheduledFuture<?> schedule(Runnable runnable, long delayMs) { - if (!running) { - return null; - } - try { - return scheduler.schedule(name, runnable, delayMs, TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException ree) { - logger.error("Failed to schedule task {} in {} ms : ", - new Object[] { runnable, delayMs, ree }); - return null; - } - } + // + // Service Timeout: + // - schedule a timeout function to handle operation timeouts: {@link #handleServiceTimeout(String)} + // - if the operation is completed within timeout period, cancel the timeout. + // void scheduleTimeout(final StreamOp op) { final Timeout timeout = requestTimer.newTimeout(new TimerTask() { @@ -418,12 +341,14 @@ public class StreamImpl implements Stream { * stream off the proxy for a period of time, hopefully long enough for the * issues to be resolved, or for whoop to kick in and kill the shard. 
*/ - synchronized void handleServiceTimeout(String reason) { - if (StreamStatus.isUnavailable(status)) { - return; + void handleServiceTimeout(String reason) { + synchronized (this) { + if (StreamStatus.isUnavailable(status)) { + return; + } + // Mark stream in error state + setStreamInErrorStatus(); } - // Mark stream in error state - setStreamInErrorStatus(); // Async close request, and schedule eviction when its done. Future<Void> closeFuture = requestClose(reason, false /* dont remove */); @@ -436,6 +361,10 @@ public class StreamImpl implements Stream { }); } + // + // Submit the operation to the stream. + // + /** * Execute the StreamOp. If reacquire is needed, this may initiate reacquire and queue the op for * execution once complete. @@ -445,9 +374,6 @@ public class StreamImpl implements Stream { */ @Override public void submit(StreamOp op) { - // Let stream acquire thread know a write has been attempted. - writeSinceLastAcquire = true; - try { limiter.apply(op); } catch (OverCapacityException ex) { @@ -460,36 +386,28 @@ public class StreamImpl implements Stream { scheduleTimeout(op); } - boolean notifyAcquireThread = false; boolean completeOpNow = false; boolean success = true; if (StreamStatus.isUnavailable(status)) { // Stream is closed, fail the op immediately op.fail(new StreamUnavailableException("Stream " + name + " is closed.")); return; - } if (StreamStatus.INITIALIZED == status && writer != null) { + } else if (StreamStatus.INITIALIZED == status && writer != null) { completeOpNow = true; success = true; } else { synchronized (this) { if (StreamStatus.isUnavailable(status)) { - // complete the write op as {@link #executeOp(op, success)} will handle closed case. 
- completeOpNow = true; - success = true; + // Stream is closed, fail the op immediately + op.fail(new StreamUnavailableException("Stream " + name + " is closed.")); + return; } if (StreamStatus.INITIALIZED == status) { completeOpNow = true; success = true; - } else if (StreamStatus.BACKOFF == status && - lastAcquireFailureWatch.elapsed(TimeUnit.MILLISECONDS) < nextAcquireWaitTimeMs) { - completeOpNow = true; - success = false; } else if (failFastOnStreamNotReady) { - notifyAcquireThread = true; - completeOpNow = false; - success = false; op.fail(new StreamNotReadyException("Stream " + name + " is not ready; status = " + status)); - } else { // closing & initializing - notifyAcquireThread = true; + return; + } else { // the stream is still initializing pendingOps.add(op); pendingOpsCounter.inc(); if (1 == pendingOps.size()) { @@ -500,14 +418,15 @@ public class StreamImpl implements Stream { } } } - if (notifyAcquireThread && !suspended) { - scheduleTryAcquireOnce(0L); - } if (completeOpNow) { executeOp(op, success); } } + // + // Execute operations and handle exceptions on operations + // + /** * Execute the <i>op</i> immediately. * @@ -516,20 +435,7 @@ public class StreamImpl implements Stream { * @param success * whether the operation is success or not. 
*/ - void executeOp(StreamOp op, boolean success) { - closeLock.readLock().lock(); - try { - if (StreamStatus.isUnavailable(status)) { - op.fail(new StreamUnavailableException("Stream " + name + " is closed.")); - return; - } - doExecuteOp(op, success); - } finally { - closeLock.readLock().unlock(); - } - } - - private void doExecuteOp(final StreamOp op, boolean success) { + void executeOp(final StreamOp op, boolean success) { final AsyncLogWriter writer; final Throwable lastException; synchronized (this) { @@ -552,7 +458,7 @@ public class StreamImpl implements Stream { case FOUND: assert(cause instanceof OwnershipAcquireFailedException); countAsException = false; - handleOwnershipAcquireFailedException(op, (OwnershipAcquireFailedException) cause); + handleExceptionOnStreamOp(op, cause); break; case ALREADY_CLOSED: assert(cause instanceof AlreadyClosedException); @@ -573,13 +479,14 @@ public class StreamImpl implements Stream { case OVER_CAPACITY: op.fail(cause); break; - // exceptions that *could* / *might* be recovered by creating a new writer + // the DL writer hits exception, simple set the stream to error status + // and fail the request default: - handleRecoverableDLException(op, cause); + handleExceptionOnStreamOp(op, cause); break; } } else { - handleUnknownException(op, cause); + handleExceptionOnStreamOp(op, cause); } if (countAsException) { countException(cause, streamExceptionStatLogger); @@ -587,88 +494,41 @@ public class StreamImpl implements Stream { } }); } else { - op.fail(lastException); - } - } - - /** - * Handle recoverable dl exception. 
- * - * @param op - * stream operation executing - * @param cause - * exception received when executing <i>op</i> - */ - private void handleRecoverableDLException(StreamOp op, final Throwable cause) { - AsyncLogWriter oldWriter = null; - boolean statusChanged = false; - synchronized (this) { - if (StreamStatus.INITIALIZED == status) { - oldWriter = setStreamStatus(StreamStatus.FAILED, StreamStatus.INITIALIZED, - null, null, cause); - statusChanged = true; + if (null != lastException) { + op.fail(lastException); + } else { + op.fail(new StreamUnavailableException("Stream " + name + " is closed.")); } } - if (statusChanged) { - Abortables.asyncAbort(oldWriter, false); - logger.error("Failed to write data into stream {} : ", name, cause); - scheduleTryAcquireOnce(0L); - } - op.fail(cause); } /** - * Handle unknown exception when executing <i>op</i>. + * Handle exception when executing <i>op</i>. * * @param op * stream operation executing * @param cause * exception received when executing <i>op</i> */ - private void handleUnknownException(StreamOp op, final Throwable cause) { + private void handleExceptionOnStreamOp(StreamOp op, final Throwable cause) { AsyncLogWriter oldWriter = null; boolean statusChanged = false; synchronized (this) { if (StreamStatus.INITIALIZED == status) { - oldWriter = setStreamStatus(StreamStatus.FAILED, StreamStatus.INITIALIZED, - null, null, cause); + oldWriter = setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZED, null, cause); statusChanged = true; } } if (statusChanged) { Abortables.asyncAbort(oldWriter, false); - logger.error("Failed to write data into stream {} : ", name, cause); - scheduleTryAcquireOnce(0L); - } - op.fail(cause); - } - - /** - * Handle losing ownership during executing <i>op</i>. 
- * - * @param op - * stream operation executing - * @param oafe - * the ownership exception received when executing <i>op</i> - */ - private void handleOwnershipAcquireFailedException(StreamOp op, final OwnershipAcquireFailedException oafe) { - logger.warn("Failed to write data into stream {} because stream is acquired by {} : {}", - new Object[]{name, oafe.getCurrentOwner(), oafe.getMessage()}); - AsyncLogWriter oldWriter = null; - boolean statusChanged = false; - synchronized (this) { - if (StreamStatus.INITIALIZED == status) { - oldWriter = - setStreamStatus(StreamStatus.BACKOFF, StreamStatus.INITIALIZED, - null, oafe.getCurrentOwner(), oafe); - statusChanged = true; + if (isCriticalException(cause)) { + logger.error("Failed to write data into stream {} : ", name, cause); + } else { + logger.warn("Failed to write data into stream {} : {}", name, cause.getMessage()); } + requestClose("Failed to write data into stream " + name + " : " + cause.getMessage()); } - if (statusChanged) { - Abortables.asyncAbort(oldWriter, false); - scheduleTryAcquireOnce(nextAcquireWaitTimeMs); - } - op.fail(oafe); + op.fail(cause); } /** @@ -680,129 +540,126 @@ public class StreamImpl implements Stream { fatalErrorHandler.notifyFatalError(); } - void countException(Throwable t, StatsLogger streamExceptionLogger) { - String exceptionName = null == t ? "null" : t.getClass().getName(); - Counter counter = exceptionCounters.get(exceptionName); - if (null == counter) { - counter = exceptionStatLogger.getCounter(exceptionName); - Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName, counter); - if (null != oldCounter) { - counter = oldCounter; - } - } - counter.inc(); - streamExceptionLogger.getCounter(exceptionName).inc(); - } + // + // Acquire streams + // Future<Boolean> acquireStream() { - // Reset this flag so the acquire thread knows whether re-acquire is needed. 
- writeSinceLastAcquire = false; - final Stopwatch stopwatch = Stopwatch.createStarted(); final Promise<Boolean> acquirePromise = new Promise<Boolean>(); manager.openAsyncLogWriter().addEventListener(FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() { @Override public void onSuccess(AsyncLogWriter w) { - synchronized (txnLock) { - sequencer.setLastId(w.getLastTxId()); - } - AsyncLogWriter oldWriter; - Queue<StreamOp> oldPendingOps; - boolean success; - synchronized (StreamImpl.this) { - oldWriter = setStreamStatus(StreamStatus.INITIALIZED, - StreamStatus.INITIALIZING, w, null, null); - oldPendingOps = pendingOps; - pendingOps = new ArrayDeque<StreamOp>(); - success = true; - } - // check if the stream is allowed to be acquired - if (!streamManager.allowAcquire(StreamImpl.this)) { - if (null != oldWriter) { - Abortables.asyncAbort(oldWriter, true); - } - int maxAcquiredPartitions = dynConf.getMaxAcquiredPartitionsPerProxy(); - StreamUnavailableException sue = new StreamUnavailableException("Stream " + partition.getStream() - + " is not allowed to acquire more than " + maxAcquiredPartitions + " partitions"); - countException(sue, exceptionStatLogger); - logger.error("Failed to acquire stream {} because it is unavailable : {}", - name, sue.getMessage()); - synchronized (this) { - oldWriter = setStreamStatus(StreamStatus.ERROR, - StreamStatus.INITIALIZED, null, null, sue); - // we don't switch the pending ops since they are already switched - // when setting the status to initialized - success = false; - } - } - processPendingRequestsAfterOpen(success, oldWriter, oldPendingOps); + onAcquireStreamSuccess(w, stopwatch, acquirePromise); } @Override public void onFailure(Throwable cause) { - AsyncLogWriter oldWriter; - Queue<StreamOp> oldPendingOps; - boolean success; - if (cause instanceof AlreadyClosedException) { - countException(cause, streamExceptionStatLogger); - handleAlreadyClosedException((AlreadyClosedException) cause); - return; 
- } else if (cause instanceof OwnershipAcquireFailedException) { - OwnershipAcquireFailedException oafe = (OwnershipAcquireFailedException) cause; - logger.warn("Failed to acquire stream ownership for {}, current owner is {} : {}", - new Object[]{name, oafe.getCurrentOwner(), oafe.getMessage()}); - synchronized (StreamImpl.this) { - oldWriter = setStreamStatus(StreamStatus.BACKOFF, - StreamStatus.INITIALIZING, null, oafe.getCurrentOwner(), oafe); - oldPendingOps = pendingOps; - pendingOps = new ArrayDeque<StreamOp>(); - success = false; - } - } else if (cause instanceof InvalidStreamNameException) { - InvalidStreamNameException isne = (InvalidStreamNameException) cause; - countException(isne, streamExceptionStatLogger); - logger.error("Failed to acquire stream {} due to its name is invalid", name); - synchronized (StreamImpl.this) { - oldWriter = setStreamStatus(StreamStatus.ERROR, - StreamStatus.INITIALIZING, null, null, isne); - oldPendingOps = pendingOps; - pendingOps = new ArrayDeque<StreamOp>(); - success = false; - } - } else { - countException(cause, streamExceptionStatLogger); - logger.error("Failed to initialize stream {} : ", name, cause); - synchronized (StreamImpl.this) { - oldWriter = setStreamStatus(StreamStatus.FAILED, - StreamStatus.INITIALIZING, null, null, cause); - oldPendingOps = pendingOps; - pendingOps = new ArrayDeque<StreamOp>(); - success = false; - } - } - processPendingRequestsAfterOpen(success, oldWriter, oldPendingOps); + onAcquireStreamFailure(cause, stopwatch, acquirePromise); } - void processPendingRequestsAfterOpen(boolean success, - AsyncLogWriter oldWriter, - Queue<StreamOp> oldPendingOps) { - if (success) { - streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - } else { - streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - } - for (StreamOp op : oldPendingOps) { - executeOp(op, success); - pendingOpsCounter.dec(); - } - Abortables.asyncAbort(oldWriter, true); - 
FutureUtils.setValue(acquirePromise, success); - } }, scheduler, getStreamName())); return acquirePromise; } + private void onAcquireStreamSuccess(AsyncLogWriter w, + Stopwatch stopwatch, + Promise<Boolean> acquirePromise) { + synchronized (txnLock) { + sequencer.setLastId(w.getLastTxId()); + } + AsyncLogWriter oldWriter; + Queue<StreamOp> oldPendingOps; + boolean success; + synchronized (StreamImpl.this) { + oldWriter = setStreamStatus(StreamStatus.INITIALIZED, + StreamStatus.INITIALIZING, w, null); + oldPendingOps = pendingOps; + pendingOps = new ArrayDeque<StreamOp>(); + success = true; + } + // check if the stream is allowed to be acquired + if (!streamManager.allowAcquire(StreamImpl.this)) { + if (null != oldWriter) { + Abortables.asyncAbort(oldWriter, true); + } + int maxAcquiredPartitions = dynConf.getMaxAcquiredPartitionsPerProxy(); + StreamUnavailableException sue = new StreamUnavailableException("Stream " + partition.getStream() + + " is not allowed to acquire more than " + maxAcquiredPartitions + " partitions"); + countException(sue, exceptionStatLogger); + logger.error("Failed to acquire stream {} because it is unavailable : {}", + name, sue.getMessage()); + synchronized (this) { + oldWriter = setStreamStatus(StreamStatus.ERROR, + StreamStatus.INITIALIZED, null, sue); + // we don't switch the pending ops since they are already switched + // when setting the status to initialized + success = false; + } + } + processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise); + } + + private void onAcquireStreamFailure(Throwable cause, + Stopwatch stopwatch, + Promise<Boolean> acquirePromise) { + AsyncLogWriter oldWriter; + Queue<StreamOp> oldPendingOps; + boolean success; + if (cause instanceof AlreadyClosedException) { + countException(cause, streamExceptionStatLogger); + handleAlreadyClosedException((AlreadyClosedException) cause); + return; + } else { + if (isCriticalException(cause)) { + countException(cause, 
streamExceptionStatLogger); + logger.error("Failed to acquire stream {} : ", name, cause); + } else { + logger.warn("Failed to acquire stream {} : {}", name, cause.getMessage()); + } + synchronized (StreamImpl.this) { + oldWriter = setStreamStatus(StreamStatus.ERROR, + StreamStatus.INITIALIZING, null, cause); + oldPendingOps = pendingOps; + pendingOps = new ArrayDeque<StreamOp>(); + success = false; + } + } + processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise); + } + + /** + * Process the pending request after acquired stream. + * + * @param success whether the acquisition succeed or not + * @param oldWriter the old writer to abort + * @param oldPendingOps the old pending ops to execute + * @param stopwatch stopwatch to measure the time spent on acquisition + * @param acquirePromise the promise to complete the acquire operation + */ + void processPendingRequestsAfterAcquire(boolean success, + AsyncLogWriter oldWriter, + Queue<StreamOp> oldPendingOps, + Stopwatch stopwatch, + Promise<Boolean> acquirePromise) { + if (success) { + streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + } else { + streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + } + for (StreamOp op : oldPendingOps) { + executeOp(op, success); + pendingOpsCounter.dec(); + } + Abortables.asyncAbort(oldWriter, true); + FutureUtils.setValue(acquirePromise, success); + } + + // + // Stream Status Changes + // + synchronized void setStreamInErrorStatus() { if (StreamStatus.CLOSING == status || StreamStatus.CLOSED == status) { return; @@ -819,8 +676,6 @@ public class StreamImpl implements Stream { * old status * @param writer * new log writer - * @param owner - * new owner * @param t * new exception * @return old writer if it exists @@ -828,7 +683,6 @@ public class StreamImpl implements Stream { synchronized AsyncLogWriter setStreamStatus(StreamStatus newStatus, StreamStatus oldStatus, AsyncLogWriter 
writer, - String owner, Throwable t) { if (oldStatus != this.status) { logger.info("Stream {} status already changed from {} -> {} when trying to change it to {}", @@ -836,6 +690,11 @@ public class StreamImpl implements Stream { return null; } + String owner = null; + if (t instanceof OwnershipAcquireFailedException) { + owner = ((OwnershipAcquireFailedException) t).getCurrentOwner(); + } + AsyncLogWriter oldWriter = this.writer; this.writer = writer; if (null != owner && owner.equals(clientId)) { @@ -852,10 +711,6 @@ public class StreamImpl implements Stream { } this.lastException = t; this.status = newStatus; - if (StreamStatus.BACKOFF == newStatus && null != owner) { - // start failure watch - this.lastAcquireFailureWatch.reset().start(); - } if (StreamStatus.INITIALIZED == newStatus) { streamManager.notifyAcquired(this); logger.info("Inserted acquired stream {} -> writer {}", name, this); @@ -866,12 +721,16 @@ public class StreamImpl implements Stream { return oldWriter; } + // + // Stream Close Functions + // + void close(DistributedLogManager dlm) { if (null != dlm) { try { dlm.close(); } catch (IOException ioe) { - logger.warn("Failed to close dlm for {} : ", ioe); + logger.warn("Failed to close dlm for {} : ", name, ioe); } } } @@ -902,12 +761,16 @@ public class StreamImpl implements Stream { // them. 
close(abort); if (uncache) { + final long probationTimeoutMs; + if (null != owner) { + probationTimeoutMs = 2 * dlConfig.getZKSessionTimeoutMilliseconds() / 3; + } else { + probationTimeoutMs = 0L; + } closePromise.onSuccess(new AbstractFunction1<Void, BoxedUnit>() { @Override public BoxedUnit apply(Void result) { - if (streamManager.notifyRemoved(StreamImpl.this)) { - logger.info("Removed cached stream {} after closed.", name); - } + streamManager.scheduleRemoval(StreamImpl.this, probationTimeoutMs); return BoxedUnit.UNIT; } }); @@ -949,14 +812,6 @@ public class StreamImpl implements Stream { closeLock.writeLock().unlock(); } logger.info("Closing stream {} ...", name); - running = false; - // stop any outstanding ownership acquire actions first - synchronized (this) { - if (null != tryAcquireScheduledFuture) { - tryAcquireScheduledFuture.cancel(true); - } - } - logger.info("Stopped threads of stream {}.", name); // Close the writers to release the locks before failing the requests Future<Void> closeWriterFuture; if (abort) { @@ -1016,19 +871,6 @@ public class StreamImpl implements Stream { // Test-only apis @VisibleForTesting - public StreamImpl suspendAcquiring() { - suspended = true; - return this; - } - - @VisibleForTesting - public StreamImpl resumeAcquiring() { - suspended = false; - scheduleTryAcquireOnce(0L); - return this; - } - - @VisibleForTesting public int numPendingOps() { Queue<StreamOp> queue = pendingOps; return null == queue ? 
0 : queue.size(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java index 972eb55..e171e46 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java @@ -43,10 +43,11 @@ public interface StreamManager { /** * Get a cached stream and create a new one if it doesnt exist. - * @param stream name + * @param streamName stream name + * @param start whether to start the stream after it is created. * @return future satisfied once close complete */ - Stream getOrCreateStream(String stream) throws IOException; + Stream getOrCreateStream(String streamName, boolean start) throws IOException; /** * Asynchronously create a new stream.