http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java index 5d3be7d..f2e30ce 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java @@ -23,22 +23,21 @@ import com.google.common.collect.Lists; import com.twitter.distributedlog.bk.LedgerAllocator; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.DLIllegalStateException; -import com.twitter.distributedlog.exceptions.DLInterruptedException; import com.twitter.distributedlog.exceptions.EndOfStreamException; import com.twitter.distributedlog.exceptions.LockingException; +import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException; import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.exceptions.ZKException; import com.twitter.distributedlog.function.GetLastTxIdFunction; import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter; import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter; import com.twitter.distributedlog.lock.DistributedLock; import com.twitter.distributedlog.logsegment.LogSegmentFilter; import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; -import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; import com.twitter.distributedlog.logsegment.RollingPolicy; import com.twitter.distributedlog.logsegment.SizeBasedRollingPolicy; import com.twitter.distributedlog.logsegment.TimeBasedRollingPolicy; +import com.twitter.distributedlog.metadata.LogStreamMetadataStore; import com.twitter.distributedlog.metadata.MetadataUpdater; import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater; import com.twitter.distributedlog.util.DLUtils; @@ -49,9 +48,6 @@ import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.Transaction; import com.twitter.distributedlog.util.PermitLimiter; import com.twitter.distributedlog.util.Utils; -import com.twitter.distributedlog.zk.ZKOp; -import com.twitter.distributedlog.zk.ZKTransaction; -import com.twitter.distributedlog.zk.ZKVersionedSetOp; import com.twitter.util.Function; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; @@ -60,19 +56,11 @@ import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.meta.ZkVersion; import org.apache.bookkeeper.stats.AlertStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Op; -import org.apache.zookeeper.OpResult; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZKUtil; -import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.runtime.AbstractFunction1; @@ -84,7 +72,6 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; -import static com.google.common.base.Charsets.UTF_8; import static com.twitter.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_FILTER; /** @@ -102,11 +89,11 @@ import static com.twitter.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_F class BKLogWriteHandler extends BKLogHandler { static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class); + protected final ZKLogMetadataForWriter logMetadataForWriter; protected final DistributedLock lock; protected final LedgerAllocator ledgerAllocator; protected final MaxTxId maxTxId; protected final MaxLogSegmentSequenceNo maxLogSegmentSequenceNo; - protected final boolean sanityCheckTxnId; protected final boolean validateLogSegmentSequenceNumber; protected final int regionId; protected final RollingPolicy rollingPolicy; @@ -164,9 +151,8 @@ class BKLogWriteHandler extends BKLogHandler { */ BKLogWriteHandler(ZKLogMetadataForWriter logMetadata, DistributedLogConfiguration conf, - ZooKeeperClientBuilder zkcBuilder, BookKeeperClientBuilder bkcBuilder, - LogSegmentMetadataStore metadataStore, + LogStreamMetadataStore streamMetadataStore, LogSegmentMetadataCache metadataCache, OrderedScheduler scheduler, LedgerAllocator allocator, @@ -181,14 +167,14 @@ class BKLogWriteHandler extends BKLogHandler { DistributedLock lock /** owned by handler **/) { super(logMetadata, conf, - zkcBuilder, bkcBuilder, - metadataStore, + streamMetadataStore, metadataCache, scheduler, statsLogger, alertStatsLogger, clientId); + this.logMetadataForWriter = logMetadata; this.perLogStatsLogger = perLogStatsLogger; this.writeLimiter = writeLimiter; this.featureProvider = featureProvider; @@ -202,15 +188,13 @@ class BKLogWriteHandler extends BKLogHandler { } else { this.regionId = DistributedLogConstants.LOCAL_REGION_ID; } - this.sanityCheckTxnId = conf.getSanityCheckTxnID(); this.validateLogSegmentSequenceNumber = conf.isLogSegmentSequenceNumberValidationEnabled(); // Construct the max sequence no maxLogSegmentSequenceNo = new MaxLogSegmentSequenceNo(logMetadata.getMaxLSSNData()); inprogressLSSNs = new LinkedList<Long>(); // Construct the max txn id. - maxTxId = new MaxTxId(zooKeeperClient, logMetadata.getMaxTxIdPath(), - conf.getSanityCheckTxnID(), logMetadata.getMaxTxIdData()); + maxTxId = new MaxTxId(logMetadata.getMaxTxIdData()); // Schedule fetching log segment list in background before we access it. // We don't need to watch the log segment list changes for writer, as it manages log segment list. @@ -291,13 +275,12 @@ class BKLogWriteHandler extends BKLogHandler { } // Transactional operations for MaxLogSegmentSequenceNo - void storeMaxSequenceNumber(final Transaction txn, + void storeMaxSequenceNumber(final Transaction<Object> txn, final MaxLogSegmentSequenceNo maxSeqNo, final long seqNo, final boolean isInprogress) { - byte[] data = DLUtils.serializeLogSegmentSequenceNumber(seqNo); - Op zkOp = Op.setData(logMetadata.getLogSegmentsPath(), data, maxSeqNo.getZkVersion()); - txn.addOp(new ZKVersionedSetOp(zkOp, new Transaction.OpListener<Version>() { + metadataStore.storeMaxLogSegmentSequenceNumber(txn, logMetadata, maxSeqNo.getVersionedData(seqNo), + new Transaction.OpListener<Version>() { @Override public void onCommit(Version version) { if (validateLogSegmentSequenceNumber) { @@ -309,69 +292,60 @@ class BKLogWriteHandler extends BKLogHandler { } } } - maxSeqNo.update((ZkVersion) version, seqNo); + maxSeqNo.update(version, seqNo); } @Override public void onAbort(Throwable t) { // no-op } - })); + }); } // Transactional operations for MaxTxId - void storeMaxTxId(final ZKTransaction txn, + void storeMaxTxId(final Transaction<Object> txn, final MaxTxId maxTxId, final long txId) { - byte[] data = maxTxId.couldStore(txId); - if (null != data) { - Op zkOp = Op.setData(maxTxId.getZkPath(), data, -1); - txn.addOp(new ZKVersionedSetOp(zkOp, new Transaction.OpListener<Version>() { - @Override - public void onCommit(Version version) { - maxTxId.setMaxTxId(txId); - } - - @Override - public void onAbort(Throwable t) { + metadataStore.storeMaxTxnId(txn, logMetadataForWriter, maxTxId.getVersionedData(txId), + new Transaction.OpListener<Version>() { + @Override + public void onCommit(Version version) { + maxTxId.update(version, txId); + } - } - })); - } + @Override + public void onAbort(Throwable t) { + // no-op + } + }); } // Transactional operations for logsegment - void writeLogSegment(final ZKTransaction txn, - final List<ACL> acl, - final String inprogressSegmentName, - final LogSegmentMetadata metadata, - final String path) { - byte[] finalisedData = metadata.getFinalisedData().getBytes(UTF_8); - Op zkOp = Op.create(path, finalisedData, acl, CreateMode.PERSISTENT); - txn.addOp(new ZKOp(zkOp) { + void writeLogSegment(final Transaction<Object> txn, + final LogSegmentMetadata metadata) { + metadataStore.createLogSegment(txn, metadata, new Transaction.OpListener<Void>() { @Override - protected void commitOpResult(OpResult opResult) { - addLogSegmentToCache(inprogressSegmentName, metadata); + public void onCommit(Void r) { + addLogSegmentToCache(metadata.getSegmentName(), metadata); } @Override - protected void abortOpResult(Throwable t, OpResult opResult) { + public void onAbort(Throwable t) { // no-op } }); } - void deleteLogSegment(final ZKTransaction txn, - final String logSegmentName, - final String logSegmentPath) { - Op zkOp = Op.delete(logSegmentPath, -1); - txn.addOp(new ZKOp(zkOp) { + void deleteLogSegment(final Transaction<Object> txn, + final LogSegmentMetadata metadata) { + metadataStore.deleteLogSegment(txn, metadata, new Transaction.OpListener<Void>() { @Override - protected void commitOpResult(OpResult opResult) { - removeLogSegmentFromCache(logSegmentName); + public void onCommit(Void r) { + removeLogSegmentFromCache(metadata.getSegmentName()); } + @Override - protected void abortOpResult(Throwable t, OpResult opResult) { + public void onAbort(Throwable t) { // no-op } }); @@ -405,10 +379,6 @@ class BKLogWriteHandler extends BKLogHandler { } } - void register(Watcher watcher) { - this.zooKeeperClient.register(watcher); - } - /** * Start a new log segment in a BookKeeper ledger. * First ensure that we have the write lock for this journal. @@ -539,19 +509,17 @@ class BKLogWriteHandler extends BKLogHandler { FutureUtils.setException(promise, new IOException("Invalid Transaction Id " + txId)); return; } - if (this.sanityCheckTxnId) { - long highestTxIdWritten = maxTxId.get(); - if (txId < highestTxIdWritten) { - if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) { - LOG.error("We've already marked the stream as ended and attempting to start a new log segment"); - FutureUtils.setException(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed")); - return; - } - else { - LOG.error("We've already seen TxId {} the max TXId is {}", txId, highestTxIdWritten); - FutureUtils.setException(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten)); - return; - } + + long highestTxIdWritten = maxTxId.get(); + if (txId < highestTxIdWritten) { + if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) { + LOG.error("We've already marked the stream as ended and attempting to start a new log segment"); + FutureUtils.setException(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed")); + return; + } else { + LOG.error("We've already seen TxId {} the max TXId is {}", txId, highestTxIdWritten); + FutureUtils.setException(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten)); + return; } } @@ -564,7 +532,7 @@ class BKLogWriteHandler extends BKLogHandler { } // start the transaction from zookeeper - final ZKTransaction txn = new ZKTransaction(zooKeeperClient); + final Transaction<Object> txn = streamMetadataStore.newTransaction(); // failpoint injected before creating ledger try { @@ -617,7 +585,7 @@ class BKLogWriteHandler extends BKLogHandler { // once the ledger handle is obtained from allocator, this function should guarantee // either the transaction is executed or aborted. Otherwise, the ledger handle will // just leak from the allocation pool - hence cause "No Ledger Allocator" - private void createInprogressLogSegment(ZKTransaction txn, + private void createInprogressLogSegment(Transaction<Object> txn, final long txId, final LedgerHandle lh, boolean bestEffort, @@ -634,7 +602,6 @@ class BKLogWriteHandler extends BKLogHandler { return; } - final String inprogressZnodeName = inprogressZNodeName(lh.getId(), txId, logSegmentSeqNo); final String inprogressZnodePath = inprogressZNode(lh.getId(), txId, logSegmentSeqNo); final LogSegmentMetadata l = new LogSegmentMetadata.LogSegmentMetadataBuilder(inprogressZnodePath, @@ -645,12 +612,7 @@ class BKLogWriteHandler extends BKLogHandler { .build(); // Create an inprogress segment - writeLogSegment( - txn, - zooKeeperClient.getDefaultACL(), - inprogressZnodeName, - l, - inprogressZnodePath); + writeLogSegment(txn, l); // Try storing max sequence number. LOG.debug("Try storing max sequence number in startLogSegment {} : {}", inprogressZnodePath, logSegmentSeqNo); @@ -667,7 +629,7 @@ class BKLogWriteHandler extends BKLogHandler { try { FutureUtils.setValue(promise, new BKLogSegmentWriter( getFullyQualifiedName(), - inprogressZnodeName, + l.getSegmentName(), conf, conf.getDLLedgerMetadataLayoutVersion(), new BKLogSegmentEntryWriter(lh), @@ -888,7 +850,6 @@ class BKLogWriteHandler extends BKLogHandler { } LOG.debug("Completing and Closing Log Segment {} {}", firstTxId, lastTxId); - final String inprogressZnodePath = inprogressZNode(inprogressZnodeName); LogSegmentMetadata inprogressLogSegment = readLogSegmentFromCache(inprogressZnodeName); // validate log segment @@ -936,7 +897,7 @@ class BKLogWriteHandler extends BKLogHandler { // ignore the case that a new inprogress log segment is pre-allocated // before completing current inprogress one LOG.info("Try storing max sequence number {} in completing {}.", - new Object[] { logSegmentSeqNo, inprogressZnodePath }); + new Object[] { logSegmentSeqNo, inprogressLogSegment.getZkPath() }); } else { LOG.warn("Unexpected max ledger sequence number {} found while completing log segment {} for {}", new Object[] { maxLogSegmentSequenceNo.getSequenceNumber(), logSegmentSeqNo, getFullyQualifiedName() }); @@ -949,7 +910,6 @@ class BKLogWriteHandler extends BKLogHandler { } // Prepare the completion - final String nameForCompletedLedger = completedLedgerZNodeName(firstTxId, lastTxId, logSegmentSeqNo); final String pathForCompletedLedger = completedLedgerZNode(firstTxId, lastTxId, logSegmentSeqNo); long startSequenceId; try { @@ -970,17 +930,12 @@ class BKLogWriteHandler extends BKLogHandler { setLastLedgerRollingTimeMillis(completedLogSegment.getCompletionTime()); // prepare the transaction - ZKTransaction txn = new ZKTransaction(zooKeeperClient); + Transaction<Object> txn = streamMetadataStore.newTransaction(); // create completed log segment - writeLogSegment( - txn, - zooKeeperClient.getDefaultACL(), - nameForCompletedLedger, - completedLogSegment, - pathForCompletedLedger); + writeLogSegment(txn, completedLogSegment); // delete inprogress log segment - deleteLogSegment(txn, inprogressZnodeName, inprogressZnodePath); + deleteLogSegment(txn, inprogressLogSegment); // store max sequence number storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, maxSeqNo, false); // update max txn id. @@ -991,7 +946,8 @@ class BKLogWriteHandler extends BKLogHandler { @Override public void onSuccess(Void value) { LOG.info("Completed {} to {} for {} : {}", - new Object[] { inprogressZnodeName, nameForCompletedLedger, getFullyQualifiedName(), completedLogSegment }); + new Object[] { inprogressZnodeName, completedLogSegment.getSegmentName(), + getFullyQualifiedName(), completedLogSegment }); FutureUtils.setValue(promise, completedLogSegment); } @@ -1072,27 +1028,6 @@ class BKLogWriteHandler extends BKLogHandler { } - public void deleteLog() throws IOException { - lock.checkOwnershipAndReacquire(); - FutureUtils.result(purgeLogSegmentsOlderThanTxnId(-1)); - - try { - Utils.closeQuietly(lock); - zooKeeperClient.get().exists(logMetadata.getLogSegmentsPath(), false); - zooKeeperClient.get().exists(logMetadata.getMaxTxIdPath(), false); - if (logMetadata.getLogRootPath().toLowerCase().contains("distributedlog")) { - ZKUtil.deleteRecursive(zooKeeperClient.get(), logMetadata.getLogRootPath()); - } else { - LOG.warn("Skip deletion of unrecognized ZK Path {}", logMetadata.getLogRootPath()); - } - } catch (InterruptedException ie) { - LOG.error("Interrupted while deleting log znodes", ie); - throw new DLInterruptedException("Interrupted while deleting " + logMetadata.getLogRootPath(), ie); - } catch (KeeperException ke) { - LOG.error("Error deleting" + logMetadata.getLogRootPath() + " in zookeeper", ke); - } - } - Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) { if (DLSN.InvalidDLSN == dlsn) { List<LogSegmentMetadata> emptyList = new ArrayList<LogSegmentMetadata>(0); @@ -1321,33 +1256,29 @@ class BKLogWriteHandler extends BKLogHandler { private void deleteLogSegmentMetadata(final LogSegmentMetadata segmentMetadata, final Promise<LogSegmentMetadata> promise) { Transaction<Object> deleteTxn = metadataStore.transaction(); - metadataStore.deleteLogSegment(deleteTxn, segmentMetadata); - deleteTxn.execute().addEventListener(new FutureEventListener<Void>() { + metadataStore.deleteLogSegment(deleteTxn, segmentMetadata, new Transaction.OpListener<Void>() { @Override - public void onSuccess(Void result) { + public void onCommit(Void r) { // purge log segment removeLogSegmentFromCache(segmentMetadata.getZNodeName()); promise.setValue(segmentMetadata); } @Override - public void onFailure(Throwable cause) { - if (cause instanceof ZKException) { - ZKException zke = (ZKException) cause; - if (KeeperException.Code.NONODE == zke.getKeeperExceptionCode()) { - LOG.error("No log segment {} found for {}.", - segmentMetadata, getFullyQualifiedName()); - // purge log segment - removeLogSegmentFromCache(segmentMetadata.getZNodeName()); - promise.setValue(segmentMetadata); - return; - } + public void onAbort(Throwable t) { + if (t instanceof LogSegmentNotFoundException) { + // purge log segment + removeLogSegmentFromCache(segmentMetadata.getZNodeName()); + promise.setValue(segmentMetadata); + return; + } else { + LOG.error("Couldn't purge {} for {}: with error {}", + new Object[]{ segmentMetadata, getFullyQualifiedName(), t }); + promise.setException(t); } - LOG.error("Couldn't purge {} for {}: with error {}", - new Object[]{ segmentMetadata, getFullyQualifiedName(), cause }); - promise.setException(cause); } }); + deleteTxn.execute(); } @Override
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java index f4ca45e..0f6db75 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java @@ -62,7 +62,6 @@ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification { StatsLogger statsLogger) { this.readHandler = bkdlm.createReadHandler( Optional.<String>absent(), - bkdlm.getLockStateExecutor(true), this, conf.getDeserializeRecordSetOnReads(), true); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java index 46a056b..6f37a59 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java @@ -1557,6 +1557,7 @@ public class DistributedLogConfiguration extends CompositeConfiguration { * * @return true if should check txn id with max txn id, otherwise false. */ + @Deprecated public boolean getSanityCheckTxnID() { return getBoolean(BKDL_MAXID_SANITYCHECK, BKDL_MAXID_SANITYCHECK_DEFAULT); } @@ -1569,6 +1570,7 @@ public class DistributedLogConfiguration extends CompositeConfiguration { * @return configuration. * @see #getSanityCheckTxnID() */ + @Deprecated public DistributedLogConfiguration setSanityCheckTxnID(boolean enabled) { setProperty(BKDL_MAXID_SANITYCHECK, enabled); return this; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java index 80cf350..9bfaaba 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java @@ -17,26 +17,15 @@ */ package com.twitter.distributedlog; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.exceptions.ZKException; import com.twitter.distributedlog.util.DLUtils; -import org.apache.bookkeeper.meta.ZkVersion; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; /** * Utility class for storing and reading max ledger sequence number */ class MaxLogSegmentSequenceNo { - static final Logger LOG = LoggerFactory.getLogger(MaxLogSegmentSequenceNo.class); - Version version; long maxSeqNo; @@ -55,24 +44,20 @@ class MaxLogSegmentSequenceNo { if (null != logSegmentsData && null != logSegmentsData.getVersion()) { version = logSegmentsData.getVersion(); } else { - version = new ZkVersion(-1); + throw new IllegalStateException("Invalid MaxLogSegmentSequenceNo found - " + logSegmentsData); } } } - synchronized int getZkVersion() { - return ((ZkVersion) version).getZnodeVersion(); + synchronized Version getVersion() { + return version; } synchronized long getSequenceNumber() { return maxSeqNo; } - synchronized MaxLogSegmentSequenceNo update(int zkVersion, long logSegmentSeqNo) { - return update(new ZkVersion(zkVersion), logSegmentSeqNo); - } - - synchronized MaxLogSegmentSequenceNo update(ZkVersion version, long logSegmentSeqNo) { + synchronized MaxLogSegmentSequenceNo update(Version version, long logSegmentSeqNo) { if (version.compare(this.version) == Version.Occurred.AFTER) { this.version = version; this.maxSeqNo = logSegmentSeqNo; @@ -80,21 +65,8 @@ class MaxLogSegmentSequenceNo { return this; } - synchronized void store(ZooKeeperClient zkc, String path, long logSegmentSeqNo) throws IOException { - try { - Stat stat = zkc.get().setData(path, - DLUtils.serializeLogSegmentSequenceNumber(logSegmentSeqNo), getZkVersion()); - update(stat.getVersion(), logSegmentSeqNo); - } catch (KeeperException ke) { - throw new ZKException("Error writing max ledger sequence number " + logSegmentSeqNo + " to " - + path + " : ", ke); - } catch (ZooKeeperClient.ZooKeeperConnectionException zce) { - throw new IOException("Error writing max ledger sequence number " + logSegmentSeqNo + " to " - + path + " : ", zce); - } catch (InterruptedException e) { - throw new DLInterruptedException("Error writing max ledger sequence number " + logSegmentSeqNo + " to " - + path + " : ", e); - } + public synchronized Versioned<Long> getVersionedData(long seqNo) { + return new Versioned<Long>(seqNo, version); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java index c446a8b..ed7218e 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java @@ -18,13 +18,11 @@ package com.twitter.distributedlog; import com.twitter.distributedlog.util.DLUtils; +import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; -import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - /** * Utility class for storing and reading * the max seen txid in zookeeper @@ -32,73 +30,43 @@ import java.io.IOException; class MaxTxId { static final Logger LOG = LoggerFactory.getLogger(MaxTxId.class); - private final ZooKeeperClient zkc; - private final String path; - private final boolean enabled; - + private Version version; private long currentMax; - MaxTxId(ZooKeeperClient zkc, String path, boolean enabled, - Versioned<byte[]> maxTxIdData) { - this.zkc = zkc; - this.path = path; - this.enabled = enabled && null != maxTxIdData && null != maxTxIdData.getVersion() - && null != maxTxIdData.getValue(); - if (this.enabled) { + MaxTxId(Versioned<byte[]> maxTxIdData) { + if (null != maxTxIdData + && null != maxTxIdData.getValue() + && null != maxTxIdData.getVersion()) { + this.version = maxTxIdData.getVersion(); try { this.currentMax = DLUtils.deserializeTransactionId(maxTxIdData.getValue()); } catch (NumberFormatException e) { - LOG.warn("Invalid txn id stored in {}", path, e); - this.currentMax = 0L; + LOG.warn("Invalid txn id stored in {}", e); + this.currentMax = DistributedLogConstants.INVALID_TXID; } } else { - this.currentMax = -1L; + this.currentMax = DistributedLogConstants.INVALID_TXID; + if (null != maxTxIdData && null != maxTxIdData.getVersion()) { + this.version = maxTxIdData.getVersion(); + } else { + throw new IllegalStateException("Invalid MaxTxId found - " + maxTxIdData); + } } } - String getZkPath() { - return path; - } - - synchronized void setMaxTxId(long txId) { - if (enabled && this.currentMax < txId) { + synchronized void update(Version version, long txId) { + if (version.compare(this.version) == Version.Occurred.AFTER) { + this.version = version; this.currentMax = txId; } } - synchronized byte[] couldStore(long maxTxId) { - if (enabled && currentMax < maxTxId) { - return DLUtils.serializeTransactionId(maxTxId); - } else { - return null; - } - } - - /** - * Store the highest TxID encountered so far so that we - * can enforce the monotonically non-decreasing property - * This is best effort as this enforcement is only done - * - * @param maxTxId - the maximum transaction id seen so far - * @throws IOException - */ - synchronized void store(long maxTxId) throws IOException { - if (enabled && currentMax < maxTxId) { - if (LOG.isTraceEnabled()) { - LOG.trace("Setting maxTxId to " + maxTxId); - } - String txidStr = Long.toString(maxTxId); - try { - zkc.get().setData(path, txidStr.getBytes("UTF-8"), -1); - currentMax = maxTxId; - } catch (Exception e) { - LOG.error("Error writing new MaxTxId value {}", maxTxId, e); - } - } - } - synchronized long get() { return currentMax; } + public Versioned<Long> getVersionedData(long txId) { + return new Versioned<Long>(Math.max(txId, currentMax), version); + } + } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java index f0d2797..1b831ea 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java @@ -23,12 +23,16 @@ import com.twitter.distributedlog.LogSegmentMetadata; import com.twitter.distributedlog.ZooKeeperClient; import com.twitter.distributedlog.callback.LogSegmentNamesListener; import com.twitter.distributedlog.exceptions.LogNotFoundException; +import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; import com.twitter.distributedlog.exceptions.ZKException; +import com.twitter.distributedlog.impl.metadata.ZKLogMetadata; +import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter; import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; import com.twitter.distributedlog.util.DLUtils; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.Transaction; +import com.twitter.distributedlog.util.Transaction.OpListener; import com.twitter.distributedlog.zk.DefaultZKOp; import com.twitter.distributedlog.zk.ZKOp; import com.twitter.distributedlog.zk.ZKTransaction; @@ -48,10 +52,8 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -220,30 +222,28 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch @Override public void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn, - String path, + ZKLogMetadata logMetadata, Versioned<Long> lssn, Transaction.OpListener<Version> listener) { Version version = lssn.getVersion(); assert(version instanceof ZkVersion); - ZkVersion zkVersion = (ZkVersion) version; byte[] data = DLUtils.serializeLogSegmentSequenceNumber(lssn.getValue()); - Op setDataOp = Op.setData(path, data, zkVersion.getZnodeVersion()); + Op setDataOp = Op.setData(logMetadata.getLogSegmentsPath(), data, zkVersion.getZnodeVersion()); ZKOp zkOp = new ZKVersionedSetOp(setDataOp, listener); txn.addOp(zkOp); } @Override public void storeMaxTxnId(Transaction<Object> txn, - String path, + ZKLogMetadataForWriter logMetadata, Versioned<Long> transactionId, Transaction.OpListener<Version> listener) { Version version = transactionId.getVersion(); assert(version instanceof ZkVersion); - ZkVersion zkVersion = (ZkVersion) version; byte[] data = DLUtils.serializeTransactionId(transactionId.getValue()); - Op setDataOp = Op.setData(path, data, zkVersion.getZnodeVersion()); + Op setDataOp = Op.setData(logMetadata.getMaxTxIdPath(), data, zkVersion.getZnodeVersion()); ZKOp zkOp = new ZKVersionedSetOp(setDataOp, listener); txn.addOp(zkOp); } @@ -256,29 +256,66 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch } @Override - public void createLogSegment(Transaction<Object> txn, LogSegmentMetadata segment) { + public void createLogSegment(Transaction<Object> txn, + LogSegmentMetadata segment, + OpListener<Void> listener) { byte[] finalisedData = segment.getFinalisedData().getBytes(UTF_8); Op createOp = Op.create( segment.getZkPath(), finalisedData, zkc.getDefaultACL(), CreateMode.PERSISTENT); - txn.addOp(DefaultZKOp.of(createOp)); + txn.addOp(DefaultZKOp.of(createOp, listener)); } @Override - public void deleteLogSegment(Transaction<Object> txn, LogSegmentMetadata segment) { + public void deleteLogSegment(Transaction<Object> txn, + final LogSegmentMetadata segment, + final OpListener<Void> listener) { Op deleteOp = Op.delete( segment.getZkPath(), -1); - txn.addOp(DefaultZKOp.of(deleteOp)); + logger.info("Delete segment : {}", segment); + txn.addOp(DefaultZKOp.of(deleteOp, new OpListener<Void>() { + @Override + public void onCommit(Void r) { + if (null != listener) { + listener.onCommit(r); + } + } + + @Override + public void onAbort(Throwable t) { + logger.info("Aborted transaction on deleting segment {}", segment); + KeeperException.Code kc; + if (t instanceof KeeperException) { + kc = ((KeeperException) t).code(); + } else if (t instanceof ZKException) { + kc = ((ZKException) t).getKeeperExceptionCode(); + } else { + abortListener(t); + return; + } + if (KeeperException.Code.NONODE == kc) { + abortListener(new LogSegmentNotFoundException(segment.getZkPath())); + return; + } + abortListener(t); + } + + private void abortListener(Throwable t) { + if (null != listener) { + listener.onAbort(t); + } + } + })); } @Override public void updateLogSegment(Transaction<Object> txn, LogSegmentMetadata segment) { byte[] finalisedData = segment.getFinalisedData().getBytes(UTF_8); Op updateOp = Op.setData(segment.getZkPath(), finalisedData, -1); - txn.addOp(DefaultZKOp.of(updateOp)); + txn.addOp(DefaultZKOp.of(updateOp, null)); } // reads http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java index 078c040..37beb16 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java @@ -29,6 +29,17 @@ public class ZKLogMetadata { } /** + * Get the top stream path for a given log. + * + * @param uri namespace to store the log + * @param logName name of the log + * @return top stream path + */ + public static String getLogStreamPath(URI uri, String logName) { + return String.format("%s/%s", uri.getPath(), logName); + } + + /** * Get the log root path for a given log. * * @param uri @@ -59,14 +70,14 @@ public class ZKLogMetadata { } protected static final int LAYOUT_VERSION = -1; - protected final static String LOGSEGMENTS_PATH = "/ledgers"; - protected final static String VERSION_PATH = "/version"; + public final static String LOGSEGMENTS_PATH = "/ledgers"; + public final static String VERSION_PATH = "/version"; // writer znodes - protected final static String MAX_TXID_PATH = "/maxtxid"; - protected final static String LOCK_PATH = "/lock"; - protected final static String ALLOCATION_PATH = "/allocation"; + public final static String MAX_TXID_PATH = "/maxtxid"; + public final static String LOCK_PATH = "/lock"; + public final static String ALLOCATION_PATH = "/allocation"; // reader znodes - protected final static String READ_LOCK_PATH = "/readLock"; + public final static String READ_LOCK_PATH = "/readLock"; protected final URI uri; protected final String logName; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java index 1de712f..9a1548c 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java @@ -17,312 +17,15 @@ */ package com.twitter.distributedlog.impl.metadata; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.exceptions.LogNotFoundException; -import com.twitter.distributedlog.exceptions.DLException; -import com.twitter.distributedlog.exceptions.InvalidStreamNameException; -import com.twitter.distributedlog.exceptions.LogExistsException; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.exceptions.ZKException; -import com.twitter.distributedlog.util.DLUtils; -import com.twitter.distributedlog.util.Utils; -import com.twitter.util.ExceptionalFunction; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import org.apache.bookkeeper.meta.ZkVersion; import org.apache.bookkeeper.versioning.Versioned; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Op; -import org.apache.zookeeper.OpResult; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.common.PathUtils; -import org.apache.zookeeper.data.ACL; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; -import java.io.File; import java.net.URI; -import java.util.List; /** * Log Metadata for writer */ public class ZKLogMetadataForWriter extends ZKLogMetadata { - static final Logger LOG = LoggerFactory.getLogger(ZKLogMetadataForWriter.class); - - static class MetadataIndex { - static final int LOG_ROOT_PARENT = 0; - static final int LOG_ROOT = 1; - static final int MAX_TXID = 2; - static final int VERSION = 3; - static final int LOCK = 4; - static final int READ_LOCK = 5; - static final int LOGSEGMENTS = 6; - static final int ALLOCATION = 7; - } - - static int bytesToInt(byte[] b) { - assert b.length >= 4; - return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3]; - } - - static byte[] intToBytes(int i) { - return new byte[]{ - (byte) (i >> 24), - (byte) (i >> 16), - (byte) (i >> 8), - (byte) (i)}; - } - - static boolean pathExists(Versioned<byte[]> metadata) { - return null != metadata.getValue() && null != metadata.getVersion(); - } - - static void ensureMetadataExist(Versioned<byte[]> metadata) { - Preconditions.checkNotNull(metadata.getValue()); - Preconditions.checkNotNull(metadata.getVersion()); - } - - public static Future<ZKLogMetadataForWriter> of( - final URI uri, - final String logName, - final String logIdentifier, - final ZooKeeper zk, - final List<ACL> acl, - final boolean ownAllocator, - final boolean createIfNotExists) { - final String logRootPath = ZKLogMetadata.getLogRootPath(uri, logName, logIdentifier); - try { - PathUtils.validatePath(logRootPath); - } catch (IllegalArgumentException e) { - LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, logName, e}); - return Future.exception(new InvalidStreamNameException(logName, "Log name is invalid")); - } - - return checkLogMetadataPaths(zk, logRootPath, ownAllocator) - .flatMap(new AbstractFunction1<List<Versioned<byte[]>>, Future<List<Versioned<byte[]>>>>() { - @Override - public Future<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) { - Promise<List<Versioned<byte[]>>> promise = - new Promise<List<Versioned<byte[]>>>(); - createMissingMetadata(zk, logRootPath, metadatas, acl, - ownAllocator, createIfNotExists, promise); - return promise; - } - }).map(new ExceptionalFunction<List<Versioned<byte[]>>, ZKLogMetadataForWriter>() { - @Override - public ZKLogMetadataForWriter applyE(List<Versioned<byte[]>> metadatas) throws DLException { - return processLogMetadatas(uri, logName, logIdentifier, metadatas, ownAllocator); - } - }); - } - - static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk, - String logRootPath, - boolean ownAllocator) { - // Note re. persistent lock state initialization: the read lock persistent state (path) is - // initialized here but only used in the read handler. The reason is its more convenient and - // less error prone to manage all stream structure in one place. - final String logRootParentPath = new File(logRootPath).getParent(); - final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH; - final String maxTxIdPath = logRootPath + MAX_TXID_PATH; - final String lockPath = logRootPath + LOCK_PATH; - final String readLockPath = logRootPath + READ_LOCK_PATH; - final String versionPath = logRootPath + VERSION_PATH; - final String allocationPath = logRootPath + ALLOCATION_PATH; - - int numPaths = ownAllocator ? MetadataIndex.ALLOCATION + 1 : MetadataIndex.LOGSEGMENTS + 1; - List<Future<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths); - checkFutures.add(Utils.zkGetData(zk, logRootParentPath, false)); - checkFutures.add(Utils.zkGetData(zk, logRootPath, false)); - checkFutures.add(Utils.zkGetData(zk, maxTxIdPath, false)); - checkFutures.add(Utils.zkGetData(zk, versionPath, false)); - checkFutures.add(Utils.zkGetData(zk, lockPath, false)); - checkFutures.add(Utils.zkGetData(zk, readLockPath, false)); - checkFutures.add(Utils.zkGetData(zk, logSegmentsPath, false)); - if (ownAllocator) { - checkFutures.add(Utils.zkGetData(zk, allocationPath, false)); - } - - return Future.collect(checkFutures); - } - - static void createMissingMetadata(final ZooKeeper zk, - final String logRootPath, - final List<Versioned<byte[]>> metadatas, - final List<ACL> acl, - final boolean ownAllocator, - final boolean createIfNotExists, - final Promise<List<Versioned<byte[]>>> promise) { - final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size()); - final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size()); - CreateMode createMode = CreateMode.PERSISTENT; - - // log root parent path - if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) { - pathsToCreate.add(null); - } else { - String logRootParentPath = new File(logRootPath).getParent(); - pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES); - zkOps.add(Op.create(logRootParentPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode)); - } - - // log root path - if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) { - pathsToCreate.add(null); - } else { - pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES); - zkOps.add(Op.create(logRootPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode)); - } - - // max id - if (pathExists(metadatas.get(MetadataIndex.MAX_TXID))) { - pathsToCreate.add(null); - } else { - byte[] zeroTxnIdData = DLUtils.serializeTransactionId(0L); - pathsToCreate.add(zeroTxnIdData); - zkOps.add(Op.create(logRootPath + MAX_TXID_PATH, zeroTxnIdData, acl, createMode)); - } - // version - if (pathExists(metadatas.get(MetadataIndex.VERSION))) { - pathsToCreate.add(null); - } else { - byte[] versionData = intToBytes(LAYOUT_VERSION); - pathsToCreate.add(versionData); - zkOps.add(Op.create(logRootPath + VERSION_PATH, versionData, acl, createMode)); - } - // lock path - if (pathExists(metadatas.get(MetadataIndex.LOCK))) { - pathsToCreate.add(null); - } else { - pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES); - zkOps.add(Op.create(logRootPath + LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode)); - } - // read lock path - if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) { - pathsToCreate.add(null); - } else { - pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES); - zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode)); - } - // log segments path - if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) { - pathsToCreate.add(null); - } else { - byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber( - DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO); - pathsToCreate.add(logSegmentsData); - zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode)); - } - // allocation path - if (ownAllocator) { - if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) { - pathsToCreate.add(null); - } else { - pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES); - zkOps.add(Op.create(logRootPath + ALLOCATION_PATH, - DistributedLogConstants.EMPTY_BYTES, acl, createMode)); - } - } - if (zkOps.isEmpty()) { - // nothing missed - promise.setValue(metadatas); - return; - } - if (!createIfNotExists) { - promise.setException(new LogNotFoundException("Log " + logRootPath + " not found")); - return; - } - - zk.multi(zkOps, new AsyncCallback.MultiCallback() { - @Override - public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) { - if (KeeperException.Code.OK.intValue() == rc) { - List<Versioned<byte[]>> finalMetadatas = - Lists.newArrayListWithExpectedSize(metadatas.size()); - for (int i = 0; i < pathsToCreate.size(); i++) { - byte[] dataCreated = pathsToCreate.get(i); - if (null == dataCreated) { - finalMetadatas.add(metadatas.get(i)); - } else { - finalMetadatas.add(new Versioned<byte[]>(dataCreated, new ZkVersion(0))); - } - } - promise.setValue(finalMetadatas); - } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { - promise.setException(new LogExistsException("Someone just created log " - + logRootPath)); - } else { - if (LOG.isDebugEnabled()) { - StringBuilder builder = new StringBuilder(); - for (OpResult result : resultList) { - if (result instanceof OpResult.ErrorResult) { - OpResult.ErrorResult errorResult = (OpResult.ErrorResult) result; - builder.append(errorResult.getErr()).append(","); - } else { - builder.append(0).append(","); - } - } - String resultCodeList = builder.substring(0, builder.length() - 1); - LOG.debug("Failed to create log, full rc list = {}", resultCodeList); - } - - promise.setException(new ZKException("Failed to create log " + logRootPath, - KeeperException.Code.get(rc))); - } - } - }, null); - } - - static ZKLogMetadataForWriter processLogMetadatas(URI uri, - String logName, - String logIdentifier, - List<Versioned<byte[]>> metadatas, - boolean ownAllocator) - throws UnexpectedException { - try { - // max id - Versioned<byte[]> maxTxnIdData = metadatas.get(MetadataIndex.MAX_TXID); - ensureMetadataExist(maxTxnIdData); - // version - Versioned<byte[]> versionData = metadatas.get(MetadataIndex.VERSION); - ensureMetadataExist(maxTxnIdData); - Preconditions.checkArgument(LAYOUT_VERSION == bytesToInt(versionData.getValue())); - // lock path - ensureMetadataExist(metadatas.get(MetadataIndex.LOCK)); - // read lock path - ensureMetadataExist(metadatas.get(MetadataIndex.READ_LOCK)); - // max lssn - Versioned<byte[]> maxLSSNData = metadatas.get(MetadataIndex.LOGSEGMENTS); - ensureMetadataExist(maxLSSNData); - try { - DLUtils.deserializeLogSegmentSequenceNumber(maxLSSNData.getValue()); - } catch (NumberFormatException nfe) { - throw new UnexpectedException("Invalid max sequence number found in log " + logName, nfe); - } - // allocation path - Versioned<byte[]> allocationData; - if (ownAllocator) { - allocationData = metadatas.get(MetadataIndex.ALLOCATION); - ensureMetadataExist(allocationData); - } else { - allocationData = new Versioned<byte[]>(null, null); - } - return new ZKLogMetadataForWriter(uri, logName, logIdentifier, - maxLSSNData, maxTxnIdData, allocationData); - } catch (IllegalArgumentException iae) { - throw new UnexpectedException("Invalid log " + logName, iae); - } catch (NullPointerException npe) { - throw new UnexpectedException("Invalid log " + logName, npe); - } - } - private final Versioned<byte[]> maxLSSNData; private final Versioned<byte[]> maxTxIdData; private final Versioned<byte[]> allocationData; @@ -334,12 +37,12 @@ public class ZKLogMetadataForWriter extends ZKLogMetadata { * @param logName name of the log * @param logIdentifier identifier of the log */ - private ZKLogMetadataForWriter(URI uri, - String logName, - String logIdentifier, - Versioned<byte[]> maxLSSNData, - Versioned<byte[]> maxTxIdData, - Versioned<byte[]> allocationData) { + public ZKLogMetadataForWriter(URI uri, + String logName, + String logIdentifier, + Versioned<byte[]> maxLSSNData, + Versioned<byte[]> maxTxIdData, + Versioned<byte[]> allocationData) { super(uri, logName, logIdentifier); this.maxLSSNData = maxLSSNData; this.maxTxIdData = maxTxIdData;