http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java new file mode 100644 index 0000000..d89dddb --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java @@ -0,0 +1,630 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.twitter.distributedlog.impl.metadata; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.DistributedLogConstants; +import com.twitter.distributedlog.ZooKeeperClient; +import com.twitter.distributedlog.exceptions.DLException; +import com.twitter.distributedlog.exceptions.DLInterruptedException; +import com.twitter.distributedlog.exceptions.InvalidStreamNameException; +import com.twitter.distributedlog.exceptions.LockCancelledException; +import com.twitter.distributedlog.exceptions.LogExistsException; +import com.twitter.distributedlog.exceptions.LogNotFoundException; +import com.twitter.distributedlog.exceptions.UnexpectedException; +import com.twitter.distributedlog.exceptions.ZKException; +import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore; +import com.twitter.distributedlog.lock.DistributedLock; +import com.twitter.distributedlog.lock.SessionLockFactory; +import com.twitter.distributedlog.lock.ZKDistributedLock; +import com.twitter.distributedlog.lock.ZKSessionLockFactory; +import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; +import com.twitter.distributedlog.metadata.LogStreamMetadataStore; +import com.twitter.distributedlog.util.DLUtils; +import com.twitter.distributedlog.util.FutureUtils; +import com.twitter.distributedlog.util.SchedulerUtils; +import com.twitter.distributedlog.zk.LimitedPermitManager; +import com.twitter.distributedlog.util.OrderedScheduler; +import com.twitter.distributedlog.util.PermitManager; +import com.twitter.distributedlog.util.Transaction; +import com.twitter.distributedlog.util.Utils; +import com.twitter.distributedlog.zk.ZKTransaction; +import com.twitter.util.ExceptionalFunction; +import com.twitter.util.ExceptionalFunction0; +import com.twitter.util.Future; +import com.twitter.util.Promise; +import 
org.apache.bookkeeper.meta.ZkVersion; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.versioning.Versioned; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.OpResult; +import org.apache.zookeeper.ZKUtil; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.common.PathUtils; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*; + +/** + * zookeeper based {@link LogStreamMetadataStore} + */ +public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { + + private final static Logger LOG = LoggerFactory.getLogger(ZKLogStreamMetadataStore.class); + + private final String clientId; + private final DistributedLogConfiguration conf; + private final ZooKeeperClient zooKeeperClient; + private final OrderedScheduler scheduler; + private final StatsLogger statsLogger; + private final LogSegmentMetadataStore logSegmentStore; + private final LimitedPermitManager permitManager; + // lock + private SessionLockFactory lockFactory; + private OrderedScheduler lockStateExecutor; + + public ZKLogStreamMetadataStore(String clientId, + DistributedLogConfiguration conf, + ZooKeeperClient zkc, + OrderedScheduler scheduler, + StatsLogger statsLogger) { + this.clientId = clientId; + this.conf = conf; + this.zooKeeperClient = zkc; + this.scheduler = scheduler; + this.statsLogger = statsLogger; + // create the log segment metadata store and the permit manager (used for log segment rolling) + this.logSegmentStore = new 
ZKLogSegmentMetadataStore(conf, zooKeeperClient, scheduler); + this.permitManager = new LimitedPermitManager( + conf.getLogSegmentRollingConcurrency(), + 1, + TimeUnit.MINUTES, + scheduler); + this.zooKeeperClient.register(permitManager); + } + + private synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) { + if (createIfNull && null == lockStateExecutor) { + StatsLogger lockStateStatsLogger = statsLogger.scope("lock_scheduler"); + lockStateExecutor = OrderedScheduler.newBuilder() + .name("DLM-LockState") + .corePoolSize(conf.getNumLockStateThreads()) + .statsLogger(lockStateStatsLogger) + .perExecutorStatsLogger(lockStateStatsLogger) + .traceTaskExecution(conf.getEnableTaskExecutionStats()) + .traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros()) + .build(); + } + return lockStateExecutor; + } + + private synchronized SessionLockFactory getLockFactory(boolean createIfNull) { + if (createIfNull && null == lockFactory) { + lockFactory = new ZKSessionLockFactory( + zooKeeperClient, + clientId, + getLockStateExecutor(createIfNull), + conf.getZKNumRetries(), + conf.getLockTimeoutMilliSeconds(), + conf.getZKRetryBackoffStartMillis(), + statsLogger); + } + return lockFactory; + } + + @Override + public void close() throws IOException { + this.zooKeeperClient.unregister(permitManager); + this.permitManager.close(); + this.logSegmentStore.close(); + SchedulerUtils.shutdownScheduler( + getLockStateExecutor(false), + conf.getSchedulerShutdownTimeoutMs(), + TimeUnit.MILLISECONDS); + } + + @Override + public LogSegmentMetadataStore getLogSegmentMetadataStore() { + return logSegmentStore; + } + + @Override + public PermitManager getPermitManager() { + return this.permitManager; + } + + @Override + public Transaction<Object> newTransaction() { + return new ZKTransaction(zooKeeperClient); + } + + @Override + public Future<Void> logExists(URI uri, final String logName) { + final String logSegmentsPath = ZKLogMetadata.getLogSegmentsPath( + uri, 
logName, conf.getUnpartitionedStreamName()); + final Promise<Void> promise = new Promise<Void>(); + try { + final ZooKeeper zk = zooKeeperClient.get(); + zk.sync(logSegmentsPath, new AsyncCallback.VoidCallback() { + @Override + public void processResult(int syncRc, String path, Object syncCtx) { + if (KeeperException.Code.NONODE.intValue() == syncRc) { + promise.setException(new LogNotFoundException( + String.format("Log %s does not exist or has been deleted", logName))); + return; + } else if (KeeperException.Code.OK.intValue() != syncRc){ + promise.setException(new ZKException("Error on checking log existence for " + logName, + KeeperException.create(KeeperException.Code.get(syncRc)))); + return; + } + zk.exists(logSegmentsPath, false, new AsyncCallback.StatCallback() { + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) { + if (KeeperException.Code.OK.intValue() == rc) { + promise.setValue(null); + } else if (KeeperException.Code.NONODE.intValue() == rc) { + promise.setException(new LogNotFoundException( + String.format("Log %s does not exist or has been deleted", logName))); + } else { + promise.setException(new ZKException("Error on checking log existence for " + logName, + KeeperException.create(KeeperException.Code.get(rc)))); + } + } + }, null); + } + }, null); + + } catch (InterruptedException ie) { + LOG.error("Interrupted while reading {}", logSegmentsPath, ie); + promise.setException(new DLInterruptedException("Interrupted while checking " + + logSegmentsPath, ie)); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + promise.setException(e); + } + return promise; + } + + // + // Create Write Lock + // + + @Override + public DistributedLock createWriteLock(ZKLogMetadataForWriter metadata) { + return new ZKDistributedLock( + getLockStateExecutor(true), + getLockFactory(true), + metadata.getLockPath(), + conf.getLockTimeoutMilliSeconds(), + statsLogger); + } + + // + // Create Read Lock + // + + private 
Future<Void> ensureReadLockPathExist(final ZKLogMetadata logMetadata, + final String readLockPath) { + final Promise<Void> promise = new Promise<Void>(); + promise.setInterruptHandler(new com.twitter.util.Function<Throwable, BoxedUnit>() { + @Override + public BoxedUnit apply(Throwable t) { + FutureUtils.setException(promise, new LockCancelledException(readLockPath, + "Could not ensure read lock path", t)); + return null; + } + }); + Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath()); + Utils.zkAsyncCreateFullPathOptimisticRecursive(zooKeeperClient, readLockPath, parentPathShouldNotCreate, + new byte[0], zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT, + new org.apache.zookeeper.AsyncCallback.StringCallback() { + @Override + public void processResult(final int rc, final String path, Object ctx, String name) { + if (KeeperException.Code.NONODE.intValue() == rc) { + FutureUtils.setException(promise, new LogNotFoundException( + String.format("Log %s does not exist or has been deleted", + logMetadata.getFullyQualifiedName()))); + } else if (KeeperException.Code.OK.intValue() == rc) { + FutureUtils.setValue(promise, null); + LOG.trace("Created path {}.", path); + } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { + FutureUtils.setValue(promise, null); + LOG.trace("Path {} is already existed.", path); + } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) { + FutureUtils.setException(promise, new ZooKeeperClient.ZooKeeperConnectionException(path)); + } else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) { + FutureUtils.setException(promise, new DLInterruptedException(path)); + } else { + FutureUtils.setException(promise, KeeperException.create(KeeperException.Code.get(rc))); + } + } + }, null); + return promise; + } + + @Override + public Future<DistributedLock> createReadLock(final ZKLogMetadataForReader metadata, + Optional<String> readerId) { + final String 
readLockPath = metadata.getReadLockPath(readerId); + return ensureReadLockPathExist(metadata, readLockPath).flatMap( + new ExceptionalFunction<Void, Future<DistributedLock>>() { + @Override + public Future<DistributedLock> applyE(Void value) throws Throwable { + // Unfortunately this has a blocking call which we should not execute on the + // ZK completion thread + return scheduler.apply(new ExceptionalFunction0<DistributedLock>() { + @Override + public DistributedLock applyE() throws Throwable { + return new ZKDistributedLock( + getLockStateExecutor(true), + getLockFactory(true), + readLockPath, + conf.getLockTimeoutMilliSeconds(), + statsLogger.scope("read_lock")); + } + }); + } + }); + } + + // + // Create Log + // + + static class MetadataIndex { + static final int LOG_ROOT_PARENT = 0; + static final int LOG_ROOT = 1; + static final int MAX_TXID = 2; + static final int VERSION = 3; + static final int LOCK = 4; + static final int READ_LOCK = 5; + static final int LOGSEGMENTS = 6; + static final int ALLOCATION = 7; + } + + static int bytesToInt(byte[] b) { + assert b.length >= 4; + return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3]; + } + + static byte[] intToBytes(int i) { + return new byte[]{ + (byte) (i >> 24), + (byte) (i >> 16), + (byte) (i >> 8), + (byte) (i)}; + } + + static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk, + String logRootPath, + boolean ownAllocator) { + // Note re. persistent lock state initialization: the read lock persistent state (path) is + // initialized here but only used in the read handler. The reason is its more convenient and + // less error prone to manage all stream structure in one place. 
+ final String logRootParentPath = new File(logRootPath).getParent(); + final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH; + final String maxTxIdPath = logRootPath + MAX_TXID_PATH; + final String lockPath = logRootPath + LOCK_PATH; + final String readLockPath = logRootPath + READ_LOCK_PATH; + final String versionPath = logRootPath + VERSION_PATH; + final String allocationPath = logRootPath + ALLOCATION_PATH; + + int numPaths = ownAllocator ? MetadataIndex.ALLOCATION + 1 : MetadataIndex.LOGSEGMENTS + 1; + List<Future<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths); + checkFutures.add(Utils.zkGetData(zk, logRootParentPath, false)); + checkFutures.add(Utils.zkGetData(zk, logRootPath, false)); + checkFutures.add(Utils.zkGetData(zk, maxTxIdPath, false)); + checkFutures.add(Utils.zkGetData(zk, versionPath, false)); + checkFutures.add(Utils.zkGetData(zk, lockPath, false)); + checkFutures.add(Utils.zkGetData(zk, readLockPath, false)); + checkFutures.add(Utils.zkGetData(zk, logSegmentsPath, false)); + if (ownAllocator) { + checkFutures.add(Utils.zkGetData(zk, allocationPath, false)); + } + + return Future.collect(checkFutures); + } + + static boolean pathExists(Versioned<byte[]> metadata) { + return null != metadata.getValue() && null != metadata.getVersion(); + } + + static void ensureMetadataExist(Versioned<byte[]> metadata) { + Preconditions.checkNotNull(metadata.getValue()); + Preconditions.checkNotNull(metadata.getVersion()); + } + + static void createMissingMetadata(final ZooKeeper zk, + final String logRootPath, + final List<Versioned<byte[]>> metadatas, + final List<ACL> acl, + final boolean ownAllocator, + final boolean createIfNotExists, + final Promise<List<Versioned<byte[]>>> promise) { + final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size()); + final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size()); + CreateMode createMode = CreateMode.PERSISTENT; + + // log root 
parent path + if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) { + pathsToCreate.add(null); + } else { + String logRootParentPath = new File(logRootPath).getParent(); + pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES); + zkOps.add(Op.create(logRootParentPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode)); + } + + // log root path + if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) { + pathsToCreate.add(null); + } else { + pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES); + zkOps.add(Op.create(logRootPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode)); + } + + // max id + if (pathExists(metadatas.get(MetadataIndex.MAX_TXID))) { + pathsToCreate.add(null); + } else { + byte[] zeroTxnIdData = DLUtils.serializeTransactionId(0L); + pathsToCreate.add(zeroTxnIdData); + zkOps.add(Op.create(logRootPath + MAX_TXID_PATH, zeroTxnIdData, acl, createMode)); + } + // version + if (pathExists(metadatas.get(MetadataIndex.VERSION))) { + pathsToCreate.add(null); + } else { + byte[] versionData = intToBytes(LAYOUT_VERSION); + pathsToCreate.add(versionData); + zkOps.add(Op.create(logRootPath + VERSION_PATH, versionData, acl, createMode)); + } + // lock path + if (pathExists(metadatas.get(MetadataIndex.LOCK))) { + pathsToCreate.add(null); + } else { + pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES); + zkOps.add(Op.create(logRootPath + LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode)); + } + // read lock path + if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) { + pathsToCreate.add(null); + } else { + pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES); + zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode)); + } + // log segments path + if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) { + pathsToCreate.add(null); + } else { + byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber( + DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO); + 
pathsToCreate.add(logSegmentsData); + zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode)); + } + // allocation path + if (ownAllocator) { + if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) { + pathsToCreate.add(null); + } else { + pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES); + zkOps.add(Op.create(logRootPath + ALLOCATION_PATH, + DistributedLogConstants.EMPTY_BYTES, acl, createMode)); + } + } + if (zkOps.isEmpty()) { + // nothing missed + promise.setValue(metadatas); + return; + } + if (!createIfNotExists) { + promise.setException(new LogNotFoundException("Log " + logRootPath + " not found")); + return; + } + + zk.multi(zkOps, new AsyncCallback.MultiCallback() { + @Override + public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) { + if (KeeperException.Code.OK.intValue() == rc) { + List<Versioned<byte[]>> finalMetadatas = + Lists.newArrayListWithExpectedSize(metadatas.size()); + for (int i = 0; i < pathsToCreate.size(); i++) { + byte[] dataCreated = pathsToCreate.get(i); + if (null == dataCreated) { + finalMetadatas.add(metadatas.get(i)); + } else { + finalMetadatas.add(new Versioned<byte[]>(dataCreated, new ZkVersion(0))); + } + } + promise.setValue(finalMetadatas); + } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { + promise.setException(new LogExistsException("Someone just created log " + + logRootPath)); + } else { + if (LOG.isDebugEnabled()) { + StringBuilder builder = new StringBuilder(); + for (OpResult result : resultList) { + if (result instanceof OpResult.ErrorResult) { + OpResult.ErrorResult errorResult = (OpResult.ErrorResult) result; + builder.append(errorResult.getErr()).append(","); + } else { + builder.append(0).append(","); + } + } + String resultCodeList = builder.substring(0, builder.length() - 1); + LOG.debug("Failed to create log, full rc list = {}", resultCodeList); + } + + promise.setException(new ZKException("Failed to create log " + 
logRootPath, + KeeperException.Code.get(rc))); + } + } + }, null); + } + + static ZKLogMetadataForWriter processLogMetadatas(URI uri, + String logName, + String logIdentifier, + List<Versioned<byte[]>> metadatas, + boolean ownAllocator) + throws UnexpectedException { + try { + // max id + Versioned<byte[]> maxTxnIdData = metadatas.get(MetadataIndex.MAX_TXID); + ensureMetadataExist(maxTxnIdData); + // version + Versioned<byte[]> versionData = metadatas.get(MetadataIndex.VERSION); + ensureMetadataExist(maxTxnIdData); + Preconditions.checkArgument(LAYOUT_VERSION == bytesToInt(versionData.getValue())); + // lock path + ensureMetadataExist(metadatas.get(MetadataIndex.LOCK)); + // read lock path + ensureMetadataExist(metadatas.get(MetadataIndex.READ_LOCK)); + // max lssn + Versioned<byte[]> maxLSSNData = metadatas.get(MetadataIndex.LOGSEGMENTS); + ensureMetadataExist(maxLSSNData); + try { + DLUtils.deserializeLogSegmentSequenceNumber(maxLSSNData.getValue()); + } catch (NumberFormatException nfe) { + throw new UnexpectedException("Invalid max sequence number found in log " + logName, nfe); + } + // allocation path + Versioned<byte[]> allocationData; + if (ownAllocator) { + allocationData = metadatas.get(MetadataIndex.ALLOCATION); + ensureMetadataExist(allocationData); + } else { + allocationData = new Versioned<byte[]>(null, null); + } + return new ZKLogMetadataForWriter(uri, logName, logIdentifier, + maxLSSNData, maxTxnIdData, allocationData); + } catch (IllegalArgumentException iae) { + throw new UnexpectedException("Invalid log " + logName, iae); + } catch (NullPointerException npe) { + throw new UnexpectedException("Invalid log " + logName, npe); + } + } + + static Future<ZKLogMetadataForWriter> getLog(final URI uri, + final String logName, + final String logIdentifier, + final ZooKeeperClient zooKeeperClient, + final boolean ownAllocator, + final boolean createIfNotExists) { + final String logRootPath = ZKLogMetadata.getLogRootPath(uri, logName, logIdentifier); + try 
{ + PathUtils.validatePath(logRootPath); + } catch (IllegalArgumentException e) { + LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, logName, e}); + return Future.exception(new InvalidStreamNameException(logName, "Log name is invalid")); + } + + try { + final ZooKeeper zk = zooKeeperClient.get(); + return checkLogMetadataPaths(zk, logRootPath, ownAllocator) + .flatMap(new AbstractFunction1<List<Versioned<byte[]>>, Future<List<Versioned<byte[]>>>>() { + @Override + public Future<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) { + Promise<List<Versioned<byte[]>>> promise = + new Promise<List<Versioned<byte[]>>>(); + createMissingMetadata(zk, logRootPath, metadatas, zooKeeperClient.getDefaultACL(), + ownAllocator, createIfNotExists, promise); + return promise; + } + }).map(new ExceptionalFunction<List<Versioned<byte[]>>, ZKLogMetadataForWriter>() { + @Override + public ZKLogMetadataForWriter applyE(List<Versioned<byte[]>> metadatas) throws DLException { + return processLogMetadatas( + uri, + logName, + logIdentifier, + metadatas, + ownAllocator); + } + }); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + return Future.exception(new ZKException("Encountered zookeeper connection issue on creating log " + logName, + KeeperException.Code.CONNECTIONLOSS)); + } catch (InterruptedException e) { + return Future.exception(new DLInterruptedException("Interrupted on creating log " + logName, e)); + } + } + + @Override + public Future<ZKLogMetadataForWriter> getLog(final URI uri, + final String logName, + final boolean ownAllocator, + final boolean createIfNotExists) { + return getLog( + uri, + logName, + conf.getUnpartitionedStreamName(), + zooKeeperClient, + ownAllocator, + createIfNotExists); + } + + // + // Delete Log + // + + @Override + public Future<Void> deleteLog(URI uri, final String logName) { + final Promise<Void> promise = new Promise<Void>(); + try { + String streamPath = ZKLogMetadata.getLogStreamPath(uri, 
logName); + ZKUtil.deleteRecursive(zooKeeperClient.get(), streamPath, new AsyncCallback.VoidCallback() { + @Override + public void processResult(int rc, String path, Object ctx) { + if (KeeperException.Code.OK.intValue() != rc) { + FutureUtils.setException(promise, + new ZKException("Encountered zookeeper issue on deleting log stream " + + logName, KeeperException.Code.get(rc))); + return; + } + FutureUtils.setValue(promise, null); + } + }, null); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream " + + logName, KeeperException.Code.CONNECTIONLOSS)); + } catch (InterruptedException e) { + FutureUtils.setException(promise, new DLInterruptedException("Interrupted while deleting log stream " + + logName)); + } catch (KeeperException e) { + FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream " + + logName, e)); + } + return promise; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java index 2ea1671..5144634 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java @@ -20,6 +20,8 @@ package com.twitter.distributedlog.logsegment; import com.google.common.annotations.Beta; import com.twitter.distributedlog.LogSegmentMetadata; import com.twitter.distributedlog.callback.LogSegmentNamesListener; +import com.twitter.distributedlog.impl.metadata.ZKLogMetadata; +import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter; import com.twitter.distributedlog.util.Transaction; import com.twitter.distributedlog.util.Transaction.OpListener; import com.twitter.util.Future; @@ -52,15 +54,15 @@ public interface LogSegmentMetadataStore extends Closeable { * * @param txn * transaction to execute for storing log segment sequence number. 
- * @param path - * path to store sequence number + * @param logMetadata + * metadata of the log stream * @param sequenceNumber * log segment sequence number to store * @param listener * listener on the result to this operation */ void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn, - String path, + ZKLogMetadata logMetadata, Versioned<Long> sequenceNumber, OpListener<Version> listener); @@ -69,15 +71,15 @@ public interface LogSegmentMetadataStore extends Closeable { * * @param txn * transaction to execute for storing transaction id - * @param path - * path to store sequence number + * @param logMetadata + * metadata of the log stream * @param transactionId * transaction id to store * @param listener * listener on the result to this operation */ void storeMaxTxnId(Transaction<Object> txn, - String path, + ZKLogMetadataForWriter logMetadata, Versioned<Long> transactionId, OpListener<Version> listener); @@ -91,8 +93,12 @@ public interface LogSegmentMetadataStore extends Closeable { * transaction to execute for this operation * @param segment * segment to create + * @param opListener + * the listener on the operation result */ - void createLogSegment(Transaction<Object> txn, LogSegmentMetadata segment); + void createLogSegment(Transaction<Object> txn, + LogSegmentMetadata segment, + OpListener<Void> opListener); /** * Delete a log segment <code>segment</code> under transaction <code>txn</code>. @@ -105,7 +111,9 @@ public interface LogSegmentMetadataStore extends Closeable { * @param segment * segment to delete */ - void deleteLogSegment(Transaction<Object> txn, LogSegmentMetadata segment); + void deleteLogSegment(Transaction<Object> txn, + LogSegmentMetadata segment, + OpListener<Void> opListener); /** * Update a log segment <code>segment</code> under transaction <code>txn</code>. 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java index baa3240..ac36ef2 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java @@ -51,16 +51,15 @@ public class BKDLConfig implements DLConfig { new ConcurrentHashMap<URI, DLConfig>(); public static void propagateConfiguration(BKDLConfig bkdlConfig, DistributedLogConfiguration dlConf) { - dlConf.setSanityCheckTxnID(bkdlConfig.getSanityCheckTxnID()); dlConf.setEncodeRegionIDInLogSegmentMetadata(bkdlConfig.getEncodeRegionID()); dlConf.setFirstLogSegmentSequenceNumber(bkdlConfig.getFirstLogSegmentSeqNo()); if (bkdlConfig.isFederatedNamespace()) { dlConf.setCreateStreamIfNotExists(false); LOG.info("Disabled createIfNotExists for federated namespace."); } - LOG.info("Propagate BKDLConfig to DLConfig : sanityCheckTxnID = {}, encodeRegionID = {}," + + LOG.info("Propagate BKDLConfig to DLConfig : encodeRegionID = {}," + " firstLogSegmentSequenceNumber = {}, createStreamIfNotExists = {}, isFederated = {}.", - new Object[] { dlConf.getSanityCheckTxnID(), dlConf.getEncodeRegionIDInLogSegmentMetadata(), + new Object[] { dlConf.getEncodeRegionIDInLogSegmentMetadata(), dlConf.getFirstLogSegmentSequenceNumber(), dlConf.getCreateStreamIfNotExists(), bkdlConfig.isFederatedNamespace() }); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java 
---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java index d205b3a..0e5e6d4 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java @@ -177,8 +177,8 @@ public class LogSegmentMetadataStoreUpdater implements MetadataUpdater { protected void addNewSegmentAndDeleteOldSegment(Transaction<Object> txn, LogSegmentMetadata newSegment, LogSegmentMetadata oldSegment) { - metadataStore.deleteLogSegment(txn, oldSegment); - metadataStore.createLogSegment(txn, newSegment); + metadataStore.deleteLogSegment(txn, oldSegment, null); + metadataStore.createLogSegment(txn, newSegment, null); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java new file mode 100644 index 0000000..db7812e --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.metadata; + +import com.google.common.annotations.Beta; +import com.google.common.base.Optional; +import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader; +import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter; +import com.twitter.distributedlog.lock.DistributedLock; +import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; +import com.twitter.distributedlog.util.PermitManager; +import com.twitter.distributedlog.util.Transaction; +import com.twitter.util.Future; + +import java.io.Closeable; +import java.net.URI; + +/** + * The interface to manage the log stream metadata. The implementation is responsible + * for creating the metadata layout. + */ +@Beta +public interface LogStreamMetadataStore extends Closeable { + + /** + * Create a transaction for the metadata operations happening in the metadata store. + * + * @return transaction for the metadata operations + */ + Transaction<Object> newTransaction(); + + /** + * Ensure the existence of a log stream + * + * @param uri the location of the log stream + * @param logName the name of the log stream + * @return future represents the existence of a log stream. 
{@link com.twitter.distributedlog.exceptions.LogNotFoundException} + * is thrown if the log doesn't exist + */ + Future<Void> logExists(URI uri, String logName); + + /** + * Create the read lock for the log stream. + * + * @param metadata the metadata for a log stream + * @param readerId the reader id used for lock + * @return the read lock + */ + Future<DistributedLock> createReadLock(ZKLogMetadataForReader metadata, + Optional<String> readerId); + + /** + * Create the write lock for the log stream. + * + * @param metadata the metadata for a log stream + * @return the write lock + */ + DistributedLock createWriteLock(ZKLogMetadataForWriter metadata); + + /** + * Get the metadata of a log, creating the log first if it doesn't exist and + * {@code createIfNotExists} is set. + * + * @param uri the location to store the metadata of the log + * @param streamName the name of the log stream + * @param ownAllocator whether to use its own allocator or external allocator + * @param createIfNotExists flag to create the stream if it doesn't exist + * @return the metadata of the log + */ + Future<ZKLogMetadataForWriter> getLog(URI uri, + String streamName, + boolean ownAllocator, + boolean createIfNotExists); + + /** + * Delete the metadata of a log. + * + * @param uri the location to store the metadata of the log + * @param streamName the name of the log stream + * @return future representing the result of the deletion. + */ + Future<Void> deleteLog(URI uri, String streamName); + + /** + * Get the log segment metadata store. + * + * @return the log segment metadata store. + */ + LogSegmentMetadataStore getLogSegmentMetadataStore(); + + /** + * Get the permit manager for this metadata store. It can be used for limiting the concurrent + * metadata operations. The implementation can disable handing over the permits when the metadata + * store is unavailable (for example zookeeper session expired). 
+ * + * @return the permit manager + */ + PermitManager getPermitManager(); + + + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java index bed2fcd..23d8e40 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java @@ -267,7 +267,8 @@ public class DistributedLogTool extends Tool { protected LogSegmentMetadataStore getLogSegmentMetadataStore() throws IOException { DistributedLogNamespace namespace = getFactory().getNamespace(); assert(namespace instanceof BKDistributedLogNamespace); - return ((BKDistributedLogNamespace) namespace).getWriterSegmentMetadataStore(); + return ((BKDistributedLogNamespace) namespace).getWriterStreamMetadataStore() + .getLogSegmentMetadataStore(); } protected ZooKeeperClient getZooKeeperClient() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java deleted file mode 100644 index dc25023..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.util; - -import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Manager to control all the log segments rolling. 
- */ -public class LimitedPermitManager implements PermitManager, Runnable, Watcher { - - static final Logger LOG = LoggerFactory.getLogger(LimitedPermitManager.class); - - static enum PermitState { - ALLOWED, DISALLOWED, DISABLED - } - - class EpochPermit implements Permit { - - final PermitState state; - final int epoch; - - EpochPermit(PermitState state) { - this.state = state; - this.epoch = LimitedPermitManager.this.epoch.get(); - } - - int getEpoch() { - return epoch; - } - - @Override - public boolean isAllowed() { - return PermitState.ALLOWED == state; - } - } - - boolean enablePermits = true; - final Semaphore semaphore; - final int period; - final TimeUnit timeUnit; - final ScheduledExecutorService executorService; - final AtomicInteger epoch = new AtomicInteger(0); - private StatsLogger statsLogger = null; - private Gauge<Number> outstandingGauge = null; - - public LimitedPermitManager(int concurrency, int period, TimeUnit timeUnit, - ScheduledExecutorService executorService) { - this(concurrency, period, timeUnit, executorService, NullStatsLogger.INSTANCE); - } - - public LimitedPermitManager(final int concurrency, int period, TimeUnit timeUnit, - ScheduledExecutorService executorService, StatsLogger statsLogger) { - if (concurrency > 0) { - this.semaphore = new Semaphore(concurrency); - } else { - this.semaphore = null; - } - this.period = period; - this.timeUnit = timeUnit; - this.executorService = executorService; - this.statsLogger = statsLogger; - this.outstandingGauge = new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return null == semaphore ? 
0 : concurrency - semaphore.availablePermits(); - } - }; - this.statsLogger.scope("permits").registerGauge("outstanding", this.outstandingGauge); - } - - @Override - synchronized public Permit acquirePermit() { - if (!enablePermits) { - return new EpochPermit(PermitState.DISABLED); - } - if (null != semaphore) { - return semaphore.tryAcquire() ? new EpochPermit(PermitState.ALLOWED) : - new EpochPermit(PermitState.DISALLOWED); - } else { - return new EpochPermit(PermitState.ALLOWED); - } - } - - @Override - synchronized public void releasePermit(Permit permit) { - if (null != semaphore && permit.isAllowed()) { - if (period <= 0) { - semaphore.release(); - } else { - try { - executorService.schedule(this, period, timeUnit); - } catch (RejectedExecutionException ree) { - LOG.warn("Failed on scheduling releasing permit in given period ({}ms)." + - " Release it immediately : ", timeUnit.toMillis(period), ree); - semaphore.release(); - } - } - } - } - - @Override - synchronized public boolean disallowObtainPermits(Permit permit) { - if (!(permit instanceof EpochPermit)) { - return false; - } - if (epoch.getAndIncrement() == ((EpochPermit)permit).getEpoch()) { - this.enablePermits = false; - LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch.get()); - return true; - } else { - return false; - } - } - - @Override - public void close() { - unregisterGauge(); - } - - @Override - synchronized public boolean allowObtainPermits() { - forceSetAllowPermits(true); - return true; - } - - synchronized void forceSetAllowPermits(boolean allowPermits) { - epoch.getAndIncrement(); - this.enablePermits = allowPermits; - LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch.get()); - } - - @Override - public void run() { - semaphore.release(); - } - - @Override - public void process(WatchedEvent event) { - if (event.getType().equals(Event.EventType.None)) { - switch (event.getState()) { - case SyncConnected: - forceSetAllowPermits(true); - break; - case 
Disconnected: - forceSetAllowPermits(false); - break; - case Expired: - forceSetAllowPermits(false); - break; - default: - break; - } - } - } - - public void unregisterGauge() { - if(this.statsLogger != null && this.outstandingGauge != null) { - this.statsLogger.scope("permits").unregisterGauge("outstanding", this.outstandingGauge); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java index 7d76f29..78292e9 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java @@ -17,29 +17,39 @@ */ package com.twitter.distributedlog.zk; +import com.twitter.distributedlog.util.Transaction.OpListener; import org.apache.zookeeper.Op; import org.apache.zookeeper.OpResult; +import javax.annotation.Nullable; + /** * Default zookeeper operation. No action on commiting or aborting. 
*/ public class DefaultZKOp extends ZKOp { - public static DefaultZKOp of(Op op) { - return new DefaultZKOp(op); + public static DefaultZKOp of(Op op, OpListener<Void> listener) { + return new DefaultZKOp(op, listener); } - private DefaultZKOp(Op op) { + private final OpListener<Void> listener; + + private DefaultZKOp(Op op, @Nullable OpListener<Void> opListener) { super(op); + this.listener = opListener; } @Override protected void commitOpResult(OpResult opResult) { - // no-op + if (null != listener) { + listener.onCommit(null); + } } @Override protected void abortOpResult(Throwable t, OpResult opResult) { - // no-op + if (null != listener) { + listener.onAbort(t); + } } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java new file mode 100644 index 0000000..78ff0a2 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.zk; + +import com.twitter.distributedlog.util.PermitManager; +import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Manager to control all the log segments rolling. + */ +public class LimitedPermitManager implements PermitManager, Runnable, Watcher { + + static final Logger LOG = LoggerFactory.getLogger(LimitedPermitManager.class); + + static enum PermitState { + ALLOWED, DISALLOWED, DISABLED + } + + class EpochPermit implements Permit { + + final PermitState state; + final int epoch; + + EpochPermit(PermitState state) { + this.state = state; + this.epoch = LimitedPermitManager.this.epoch.get(); + } + + int getEpoch() { + return epoch; + } + + @Override + public boolean isAllowed() { + return PermitState.ALLOWED == state; + } + } + + boolean enablePermits = true; + final Semaphore semaphore; + final int period; + final TimeUnit timeUnit; + final ScheduledExecutorService executorService; + final AtomicInteger epoch = new AtomicInteger(0); + private StatsLogger statsLogger = null; + private Gauge<Number> outstandingGauge = null; + + public LimitedPermitManager(int concurrency, int period, TimeUnit timeUnit, + ScheduledExecutorService executorService) { + this(concurrency, period, timeUnit, executorService, NullStatsLogger.INSTANCE); + } + + public LimitedPermitManager(final int concurrency, int period, TimeUnit timeUnit, + 
ScheduledExecutorService executorService, StatsLogger statsLogger) { + if (concurrency > 0) { + this.semaphore = new Semaphore(concurrency); + } else { + this.semaphore = null; + } + this.period = period; + this.timeUnit = timeUnit; + this.executorService = executorService; + this.statsLogger = statsLogger; + this.outstandingGauge = new Gauge<Number>() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return null == semaphore ? 0 : concurrency - semaphore.availablePermits(); + } + }; + this.statsLogger.scope("permits").registerGauge("outstanding", this.outstandingGauge); + } + + @Override + synchronized public Permit acquirePermit() { + if (!enablePermits) { + return new EpochPermit(PermitState.DISABLED); + } + if (null != semaphore) { + return semaphore.tryAcquire() ? new EpochPermit(PermitState.ALLOWED) : + new EpochPermit(PermitState.DISALLOWED); + } else { + return new EpochPermit(PermitState.ALLOWED); + } + } + + @Override + synchronized public void releasePermit(Permit permit) { + if (null != semaphore && permit.isAllowed()) { + if (period <= 0) { + semaphore.release(); + } else { + try { + executorService.schedule(this, period, timeUnit); + } catch (RejectedExecutionException ree) { + LOG.warn("Failed on scheduling releasing permit in given period ({}ms)." 
+ + " Release it immediately : ", timeUnit.toMillis(period), ree); + semaphore.release(); + } + } + } + } + + @Override + synchronized public boolean disallowObtainPermits(Permit permit) { + if (!(permit instanceof EpochPermit)) { + return false; + } + if (epoch.getAndIncrement() == ((EpochPermit)permit).getEpoch()) { + this.enablePermits = false; + LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch.get()); + return true; + } else { + return false; + } + } + + @Override + public void close() { + unregisterGauge(); + } + + @Override + synchronized public boolean allowObtainPermits() { + forceSetAllowPermits(true); + return true; + } + + synchronized void forceSetAllowPermits(boolean allowPermits) { + epoch.getAndIncrement(); + this.enablePermits = allowPermits; + LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch.get()); + } + + @Override + public void run() { + semaphore.release(); + } + + @Override + public void process(WatchedEvent event) { + if (event.getType().equals(Event.EventType.None)) { + switch (event.getState()) { + case SyncConnected: + forceSetAllowPermits(true); + break; + case Disconnected: + forceSetAllowPermits(false); + break; + case Expired: + forceSetAllowPermits(false); + break; + default: + break; + } + } + } + + public void unregisterGauge() { + if(this.statsLogger != null && this.outstandingGauge != null) { + this.statsLogger.scope("permits").unregisterGauge("outstanding", this.outstandingGauge); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java index d885593..5b788e2 100644 --- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java @@ -33,7 +33,8 @@ public class ZKVersionedSetOp extends ZKOp { private final OpListener<Version> listener; - public ZKVersionedSetOp(Op op, OpListener<Version> opListener) { + public ZKVersionedSetOp(Op op, + @Nullable OpListener<Version> opListener) { super(op); this.listener = opListener; } @@ -42,7 +43,9 @@ public class ZKVersionedSetOp extends ZKOp { protected void commitOpResult(OpResult opResult) { assert(opResult instanceof OpResult.SetDataResult); OpResult.SetDataResult setDataResult = (OpResult.SetDataResult) opResult; - listener.onCommit(new ZkVersion(setDataResult.getStat().getVersion())); + if (null != listener) { + listener.onCommit(new ZkVersion(setDataResult.getStat().getVersion())); + } } @Override @@ -60,7 +63,9 @@ public class ZKVersionedSetOp extends ZKOp { cause = KeeperException.create(KeeperException.Code.get(errorResult.getErr())); } } - listener.onAbort(cause); + if (null != listener) { + listener.onAbort(cause); + } } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java index c588cd7..1485ae6 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java @@ -36,6 +36,7 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.feature.SettableFeatureProvider; import org.apache.bookkeeper.stats.NullStatsLogger; +import 
org.apache.bookkeeper.versioning.Version; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; @@ -429,7 +430,7 @@ public class DLMTestUtil { .setEnvelopeEntries(LogSegmentMetadata.supportsEnvelopedEntries(logSegmentMetadataVersion)) .build(); l.write(dlm.writerZKC); - writeHandler.maxTxId.store(startTxID); + writeHandler.maxTxId.update(Version.ANY, startTxID); writeHandler.addLogSegmentToCache(inprogressZnodeName, l); BKLogSegmentWriter writer = new BKLogSegmentWriter( writeHandler.getFullyQualifiedName(), @@ -479,7 +480,7 @@ public class DLMTestUtil { .setInprogress(false) .build(); l.write(dlm.writerZKC); - writeHandler.maxTxId.store(startTxID); + writeHandler.maxTxId.update(Version.ANY, startTxID); writeHandler.addLogSegmentToCache(inprogressZnodeName, l); BKLogSegmentWriter writer = new BKLogSegmentWriter( writeHandler.getFullyQualifiedName(), http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java index d45a727..6aa38c3 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java @@ -21,6 +21,7 @@ import java.io.ByteArrayInputStream; import java.net.URI; import com.twitter.distributedlog.exceptions.BKTransmitException; +import com.twitter.distributedlog.util.FutureUtils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -203,7 +204,7 @@ public class TestAppendOnlyStreamWriter extends TestDistributedLogBase { BKDistributedLogManager dlm = 
(BKDistributedLogManager) createNewDLM(conf, name); URI uri = createDLMURI("/" + name); - BKDistributedLogManager.createLog(conf, dlm.getReaderZKC(), uri, name); + FutureUtils.result(dlm.getWriterMetadataStore().getLog(uri, name, true, true)); // Log exists but is empty, better not throw. AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter(); @@ -264,7 +265,7 @@ public class TestAppendOnlyStreamWriter extends TestDistributedLogBase { BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name); URI uri = createDLMURI("/" + name); - BKDistributedLogManager.createLog(conf, dlm.getReaderZKC(), uri, name); + FutureUtils.result(dlm.getWriterMetadataStore().getLog(uri, name, true, true)); // Log exists but is empty, better not throw. AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java index 6ad9950..c8a1c74 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java @@ -148,61 +148,6 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { } @Test(timeout = 60000) - public void testSanityCheckTxnID() throws Exception { - String name = "distrlog-sanity-check-txnid"; - BKDistributedLogManager dlm = createNewDLM(conf, name); - BKSyncLogWriter out = dlm.startLogSegmentNonPartitioned(); - long txid = 1; - for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) { - LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); - out.write(op); - 
} - out.closeAndComplete(); - - BKSyncLogWriter out1 = dlm.startLogSegmentNonPartitioned(); - LogRecord op1 = DLMTestUtil.getLogRecordInstance(1); - try { - out1.write(op1); - fail("Should fail writing lower txn id if sanityCheckTxnID is enabled."); - } catch (TransactionIdOutOfOrderException tioooe) { - // expected - } - out1.closeAndComplete(); - dlm.close(); - - DLMTestUtil.updateBKDLConfig(bkutil.getUri(), bkutil.getZkServers(), bkutil.getBkLedgerPath(), false); - LOG.info("Disable sanity check txn id."); - BKDLConfig.clearCachedDLConfigs(); - - DistributedLogConfiguration newConf = new DistributedLogConfiguration(); - newConf.addConfiguration(conf); - BKDistributedLogManager newDLM = createNewDLM(newConf, name); - BKSyncLogWriter out2 = newDLM.startLogSegmentNonPartitioned(); - LogRecord op2 = DLMTestUtil.getLogRecordInstance(1); - out2.write(op2); - out2.closeAndComplete(); - newDLM.close(); - - DLMTestUtil.updateBKDLConfig(bkutil.getUri(), bkutil.getZkServers(), bkutil.getBkLedgerPath(), true); - LOG.info("Enable sanity check txn id."); - BKDLConfig.clearCachedDLConfigs(); - - DistributedLogConfiguration conf3 = new DistributedLogConfiguration(); - conf3.addConfiguration(conf); - BKDistributedLogManager dlm3 = createNewDLM(newConf, name); - BKSyncLogWriter out3 = dlm3.startLogSegmentNonPartitioned(); - LogRecord op3 = DLMTestUtil.getLogRecordInstance(1); - try { - out3.write(op3); - fail("Should fail writing lower txn id if sanityCheckTxnID is enabled."); - } catch (TransactionIdOutOfOrderException tioooe) { - // expected - } - out3.closeAndComplete(); - dlm3.close(); - } - - @Test(timeout = 60000) public void testContinuousReaders() throws Exception { String name = "distrlog-continuous"; BKDistributedLogManager dlm = createNewDLM(conf, name); @@ -958,12 +903,9 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { final AtomicReference<Collection<LogSegmentMetadata>> receivedStreams = new 
AtomicReference<Collection<LogSegmentMetadata>>(); - DistributedLogManager dlm = createNewDLM(conf, name); - ZooKeeperClient zkClient = TestZooKeeperClientBuilder.newBuilder() - .uri(createDLMURI("/")) - .build(); + BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name); - BKDistributedLogManager.createLog(conf, zkClient, ((BKDistributedLogManager) dlm).uri, name); + FutureUtils.result(dlm.getWriterMetadataStore().getLog(dlm.getUri(), name, true, true)); dlm.registerListener(new LogSegmentListener() { @Override public void onSegmentsUpdated(List<LogSegmentMetadata> segments) { @@ -992,7 +934,6 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { // no-op } }); - LOG.info("Registered listener for stream {}.", name); long txid = 1; for (int i = 0; i < numSegments; i++) { LOG.info("Waiting for creating log segment {}.", i); @@ -1018,6 +959,8 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { assertEquals(seqno * DEFAULT_SEGMENT_SIZE, m.getLastTxId()); ++seqno; } + + dlm.close(); } @Test(timeout = 60000) http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java index e68b916..ecc20e0 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java @@ -102,7 +102,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase { dlm.close(); // create the stream - BKDistributedLogManager.createLog(conf, zooKeeperClient, uri, streamName); + 
namespace.createLog(streamName); DistributedLogManager newDLM = namespace.openLog(streamName); LogWriter newWriter = newDLM.startLogSegmentNonPartitioned(); @@ -273,9 +273,9 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase { } }); latches[0].await(); - BKDistributedLogManager.createLog(conf, zooKeeperClient, uri, "test1"); + namespace.createLog("test1"); latches[1].await(); - BKDistributedLogManager.createLog(conf, zooKeeperClient, uri, "test2"); + namespace.createLog("test2"); latches[2].await(); assertEquals(0, numFailures.get()); assertNotNull(receivedStreams.get()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java index 604be0e..8a734b5 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java @@ -191,7 +191,8 @@ public class TestDistributedLogBase { protected LogSegmentMetadataStore getLogSegmentMetadataStore(DistributedLogManagerFactory factory) { DistributedLogNamespace namespace = factory.getNamespace(); assertTrue(namespace instanceof BKDistributedLogNamespace); - return ((BKDistributedLogNamespace) namespace).getWriterSegmentMetadataStore(); + return ((BKDistributedLogNamespace) namespace).getWriterStreamMetadataStore() + .getLogSegmentMetadataStore(); } @SuppressWarnings("deprecation") http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java ---------------------------------------------------------------------- 
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java index 54b1ab8..027b012 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java @@ -253,6 +253,7 @@ public class TestRollLogSegments extends TestDistributedLogBase { confLocal.setOutputBufferSize(0); confLocal.setLogSegmentRollingIntervalMinutes(0); confLocal.setMaxLogSegmentBytes(1); + confLocal.setLogSegmentRollingConcurrency(Integer.MAX_VALUE); int numLogSegments = 10; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java index 625742e..66b97be 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java @@ -140,7 +140,7 @@ public class TestLedgerAllocator extends TestDistributedLogBase { logger.info("Try obtaining ledger handle {}", lh.getId()); byte[] data = zkc.get().getData(allocationPath, false, null); assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8))); - txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1))); + txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null)); try { FutureUtils.result(txn.execute()); fail("Should fail the transaction when setting unexisted path"); @@ -337,7 +337,7 @@ public class TestLedgerAllocator extends TestDistributedLogBase { 
ZKTransaction txn = newTxn(); // close during obtaining ledger. LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER)); - txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1))); + txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null)); try { FutureUtils.result(txn.execute()); fail("Should fail the transaction when setting unexisted path");