http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java deleted file mode 100644 index a8d9e6d..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java +++ /dev/null @@ -1,1125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import java.io.File; -import java.io.IOException; -import java.util.Comparator; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; -import com.twitter.distributedlog.exceptions.UnsupportedMetadataVersionException; -import com.twitter.distributedlog.exceptions.ZKException; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.Utils; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static com.google.common.base.Charsets.UTF_8; - -/** - * Utility class for storing the metadata associated - * with a single edit log segment, stored in a single ledger - */ -public class LogSegmentMetadata { - static final Logger LOG = LoggerFactory.getLogger(LogSegmentMetadata.class); - - public static enum LogSegmentMetadataVersion { - VERSION_INVALID(0), - VERSION_V1_ORIGINAL(1), - VERSION_V2_LEDGER_SEQNO(2), - VERSION_V3_MIN_ACTIVE_DLSN(3), - VERSION_V4_ENVELOPED_ENTRIES(4), - VERSION_V5_SEQUENCE_ID(5); - - public final int value; - - private LogSegmentMetadataVersion(int value) { - this.value = value; - } - - public static LogSegmentMetadataVersion of(int version) { - switch (version) { - case 5: - return VERSION_V5_SEQUENCE_ID; - case 4: - return VERSION_V4_ENVELOPED_ENTRIES; - case 3: - return VERSION_V3_MIN_ACTIVE_DLSN; - case 2: - return VERSION_V2_LEDGER_SEQNO; - case 1: - return VERSION_V1_ORIGINAL; - case 0: - return VERSION_INVALID; - default: - throw new IllegalArgumentException("unknown version " + version); - } - } - } - - public static enum TruncationStatus { - ACTIVE (0), PARTIALLY_TRUNCATED(1), TRUNCATED (2); - private final int value; - - private TruncationStatus(int value) { - this.value = value; - } - } - - public static class LogSegmentMetadataBuilder { - protected String zkPath; - protected long logSegmentId; - protected LogSegmentMetadataVersion version; - protected long firstTxId; - protected int regionId; - protected long status; - protected long lastTxId; - protected long completionTime; - protected int recordCount; - protected long logSegmentSequenceNo; - protected long lastEntryId; - protected long lastSlotId; - protected long minActiveEntryId; - protected long minActiveSlotId; - protected long startSequenceId; - protected boolean inprogress; - - // This is a derived attribute. - // Since we overwrite the original version with the target version, information that is - // derived from the original version (e.g. does it support enveloping of entries) - // is lost while parsing. - // NOTE: This value is not stored in the Metadata store. - protected boolean envelopeEntries = false; - - LogSegmentMetadataBuilder(String zkPath, - LogSegmentMetadataVersion version, - long logSegmentId, - long firstTxId) { - initialize(); - this.zkPath = zkPath; - this.version = version; - this.logSegmentId = logSegmentId; - this.firstTxId = firstTxId; - } - - LogSegmentMetadataBuilder(String zkPath, - int version, - long logSegmentId, - long firstTxId) { - this(zkPath, LogSegmentMetadataVersion.values()[version], logSegmentId, firstTxId); - } - - private void initialize() { - regionId = DistributedLogConstants.LOCAL_REGION_ID; - status = DistributedLogConstants.LOGSEGMENT_DEFAULT_STATUS; - lastTxId = DistributedLogConstants.INVALID_TXID; - completionTime = 0; - recordCount = 0; - lastEntryId = -1; - lastSlotId = -1; - minActiveEntryId = 0; - minActiveSlotId = 0; - startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID; - inprogress = true; - } - - LogSegmentMetadataBuilder setRegionId(int regionId) { - this.regionId = regionId; - return this; - } - - LogSegmentMetadataBuilder setStatus(long status) { - this.status = status; - return this; - } - - public LogSegmentMetadataBuilder setLastTxId(long lastTxId) { - this.lastTxId = lastTxId; - return this; - } - - public LogSegmentMetadataBuilder setCompletionTime(long completionTime) { - this.completionTime = completionTime; - return this; - } - - public LogSegmentMetadataBuilder setRecordCount(int recordCount) { - this.recordCount = recordCount; - return this; - } - - public LogSegmentMetadataBuilder setRecordCount(LogRecord record) { - this.recordCount = record.getLastPositionWithinLogSegment(); - return this; - } - - public LogSegmentMetadataBuilder setInprogress(boolean inprogress) { - this.inprogress = inprogress; - return this; - } - - LogSegmentMetadataBuilder setLogSegmentSequenceNo(long logSegmentSequenceNo) { - this.logSegmentSequenceNo = logSegmentSequenceNo; - return this; - } - - public LogSegmentMetadataBuilder setLastEntryId(long lastEntryId) { - this.lastEntryId = lastEntryId; - return this; - } - - LogSegmentMetadataBuilder setLastSlotId(long lastSlotId) { - this.lastSlotId = lastSlotId; - return this; - } - - LogSegmentMetadataBuilder setEnvelopeEntries(boolean envelopeEntries) { - this.envelopeEntries = envelopeEntries; - return this; - } - - LogSegmentMetadataBuilder setMinActiveEntryId(long minActiveEntryId) { - this.minActiveEntryId = minActiveEntryId; - return this; - } - - LogSegmentMetadataBuilder setMinActiveSlotId(long minActiveSlotId) { - this.minActiveSlotId = minActiveSlotId; - return this; - } - - LogSegmentMetadataBuilder setStartSequenceId(long startSequenceId) { - this.startSequenceId = startSequenceId; - return this; - } - - public LogSegmentMetadata build() { - return new LogSegmentMetadata( - zkPath, - version, - logSegmentId, - firstTxId, - lastTxId, - completionTime, - inprogress, - recordCount, - logSegmentSequenceNo, - lastEntryId, - lastSlotId, - regionId, - status, - minActiveEntryId, - minActiveSlotId, - startSequenceId, - envelopeEntries - ); - } - - } - - /** - * Mutator to mutate the metadata of a log segment. This mutator is going to create - * a new instance of the log segment metadata without changing the existing one. - */ - public static class Mutator extends LogSegmentMetadataBuilder { - - Mutator(LogSegmentMetadata original) { - super(original.getZkPath(), original.getVersion(), original.getLogSegmentId(), original.getFirstTxId()); - this.inprogress = original.isInProgress(); - this.logSegmentSequenceNo = original.getLogSegmentSequenceNumber(); - this.lastEntryId = original.getLastEntryId(); - this.lastSlotId = original.getLastSlotId(); - this.lastTxId = original.getLastTxId(); - this.completionTime = original.getCompletionTime(); - this.recordCount = original.getRecordCount(); - this.regionId = original.getRegionId(); - this.status = original.getStatus(); - this.minActiveEntryId = original.getMinActiveDLSN().getEntryId(); - this.minActiveSlotId = original.getMinActiveDLSN().getSlotId(); - this.startSequenceId = original.getStartSequenceId(); - this.envelopeEntries = original.getEnvelopeEntries(); - } - - @VisibleForTesting - public Mutator setVersion(LogSegmentMetadataVersion version) { - this.version = version; - return this; - } - - public Mutator setLogSegmentSequenceNumber(long seqNo) { - this.logSegmentSequenceNo = seqNo; - return this; - } - - public Mutator setZkPath(String zkPath) { - this.zkPath = zkPath; - return this; - } - - public Mutator setLastDLSN(DLSN dlsn) { - this.logSegmentSequenceNo = dlsn.getLogSegmentSequenceNo(); - this.lastEntryId = dlsn.getEntryId(); - this.lastSlotId = dlsn.getSlotId(); - return this; - } - - public Mutator setMinActiveDLSN(DLSN dlsn) { - if (this.logSegmentSequenceNo != dlsn.getLogSegmentSequenceNo()) { - throw new IllegalArgumentException("Updating minDLSN in an incorrect log segment"); - } - this.minActiveEntryId = dlsn.getEntryId(); - this.minActiveSlotId = dlsn.getSlotId(); - return this; - } - - public Mutator setTruncationStatus(TruncationStatus truncationStatus) { - status &= ~METADATA_TRUNCATION_STATUS_MASK; - status |= (truncationStatus.value & METADATA_TRUNCATION_STATUS_MASK); - return this; - } - - public Mutator setStartSequenceId(long startSequenceId) { - this.startSequenceId = startSequenceId; - return this; - } - } - - private final String zkPath; - private final long logSegmentId; - private final LogSegmentMetadataVersion version; - private final long firstTxId; - private final int regionId; - private final long status; - private final long lastTxId; - private final long completionTime; - private final int recordCount; - private final DLSN lastDLSN; - private final DLSN minActiveDLSN; - private final long startSequenceId; - private final boolean inprogress; - // This is a derived attribute. - // Since we overwrite the original version with the target version, information that is - // derived from the original version (e.g. does it support enveloping of entries) - // is lost while parsing. - // NOTE: This value is not stored in the Metadata store. - private final boolean envelopeEntries; - - public static final Comparator<LogSegmentMetadata> COMPARATOR - = new Comparator<LogSegmentMetadata>() { - - public int compare(LogSegmentMetadata o1, - LogSegmentMetadata o2) { - if ((o1.getLogSegmentSequenceNumber() == DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO) || - (o2.getLogSegmentSequenceNumber() == DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO)) { - if (o1.firstTxId < o2.firstTxId) { - return -1; - } else if (o1.firstTxId == o2.firstTxId) { - return 0; - } else { - return 1; - } - } else { - if (o1.getLogSegmentSequenceNumber() < o2.getLogSegmentSequenceNumber()) { - return -1; - } else if (o1.getLogSegmentSequenceNumber() == o2.getLogSegmentSequenceNumber()) { - // make sure we won't move over inprogress log segment if it still presents in the list - if (o1.isInProgress() && !o2.isInProgress()) { - return -1; - } else if (!o1.isInProgress() && o2.isInProgress()) { - return 1; - } else { - return 0; - } - } else { - return 1; - } - } - - - } - }; - - public static final Comparator<LogSegmentMetadata> DESC_COMPARATOR - = new Comparator<LogSegmentMetadata>() { - public int compare(LogSegmentMetadata o1, - LogSegmentMetadata o2) { - if ((o1.getLogSegmentSequenceNumber() == DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO) || - (o2.getLogSegmentSequenceNumber() == DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO)) { - if (o1.firstTxId > o2.firstTxId) { - return -1; - } else if (o1.firstTxId == o2.firstTxId) { - return 0; - } else { - return 1; - } - } else { - if (o1.getLogSegmentSequenceNumber() > o2.getLogSegmentSequenceNumber()) { - return -1; - } else if (o1.getLogSegmentSequenceNumber() == o2.getLogSegmentSequenceNumber()) { - // make sure we won't move over inprogress log segment if it still presents in the list - if (o1.isInProgress() && !o2.isInProgress()) { - return 1; - } else if (!o1.isInProgress() && o2.isInProgress()) { - return -1; - } else { - return 0; - } - } else { - return 1; - } - } - } - }; - - public static final int LEDGER_METADATA_CURRENT_LAYOUT_VERSION = - LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value; - - public static final int LEDGER_METADATA_OLDEST_SUPPORTED_VERSION = - LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value; - - static final int LOGRECORD_COUNT_SHIFT = 32; - static final long LOGRECORD_COUNT_MASK = 0xffffffff00000000L; - static final int REGION_SHIFT = 28; - static final long MAX_REGION_ID = 0xfL; - static final long REGION_MASK = 0x00000000f0000000L; - static final int STATUS_BITS_SHIFT = 8; - static final long STATUS_BITS_MASK = 0x000000000000ff00L; - static final long UNUSED_BITS_MASK = 0x000000000fff0000L; - static final long METADATA_VERSION_MASK = 0x00000000000000ffL; - - //Metadata status bits - static final long METADATA_TRUNCATION_STATUS_MASK = 0x3L; - static final long METADATA_STATUS_BIT_MAX = 0xffL; - - private LogSegmentMetadata(String zkPath, - LogSegmentMetadataVersion version, - long logSegmentId, - long firstTxId, - long lastTxId, - long completionTime, - boolean inprogress, - int recordCount, - long logSegmentSequenceNumber, - long lastEntryId, - long lastSlotId, - int regionId, - long status, - long minActiveEntryId, - long minActiveSlotId, - long startSequenceId, - boolean envelopeEntries) { - this.zkPath = zkPath; - this.logSegmentId = logSegmentId; - this.version = version; - this.firstTxId = firstTxId; - this.lastTxId = lastTxId; - this.inprogress = inprogress; - this.completionTime = completionTime; - this.recordCount = recordCount; - this.lastDLSN = new DLSN(logSegmentSequenceNumber, lastEntryId, lastSlotId); - this.minActiveDLSN = new DLSN(logSegmentSequenceNumber, minActiveEntryId, minActiveSlotId); - this.startSequenceId = startSequenceId; - this.regionId = regionId; - this.status = status; - this.envelopeEntries = envelopeEntries; - } - - public String getZkPath() { - return zkPath; - } - - public String getZNodeName() { - return new File(zkPath).getName(); - } - - public long getFirstTxId() { - return firstTxId; - } - - public long getLastTxId() { - return lastTxId; - } - - public long getCompletionTime() { - return completionTime; - } - - public long getLogSegmentId() { - return logSegmentId; - } - - public long getLogSegmentSequenceNumber() { - return lastDLSN.getLogSegmentSequenceNo(); - } - - public int getVersion() { - return version.value; - } - - public boolean getEnvelopeEntries() { - return envelopeEntries; - } - - public long getLastEntryId() { - return lastDLSN.getEntryId(); - } - - long getStatus() { - return status; - } - - public long getStartSequenceId() { - // generate negative sequence id for log segments that created <= v4 - return supportsSequenceId() && startSequenceId != DistributedLogConstants.UNASSIGNED_SEQUENCE_ID ? - startSequenceId : Long.MIN_VALUE + (getLogSegmentSequenceNumber() << 32L); - } - - public boolean isTruncated() { - return ((status & METADATA_TRUNCATION_STATUS_MASK) - == TruncationStatus.TRUNCATED.value); - } - - public boolean isPartiallyTruncated() { - return ((status & METADATA_TRUNCATION_STATUS_MASK) - == TruncationStatus.PARTIALLY_TRUNCATED.value); - } - - public boolean isNonTruncated() { - return ((status & METADATA_TRUNCATION_STATUS_MASK) - == TruncationStatus.ACTIVE.value); - } - - public long getLastSlotId() { - return lastDLSN.getSlotId(); - } - - public DLSN getLastDLSN() { - return lastDLSN; - } - - public DLSN getMinActiveDLSN() { - return minActiveDLSN; - } - - public DLSN getFirstDLSN() { - return new DLSN(getLogSegmentSequenceNumber(), 0, 0); - } - - public int getRecordCount() { - return recordCount; - } - - public int getRegionId() { - return regionId; - } - - public boolean isInProgress() { - return this.inprogress; - } - - @VisibleForTesting - public boolean isDLSNinThisSegment(DLSN dlsn) { - return dlsn.getLogSegmentSequenceNo() == getLogSegmentSequenceNumber(); - } - - @VisibleForTesting - public boolean isRecordPositionWithinSegmentScope(LogRecord record) { - return record.getLastPositionWithinLogSegment() <= getRecordCount(); - } - - @VisibleForTesting - public boolean isRecordLastPositioninThisSegment(LogRecord record) { - return record.getLastPositionWithinLogSegment() == getRecordCount(); - } - - /** - * complete current log segment. A new log segment metadata instance will be returned. - * - * @param zkPath - * zk path for the completed log segment. - * @param newLastTxId - * last tx id - * @param recordCount - * record count - * @param lastEntryId - * last entry id - * @param lastSlotId - * last slot id - * @return completed log segment. - */ - LogSegmentMetadata completeLogSegment(String zkPath, - long newLastTxId, - int recordCount, - long lastEntryId, - long lastSlotId, - long startSequenceId) { - assert this.lastTxId == DistributedLogConstants.INVALID_TXID; - - return new Mutator(this) - .setZkPath(zkPath) - .setLastDLSN(new DLSN(this.lastDLSN.getLogSegmentSequenceNo(), lastEntryId, lastSlotId)) - .setLastTxId(newLastTxId) - .setInprogress(false) - .setCompletionTime(Utils.nowInMillis()) - .setRecordCount(recordCount) - .setStartSequenceId(startSequenceId) - .build(); - } - - public static Future<LogSegmentMetadata> read(ZooKeeperClient zkc, String path) { - return read(zkc, path, false); - } - - public static Future<LogSegmentMetadata> read(ZooKeeperClient zkc, String path, final boolean skipMinVersionCheck) { - final Promise<LogSegmentMetadata> result = new Promise<LogSegmentMetadata>(); - try { - zkc.get().getData(path, false, new AsyncCallback.DataCallback() { - @Override - public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - if (KeeperException.Code.OK.intValue() != rc) { - if (KeeperException.Code.NONODE.intValue() == rc) { - FutureUtils.setException(result, new LogSegmentNotFoundException(path)); - } else { - FutureUtils.setException(result, - new ZKException("Failed to read log segment metadata from " + path, - KeeperException.Code.get(rc))); - } - return; - } - try { - LogSegmentMetadata metadata = parseData(path, data, skipMinVersionCheck); - FutureUtils.setValue(result, metadata); - } catch (IOException ie) { - LOG.error("Error on parsing log segment metadata from {} : ", path, ie); - result.setException(ie); - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - result.setException(FutureUtils.zkException(e, path)); - } catch (InterruptedException e) { - result.setException(FutureUtils.zkException(e, path)); - } - return result; - } - - static LogSegmentMetadata parseDataV1(String path, byte[] data, String[] parts) - throws IOException { - long versionStatusCount = Long.parseLong(parts[0]); - - long version = versionStatusCount & METADATA_VERSION_MASK; - assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE); - assert (1 == version); - - LogSegmentMetadataVersion llmv = LogSegmentMetadataVersion.VERSION_V1_ORIGINAL; - - int regionId = (int)(versionStatusCount & REGION_MASK) >> REGION_SHIFT; - assert (regionId >= 0 && regionId <= 0xf); - - long status = (versionStatusCount & STATUS_BITS_MASK) >> STATUS_BITS_SHIFT; - assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX); - - if (parts.length == 3) { - long logSegmentId = Long.parseLong(parts[1]); - long txId = Long.parseLong(parts[2]); - return new LogSegmentMetadataBuilder(path, llmv, logSegmentId, txId) - .setRegionId(regionId) - .setStatus(status) - .build(); - } else if (parts.length == 5) { - long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT; - assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE); - - long logSegmentId = Long.parseLong(parts[1]); - long firstTxId = Long.parseLong(parts[2]); - long lastTxId = Long.parseLong(parts[3]); - long completionTime = Long.parseLong(parts[4]); - return new LogSegmentMetadataBuilder(path, llmv, logSegmentId, firstTxId) - .setInprogress(false) - .setLastTxId(lastTxId) - .setCompletionTime(completionTime) - .setRecordCount((int) recordCount) - .setRegionId(regionId) - .setStatus(status) - .build(); - } else { - throw new IOException("Invalid log segment metadata : " - + new String(data, UTF_8)); - } - } - - static LogSegmentMetadata parseDataV2(String path, byte[] data, String[] parts) - throws IOException { - long versionStatusCount = Long.parseLong(parts[0]); - - long version = versionStatusCount & METADATA_VERSION_MASK; - assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE); - assert (2 == version); - - LogSegmentMetadataVersion llmv = LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO; - - int regionId = (int)((versionStatusCount & REGION_MASK) >> REGION_SHIFT); - assert (regionId >= 0 && regionId <= 0xf); - - long status = (versionStatusCount & STATUS_BITS_MASK) >> STATUS_BITS_SHIFT; - assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX); - - if (parts.length == 4) { - long logSegmentId = Long.parseLong(parts[1]); - long txId = Long.parseLong(parts[2]); - long logSegmentSequenceNumber = Long.parseLong(parts[3]); - return new LogSegmentMetadataBuilder(path, llmv, logSegmentId, txId) - .setLogSegmentSequenceNo(logSegmentSequenceNumber) - .setRegionId(regionId) - .setStatus(status) - .build(); - } else if (parts.length == 8) { - long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT; - assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE); - - long logSegmentId = Long.parseLong(parts[1]); - long firstTxId = Long.parseLong(parts[2]); - long lastTxId = Long.parseLong(parts[3]); - long completionTime = Long.parseLong(parts[4]); - long logSegmentSequenceNumber = Long.parseLong(parts[5]); - long lastEntryId = Long.parseLong(parts[6]); - long lastSlotId = Long.parseLong(parts[7]); - return new LogSegmentMetadataBuilder(path, llmv, logSegmentId, firstTxId) - .setInprogress(false) - .setLastTxId(lastTxId) - .setCompletionTime(completionTime) - .setRecordCount((int) recordCount) - .setLogSegmentSequenceNo(logSegmentSequenceNumber) - .setLastEntryId(lastEntryId) - .setLastSlotId(lastSlotId) - .setRegionId(regionId) - .setStatus(status) - .build(); - } else { - throw new IOException("Invalid logsegment metadata : " - + new String(data, UTF_8)); - } - - } - - static LogSegmentMetadata parseDataVersionsWithMinActiveDLSN(String path, byte[] data, String[] parts) - throws IOException { - long versionStatusCount = Long.parseLong(parts[0]); - - long version = versionStatusCount & METADATA_VERSION_MASK; - assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE); - assert (LogSegmentMetadataVersion.VERSION_V3_MIN_ACTIVE_DLSN.value <= version && - LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value >= version); - - LogSegmentMetadataVersion llmv = LogSegmentMetadataVersion.of((int) version); - - int regionId = (int)((versionStatusCount & REGION_MASK) >> REGION_SHIFT); - assert (regionId >= 0 && regionId <= 0xf); - - long status = (versionStatusCount & STATUS_BITS_MASK) >> STATUS_BITS_SHIFT; - assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX); - - if (parts.length == 6) { - long logSegmentId = Long.parseLong(parts[1]); - long txId = Long.parseLong(parts[2]); - long logSegmentSequenceNumber = Long.parseLong(parts[3]); - long minActiveEntryId = Long.parseLong(parts[4]); - long minActiveSlotId = Long.parseLong(parts[5]); - - LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, logSegmentId, txId) - .setLogSegmentSequenceNo(logSegmentSequenceNumber) - .setMinActiveEntryId(minActiveEntryId) - .setMinActiveSlotId(minActiveSlotId) - .setRegionId(regionId) - .setStatus(status); - if (supportsEnvelopedEntries((int) version)) { - builder = builder.setEnvelopeEntries(true); - } - return builder.build(); - } else if (parts.length == 10) { - long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT; - assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE); - - long logSegmentId = Long.parseLong(parts[1]); - long firstTxId = Long.parseLong(parts[2]); - long lastTxId = Long.parseLong(parts[3]); - long completionTime = Long.parseLong(parts[4]); - long logSegmentSequenceNumber = Long.parseLong(parts[5]); - long lastEntryId = Long.parseLong(parts[6]); - long lastSlotId = Long.parseLong(parts[7]); - long minActiveEntryId = Long.parseLong(parts[8]); - long minActiveSlotId = Long.parseLong(parts[9]); - LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, logSegmentId, firstTxId) - .setInprogress(false) - .setLastTxId(lastTxId) - .setCompletionTime(completionTime) - .setRecordCount((int) recordCount) - .setLogSegmentSequenceNo(logSegmentSequenceNumber) - .setLastEntryId(lastEntryId) - .setLastSlotId(lastSlotId) - .setMinActiveEntryId(minActiveEntryId) - .setMinActiveSlotId(minActiveSlotId) - .setRegionId(regionId) - .setStatus(status); - if (supportsEnvelopedEntries((int) version)) { - builder = builder.setEnvelopeEntries(true); - } - return builder.build(); - } else { - throw new IOException("Invalid logsegment metadata : " - + new String(data, UTF_8)); - } - - } - - static LogSegmentMetadata parseDataVersionsWithSequenceId(String path, byte[] data, String[] parts) - throws IOException { - long versionStatusCount = Long.parseLong(parts[0]); - - long version = versionStatusCount & METADATA_VERSION_MASK; - assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE); - assert (LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value <= version && - LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION >= version); - - LogSegmentMetadataVersion llmv = LogSegmentMetadataVersion.of((int) version); - - int regionId = (int)((versionStatusCount & REGION_MASK) >> REGION_SHIFT); - assert (regionId >= 0 && regionId <= 0xf); - - long status = (versionStatusCount & STATUS_BITS_MASK) >> STATUS_BITS_SHIFT; - assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX); - - if (parts.length == 7) { - long logSegmentId = Long.parseLong(parts[1]); - long txId = Long.parseLong(parts[2]); - long logSegmentSequenceNumber = Long.parseLong(parts[3]); - long minActiveEntryId = Long.parseLong(parts[4]); - long minActiveSlotId = Long.parseLong(parts[5]); - long startSequenceId = Long.parseLong(parts[6]); - - LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, logSegmentId, txId) - .setLogSegmentSequenceNo(logSegmentSequenceNumber) - .setMinActiveEntryId(minActiveEntryId) - .setMinActiveSlotId(minActiveSlotId) - .setRegionId(regionId) - .setStatus(status) - .setStartSequenceId(startSequenceId) - .setEnvelopeEntries(true); - return builder.build(); - } else if (parts.length == 11) { - long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT; - assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE); - - long logSegmentId = Long.parseLong(parts[1]); - long firstTxId = Long.parseLong(parts[2]); - long lastTxId = Long.parseLong(parts[3]); - long completionTime = Long.parseLong(parts[4]); - long logSegmentSequenceNumber = Long.parseLong(parts[5]); - long lastEntryId = Long.parseLong(parts[6]); - long lastSlotId = Long.parseLong(parts[7]); - long minActiveEntryId = Long.parseLong(parts[8]); - long minActiveSlotId = Long.parseLong(parts[9]); - long startSequenceId = Long.parseLong(parts[10]); - LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, logSegmentId, firstTxId) - .setInprogress(false) - .setLastTxId(lastTxId) - .setCompletionTime(completionTime) - .setRecordCount((int) recordCount) - .setLogSegmentSequenceNo(logSegmentSequenceNumber) - .setLastEntryId(lastEntryId) - .setLastSlotId(lastSlotId) - .setMinActiveEntryId(minActiveEntryId) - .setMinActiveSlotId(minActiveSlotId) - .setRegionId(regionId) - .setStatus(status) - .setStartSequenceId(startSequenceId) - .setEnvelopeEntries(true); - return builder.build(); - } else { - throw new IOException("Invalid log segment metadata : " - + new String(data, UTF_8)); - } - } - - public static LogSegmentMetadata parseData(String path, byte[] data) - throws IOException { - return parseData(path, data, false); - } - - static LogSegmentMetadata parseData(String path, byte[] data, boolean skipMinVersionCheck) throws IOException { - String[] parts = new String(data, UTF_8).split(";"); - long version; - try { - version = Long.parseLong(parts[0]) & METADATA_VERSION_MASK; - } catch (Exception exc) { - throw new IOException("Invalid ledger entry, " - + new String(data, UTF_8)); - } - - if (!skipMinVersionCheck && version < LogSegmentMetadata.LEDGER_METADATA_OLDEST_SUPPORTED_VERSION) { - throw new UnsupportedMetadataVersionException("Ledger metadata version '" + version + "' is no longer supported: " - + new String(data, UTF_8)); - } - - if (version > LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION) { - throw new UnsupportedMetadataVersionException("Metadata version '" + version + "' is higher than the highest supported version : " - + new String(data, UTF_8)); - } - - if (LogSegmentMetadataVersion.VERSION_V1_ORIGINAL.value == version) { - return parseDataV1(path, data, parts); - } else if (LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value == version) { - return parseDataV2(path, data, parts); - } else if (LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value >= version && - LogSegmentMetadataVersion.VERSION_V3_MIN_ACTIVE_DLSN.value <= version) { - return parseDataVersionsWithMinActiveDLSN(path, data, parts); - } else { - assert(version >= LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value); - return parseDataVersionsWithSequenceId(path, data, parts); - } - } - - public String getFinalisedData() { - return getFinalisedData(this.version); - } - - public String getFinalisedData(LogSegmentMetadataVersion version) { - String finalisedData; - final long logSegmentSeqNo = getLogSegmentSequenceNumber(); - final long lastEntryId = getLastEntryId(); - final long lastSlotId = getLastSlotId(); - final long minActiveEntryId = minActiveDLSN.getEntryId(); - final long minActiveSlotId = minActiveDLSN.getSlotId(); - - if (LogSegmentMetadataVersion.VERSION_V1_ORIGINAL == version) { - if (inprogress) { - finalisedData = String.format("%d;%d;%d", - version.value, logSegmentId, firstTxId); - } else { - long versionAndCount = ((long) version.value) | ((long)recordCount << LOGRECORD_COUNT_SHIFT); - finalisedData = String.format("%d;%d;%d;%d;%d", - versionAndCount, logSegmentId, firstTxId, lastTxId, completionTime); - } - } else { - long versionStatusCount = ((long) version.value); - versionStatusCount |= ((status & METADATA_STATUS_BIT_MAX) << STATUS_BITS_SHIFT); - versionStatusCount |= (((long) regionId & MAX_REGION_ID) << REGION_SHIFT); - if (!inprogress) { - versionStatusCount |= ((long)recordCount << LOGRECORD_COUNT_SHIFT); - } - if (LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO == version) { - if (inprogress) { - finalisedData = String.format("%d;%d;%d;%d", - versionStatusCount, logSegmentId, firstTxId, logSegmentSeqNo); - } else { - finalisedData = String.format("%d;%d;%d;%d;%d;%d;%d;%d", - versionStatusCount, logSegmentId, firstTxId, lastTxId, completionTime, - logSegmentSeqNo, lastEntryId, lastSlotId); - } - } else if (LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value >= version.value && - LogSegmentMetadataVersion.VERSION_V3_MIN_ACTIVE_DLSN.value <= version.value) { - if (inprogress) { - finalisedData = String.format("%d;%d;%d;%d;%d;%d", - versionStatusCount, logSegmentId, firstTxId, logSegmentSeqNo, minActiveEntryId, minActiveSlotId); - } else { - finalisedData = String.format("%d;%d;%d;%d;%d;%d;%d;%d;%d;%d", - versionStatusCount, logSegmentId, firstTxId, lastTxId, completionTime, - logSegmentSeqNo, lastEntryId, lastSlotId, minActiveEntryId, minActiveSlotId); - } - } else if (LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value <= version.value && - LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION >= version.value) { - if (inprogress) { - finalisedData = String.format("%d;%d;%d;%d;%d;%d;%d", - versionStatusCount, logSegmentId, firstTxId, logSegmentSeqNo, minActiveEntryId, minActiveSlotId, startSequenceId); - } else { - finalisedData = String.format("%d;%d;%d;%d;%d;%d;%d;%d;%d;%d;%d", - versionStatusCount, logSegmentId, firstTxId, lastTxId, completionTime, - logSegmentSeqNo, lastEntryId, lastSlotId, minActiveEntryId, minActiveSlotId, startSequenceId); - } - } else { - throw new IllegalStateException("Unsupported log segment ledger metadata version '" + version + "'"); - } - } - return finalisedData; - } - - String getSegmentName() { - String[] parts = this.zkPath.split("/"); - if (parts.length <= 0) { - throw new IllegalStateException("ZK Path is not valid"); - } - return parts[parts.length - 1]; - } - - public void write(ZooKeeperClient zkc) - throws IOException, KeeperException.NodeExistsException { - String finalisedData = getFinalisedData(version); - try { - zkc.get().create(zkPath, finalisedData.getBytes(UTF_8), - zkc.getDefaultACL(), CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException nee) { - throw nee; - } catch (InterruptedException ie) { - throw new DLInterruptedException("Interrupted on creating ledger znode " + zkPath, ie); - } catch (Exception e) { - LOG.error("Error creating ledger znode {}", zkPath, e); - throw new IOException("Error creating ledger znode " + zkPath); - } - } - - boolean checkEquivalence(ZooKeeperClient zkc, String path) { - try { - LogSegmentMetadata other = FutureUtils.result(read(zkc, path)); - if (LOG.isTraceEnabled()) { - LOG.trace("Verifying {} against {}", this, other); - } - - boolean retVal; - - // All fields may not be comparable so only compare the ones - // that can be compared - // completionTime is set when a node is finalized, so that - // cannot be compared - // if the node is inprogress, don't compare the lastTxId either - if (this.getLogSegmentSequenceNumber() != other.getLogSegmentSequenceNumber() || - this.logSegmentId != other.logSegmentId || - this.firstTxId != other.firstTxId) { - retVal = false; - } else if (this.inprogress) { - retVal = other.inprogress; - } else { - retVal = (!other.inprogress && (this.lastTxId == other.lastTxId)); - } - - if (!retVal) { - LOG.warn("Equivalence check failed between {} and {}", this, other); - } - - return retVal; - } catch (Exception e) { - LOG.error("Could not check equivalence between:" + this + " and data in " + path, e); - return false; - } - } - - public boolean equals(Object o) { - if (!(o instanceof LogSegmentMetadata)) { - return false; - } - LogSegmentMetadata ol = (LogSegmentMetadata) o; - return getLogSegmentSequenceNumber() == ol.getLogSegmentSequenceNumber() - && logSegmentId == ol.logSegmentId - && firstTxId == ol.firstTxId - && lastTxId == ol.lastTxId - && version == ol.version - && completionTime == ol.completionTime - && Objects.equal(lastDLSN, ol.lastDLSN) - && Objects.equal(minActiveDLSN, ol.minActiveDLSN) - && startSequenceId == ol.startSequenceId - && status == ol.status; - } - - public int hashCode() { - int hash = 1; - hash = hash * 31 + (int) logSegmentId; - hash = hash * 31 + (int) firstTxId; - hash = hash * 31 + (int) lastTxId; - hash = hash * 31 + version.value; - hash = hash * 31 + (int) completionTime; - hash = hash * 31 + (int) getLogSegmentSequenceNumber(); - return hash; - } - - public String toString() { - return "[LogSegmentId:" + logSegmentId + - ", firstTxId:" + firstTxId + - ", lastTxId:" + lastTxId + - ", version:" + version + - ", completionTime:" + completionTime + - ", recordCount:" + recordCount + - ", regionId:" + regionId + - ", status:" + status + - ", logSegmentSequenceNumber:" + getLogSegmentSequenceNumber() + - ", lastEntryId:" + getLastEntryId() + - ", lastSlotId:" + getLastSlotId() + - ", inprogress:" + inprogress + - ", minActiveDLSN:" + minActiveDLSN + - ", startSequenceId:" + startSequenceId + - "]"; - } - - public Mutator mutator() { - return new Mutator(this); - } - - - // - // Version Checking Utilities - // - - public boolean supportsLogSegmentSequenceNo() { - return supportsLogSegmentSequenceNo(version.value); - } - - /** - * Whether the provided version supports log segment sequence number. - * - * @param version - * log segment metadata version - * @return true if this log segment supports log segment sequence number. - */ - public static boolean supportsLogSegmentSequenceNo(int version) { - return version >= LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value; - } - - /** - * Whether the provided version supports enveloping entries before writing to bookkeeper. - * - * @param version - * log segment metadata version - * @return true if this log segment supports enveloping entries - */ - public static boolean supportsEnvelopedEntries(int version) { - return version >= LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value; - } - - public boolean supportsSequenceId() { - return supportsSequenceId(version.value); - } - - /** - * Whether the provided version supports sequence id. - * - * @param version - * log segment metadata version - * @return true if the log segment support sequence id. - */ - public static boolean supportsSequenceId(int version) { - return version >= LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/LogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LogWriter.java deleted file mode 100644 index d7de586..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogWriter.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.twitter.distributedlog.io.Abortable; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; - -/* -* A generic interface class to support writing log records into -* a persistent distributed log. -*/ -public interface LogWriter extends Closeable, Abortable { - /** - * Write a log record to the stream. - * - * @param record single log record - * @throws IOException - */ - public void write(LogRecord record) throws IOException; - - - /** - * Write a list of log records to the stream. - * - * @param records list of log records - * @throws IOException - */ - @Deprecated - public int writeBulk(List<LogRecord> records) throws IOException; - - /** - * All data that has been written to the stream so far will be sent to - * persistent storage. - * The transmission is asynchronous and new data can be still written to the - * stream while flushing is performed. - * - * TODO: rename this to flush() - */ - public long setReadyToFlush() throws IOException; - - /** - * Flush and sync all data that is ready to be flush - * {@link #setReadyToFlush()} into underlying persistent store. - * @throws IOException - * - * TODO: rename this to commit() - */ - public long flushAndSync() throws IOException; - - /** - * Flushes all the data up to this point, - * adds the end of stream marker and marks the stream - * as read-only in the metadata. No appends to the - * stream will be allowed after this point - * - * @throws IOException - */ - public void markEndOfStream() throws IOException; - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java deleted file mode 100644 index 9bfaaba..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.twitter.distributedlog.util.DLUtils; -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; - -/** - * Utility class for storing and reading max ledger sequence number - */ -class MaxLogSegmentSequenceNo { - - Version version; - long maxSeqNo; - - MaxLogSegmentSequenceNo(Versioned<byte[]> logSegmentsData) { - if (null != logSegmentsData - && null != logSegmentsData.getValue() - && null != logSegmentsData.getVersion()) { - version = logSegmentsData.getVersion(); - try { - maxSeqNo = DLUtils.deserializeLogSegmentSequenceNumber(logSegmentsData.getValue()); - } catch (NumberFormatException nfe) { - maxSeqNo = DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO; - } - } else { - maxSeqNo = DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO; - if (null != logSegmentsData && null != logSegmentsData.getVersion()) { - version = logSegmentsData.getVersion(); - } else { - throw new IllegalStateException("Invalid MaxLogSegmentSequenceNo found - " + logSegmentsData); - } - } - } - - synchronized Version getVersion() { - return version; - } - - synchronized long getSequenceNumber() { - return maxSeqNo; - } - - synchronized MaxLogSegmentSequenceNo update(Version version, long logSegmentSeqNo) { - if (version.compare(this.version) == Version.Occurred.AFTER) { - this.version = version; - this.maxSeqNo = logSegmentSeqNo; - } - return this; - } - - public synchronized Versioned<Long> getVersionedData(long seqNo) { - return new Versioned<Long>(seqNo, version); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java deleted file mode 100644 index 8eabf88..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.twitter.distributedlog.util.DLUtils; -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Utility class for storing and reading - * the max seen txid in zookeeper - */ -class MaxTxId { - static final Logger LOG = LoggerFactory.getLogger(MaxTxId.class); - - private Version version; - private long currentMax; - - MaxTxId(Versioned<byte[]> maxTxIdData) { - if (null != maxTxIdData - && null != maxTxIdData.getValue() - && null != maxTxIdData.getVersion()) { - this.version = maxTxIdData.getVersion(); - try { - this.currentMax = DLUtils.deserializeTransactionId(maxTxIdData.getValue()); - } catch (NumberFormatException e) { - LOG.warn("Invalid txn id stored in {}", e); - this.currentMax = DistributedLogConstants.INVALID_TXID; - } - } else { - this.currentMax = DistributedLogConstants.INVALID_TXID; - if (null != maxTxIdData && null != maxTxIdData.getVersion()) { - this.version = maxTxIdData.getVersion(); - } else { - throw new IllegalStateException("Invalid MaxTxId found - " + maxTxIdData); - } - } - } - - synchronized void update(Version version, long txId) { - if (version.compare(this.version) == Version.Occurred.AFTER) { - this.version = version; - this.currentMax = txId; - } - } - - synchronized long get() { - return currentMax; - } - - public synchronized Versioned<Long> getVersionedData(long txId) { - return new Versioned<Long>(Math.max(txId, get()), version); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/MetadataAccessor.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/MetadataAccessor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/MetadataAccessor.java deleted file mode 100644 index f6ff587..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/MetadataAccessor.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.twitter.distributedlog.io.AsyncCloseable; - -import java.io.Closeable; -import java.io.IOException; - -public interface MetadataAccessor extends Closeable, AsyncCloseable { - /** - * Get the name of the stream managed by this log manager - * @return streamName - */ - public String getStreamName(); - - public void createOrUpdateMetadata(byte[] metadata) throws IOException; - - public void deleteMetadata() throws IOException; - - public byte[] getMetadata() throws IOException; - - /** - * Close the distributed log metadata, freeing any resources it may hold. - */ - public void close() throws IOException; - -}