http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java new file mode 100644 index 0000000..c5050ec --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java @@ -0,0 +1,1125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog; + +import java.io.File; +import java.io.IOException; +import java.util.Comparator; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import org.apache.distributedlog.exceptions.DLInterruptedException; +import org.apache.distributedlog.exceptions.LogSegmentNotFoundException; +import org.apache.distributedlog.exceptions.UnsupportedMetadataVersionException; +import org.apache.distributedlog.exceptions.ZKException; +import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.util.Utils; +import com.twitter.util.Future; +import com.twitter.util.Promise; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.google.common.base.Charsets.UTF_8; + +/** + * Utility class for storing the metadata associated + * with a single edit log segment, stored in a single ledger + */ +public class LogSegmentMetadata { + static final Logger LOG = LoggerFactory.getLogger(LogSegmentMetadata.class); + + public static enum LogSegmentMetadataVersion { + VERSION_INVALID(0), + VERSION_V1_ORIGINAL(1), + VERSION_V2_LEDGER_SEQNO(2), + VERSION_V3_MIN_ACTIVE_DLSN(3), + VERSION_V4_ENVELOPED_ENTRIES(4), + VERSION_V5_SEQUENCE_ID(5); + + public final int value; + + private LogSegmentMetadataVersion(int value) { + this.value = value; + } + + public static LogSegmentMetadataVersion of(int version) { + switch (version) { + case 5: + return VERSION_V5_SEQUENCE_ID; + case 4: + return VERSION_V4_ENVELOPED_ENTRIES; + case 3: + return VERSION_V3_MIN_ACTIVE_DLSN; + case 2: + return VERSION_V2_LEDGER_SEQNO; + case 1: + return VERSION_V1_ORIGINAL; + case 0: + return VERSION_INVALID; + default: + throw new IllegalArgumentException("unknown version " + version); + } + } + } + + public static enum 
TruncationStatus { + ACTIVE (0), PARTIALLY_TRUNCATED(1), TRUNCATED (2); + private final int value; + + private TruncationStatus(int value) { + this.value = value; + } + } + + public static class LogSegmentMetadataBuilder { + protected String zkPath; + protected long logSegmentId; + protected LogSegmentMetadataVersion version; + protected long firstTxId; + protected int regionId; + protected long status; + protected long lastTxId; + protected long completionTime; + protected int recordCount; + protected long logSegmentSequenceNo; + protected long lastEntryId; + protected long lastSlotId; + protected long minActiveEntryId; + protected long minActiveSlotId; + protected long startSequenceId; + protected boolean inprogress; + + // This is a derived attribute. + // Since we overwrite the original version with the target version, information that is + // derived from the original version (e.g. does it support enveloping of entries) + // is lost while parsing. + // NOTE: This value is not stored in the Metadata store. 
+ protected boolean envelopeEntries = false; + + LogSegmentMetadataBuilder(String zkPath, + LogSegmentMetadataVersion version, + long logSegmentId, + long firstTxId) { + initialize(); + this.zkPath = zkPath; + this.version = version; + this.logSegmentId = logSegmentId; + this.firstTxId = firstTxId; + } + + LogSegmentMetadataBuilder(String zkPath, + int version, + long logSegmentId, + long firstTxId) { + this(zkPath, LogSegmentMetadataVersion.values()[version], logSegmentId, firstTxId); + } + + private void initialize() { + regionId = DistributedLogConstants.LOCAL_REGION_ID; + status = DistributedLogConstants.LOGSEGMENT_DEFAULT_STATUS; + lastTxId = DistributedLogConstants.INVALID_TXID; + completionTime = 0; + recordCount = 0; + lastEntryId = -1; + lastSlotId = -1; + minActiveEntryId = 0; + minActiveSlotId = 0; + startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID; + inprogress = true; + } + + LogSegmentMetadataBuilder setRegionId(int regionId) { + this.regionId = regionId; + return this; + } + + LogSegmentMetadataBuilder setStatus(long status) { + this.status = status; + return this; + } + + public LogSegmentMetadataBuilder setLastTxId(long lastTxId) { + this.lastTxId = lastTxId; + return this; + } + + public LogSegmentMetadataBuilder setCompletionTime(long completionTime) { + this.completionTime = completionTime; + return this; + } + + public LogSegmentMetadataBuilder setRecordCount(int recordCount) { + this.recordCount = recordCount; + return this; + } + + public LogSegmentMetadataBuilder setRecordCount(LogRecord record) { + this.recordCount = record.getLastPositionWithinLogSegment(); + return this; + } + + public LogSegmentMetadataBuilder setInprogress(boolean inprogress) { + this.inprogress = inprogress; + return this; + } + + LogSegmentMetadataBuilder setLogSegmentSequenceNo(long logSegmentSequenceNo) { + this.logSegmentSequenceNo = logSegmentSequenceNo; + return this; + } + + public LogSegmentMetadataBuilder setLastEntryId(long lastEntryId) { + 
this.lastEntryId = lastEntryId; + return this; + } + + LogSegmentMetadataBuilder setLastSlotId(long lastSlotId) { + this.lastSlotId = lastSlotId; + return this; + } + + LogSegmentMetadataBuilder setEnvelopeEntries(boolean envelopeEntries) { + this.envelopeEntries = envelopeEntries; + return this; + } + + LogSegmentMetadataBuilder setMinActiveEntryId(long minActiveEntryId) { + this.minActiveEntryId = minActiveEntryId; + return this; + } + + LogSegmentMetadataBuilder setMinActiveSlotId(long minActiveSlotId) { + this.minActiveSlotId = minActiveSlotId; + return this; + } + + LogSegmentMetadataBuilder setStartSequenceId(long startSequenceId) { + this.startSequenceId = startSequenceId; + return this; + } + + public LogSegmentMetadata build() { + return new LogSegmentMetadata( + zkPath, + version, + logSegmentId, + firstTxId, + lastTxId, + completionTime, + inprogress, + recordCount, + logSegmentSequenceNo, + lastEntryId, + lastSlotId, + regionId, + status, + minActiveEntryId, + minActiveSlotId, + startSequenceId, + envelopeEntries + ); + } + + } + + /** + * Mutator to mutate the metadata of a log segment. This mutator is going to create + * a new instance of the log segment metadata without changing the existing one. 
+ */ + public static class Mutator extends LogSegmentMetadataBuilder { + + Mutator(LogSegmentMetadata original) { + super(original.getZkPath(), original.getVersion(), original.getLogSegmentId(), original.getFirstTxId()); + this.inprogress = original.isInProgress(); + this.logSegmentSequenceNo = original.getLogSegmentSequenceNumber(); + this.lastEntryId = original.getLastEntryId(); + this.lastSlotId = original.getLastSlotId(); + this.lastTxId = original.getLastTxId(); + this.completionTime = original.getCompletionTime(); + this.recordCount = original.getRecordCount(); + this.regionId = original.getRegionId(); + this.status = original.getStatus(); + this.minActiveEntryId = original.getMinActiveDLSN().getEntryId(); + this.minActiveSlotId = original.getMinActiveDLSN().getSlotId(); + this.startSequenceId = original.getStartSequenceId(); + this.envelopeEntries = original.getEnvelopeEntries(); + } + + @VisibleForTesting + public Mutator setVersion(LogSegmentMetadataVersion version) { + this.version = version; + return this; + } + + public Mutator setLogSegmentSequenceNumber(long seqNo) { + this.logSegmentSequenceNo = seqNo; + return this; + } + + public Mutator setZkPath(String zkPath) { + this.zkPath = zkPath; + return this; + } + + public Mutator setLastDLSN(DLSN dlsn) { + this.logSegmentSequenceNo = dlsn.getLogSegmentSequenceNo(); + this.lastEntryId = dlsn.getEntryId(); + this.lastSlotId = dlsn.getSlotId(); + return this; + } + + public Mutator setMinActiveDLSN(DLSN dlsn) { + if (this.logSegmentSequenceNo != dlsn.getLogSegmentSequenceNo()) { + throw new IllegalArgumentException("Updating minDLSN in an incorrect log segment"); + } + this.minActiveEntryId = dlsn.getEntryId(); + this.minActiveSlotId = dlsn.getSlotId(); + return this; + } + + public Mutator setTruncationStatus(TruncationStatus truncationStatus) { + status &= ~METADATA_TRUNCATION_STATUS_MASK; + status |= (truncationStatus.value & METADATA_TRUNCATION_STATUS_MASK); + return this; + } + + public Mutator 
setStartSequenceId(long startSequenceId) { + this.startSequenceId = startSequenceId; + return this; + } + } + + private final String zkPath; + private final long logSegmentId; + private final LogSegmentMetadataVersion version; + private final long firstTxId; + private final int regionId; + private final long status; + private final long lastTxId; + private final long completionTime; + private final int recordCount; + private final DLSN lastDLSN; + private final DLSN minActiveDLSN; + private final long startSequenceId; + private final boolean inprogress; + // This is a derived attribute. + // Since we overwrite the original version with the target version, information that is + // derived from the original version (e.g. does it support enveloping of entries) + // is lost while parsing. + // NOTE: This value is not stored in the Metadata store. + private final boolean envelopeEntries; + + public static final Comparator<LogSegmentMetadata> COMPARATOR + = new Comparator<LogSegmentMetadata>() { + + public int compare(LogSegmentMetadata o1, + LogSegmentMetadata o2) { + if ((o1.getLogSegmentSequenceNumber() == DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO) || + (o2.getLogSegmentSequenceNumber() == DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO)) { + if (o1.firstTxId < o2.firstTxId) { + return -1; + } else if (o1.firstTxId == o2.firstTxId) { + return 0; + } else { + return 1; + } + } else { + if (o1.getLogSegmentSequenceNumber() < o2.getLogSegmentSequenceNumber()) { + return -1; + } else if (o1.getLogSegmentSequenceNumber() == o2.getLogSegmentSequenceNumber()) { + // make sure we won't move over inprogress log segment if it still presents in the list + if (o1.isInProgress() && !o2.isInProgress()) { + return -1; + } else if (!o1.isInProgress() && o2.isInProgress()) { + return 1; + } else { + return 0; + } + } else { + return 1; + } + } + + + } + }; + + public static final Comparator<LogSegmentMetadata> DESC_COMPARATOR + = new Comparator<LogSegmentMetadata>() { + 
public int compare(LogSegmentMetadata o1, + LogSegmentMetadata o2) { + if ((o1.getLogSegmentSequenceNumber() == DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO) || + (o2.getLogSegmentSequenceNumber() == DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO)) { + if (o1.firstTxId > o2.firstTxId) { + return -1; + } else if (o1.firstTxId == o2.firstTxId) { + return 0; + } else { + return 1; + } + } else { + if (o1.getLogSegmentSequenceNumber() > o2.getLogSegmentSequenceNumber()) { + return -1; + } else if (o1.getLogSegmentSequenceNumber() == o2.getLogSegmentSequenceNumber()) { + // make sure we won't move over inprogress log segment if it still presents in the list + if (o1.isInProgress() && !o2.isInProgress()) { + return 1; + } else if (!o1.isInProgress() && o2.isInProgress()) { + return -1; + } else { + return 0; + } + } else { + return 1; + } + } + } + }; + + public static final int LEDGER_METADATA_CURRENT_LAYOUT_VERSION = + LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value; + + public static final int LEDGER_METADATA_OLDEST_SUPPORTED_VERSION = + LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value; + + static final int LOGRECORD_COUNT_SHIFT = 32; + static final long LOGRECORD_COUNT_MASK = 0xffffffff00000000L; + static final int REGION_SHIFT = 28; + static final long MAX_REGION_ID = 0xfL; + static final long REGION_MASK = 0x00000000f0000000L; + static final int STATUS_BITS_SHIFT = 8; + static final long STATUS_BITS_MASK = 0x000000000000ff00L; + static final long UNUSED_BITS_MASK = 0x000000000fff0000L; + static final long METADATA_VERSION_MASK = 0x00000000000000ffL; + + //Metadata status bits + static final long METADATA_TRUNCATION_STATUS_MASK = 0x3L; + static final long METADATA_STATUS_BIT_MAX = 0xffL; + + private LogSegmentMetadata(String zkPath, + LogSegmentMetadataVersion version, + long logSegmentId, + long firstTxId, + long lastTxId, + long completionTime, + boolean inprogress, + int recordCount, + long logSegmentSequenceNumber, + long lastEntryId, + 
long lastSlotId, + int regionId, + long status, + long minActiveEntryId, + long minActiveSlotId, + long startSequenceId, + boolean envelopeEntries) { + this.zkPath = zkPath; + this.logSegmentId = logSegmentId; + this.version = version; + this.firstTxId = firstTxId; + this.lastTxId = lastTxId; + this.inprogress = inprogress; + this.completionTime = completionTime; + this.recordCount = recordCount; + this.lastDLSN = new DLSN(logSegmentSequenceNumber, lastEntryId, lastSlotId); + this.minActiveDLSN = new DLSN(logSegmentSequenceNumber, minActiveEntryId, minActiveSlotId); + this.startSequenceId = startSequenceId; + this.regionId = regionId; + this.status = status; + this.envelopeEntries = envelopeEntries; + } + + public String getZkPath() { + return zkPath; + } + + public String getZNodeName() { + return new File(zkPath).getName(); + } + + public long getFirstTxId() { + return firstTxId; + } + + public long getLastTxId() { + return lastTxId; + } + + public long getCompletionTime() { + return completionTime; + } + + public long getLogSegmentId() { + return logSegmentId; + } + + public long getLogSegmentSequenceNumber() { + return lastDLSN.getLogSegmentSequenceNo(); + } + + public int getVersion() { + return version.value; + } + + public boolean getEnvelopeEntries() { + return envelopeEntries; + } + + public long getLastEntryId() { + return lastDLSN.getEntryId(); + } + + long getStatus() { + return status; + } + + public long getStartSequenceId() { + // generate negative sequence id for log segments that created <= v4 + return supportsSequenceId() && startSequenceId != DistributedLogConstants.UNASSIGNED_SEQUENCE_ID ? 
+ startSequenceId : Long.MIN_VALUE + (getLogSegmentSequenceNumber() << 32L); + } + + public boolean isTruncated() { + return ((status & METADATA_TRUNCATION_STATUS_MASK) + == TruncationStatus.TRUNCATED.value); + } + + public boolean isPartiallyTruncated() { + return ((status & METADATA_TRUNCATION_STATUS_MASK) + == TruncationStatus.PARTIALLY_TRUNCATED.value); + } + + public boolean isNonTruncated() { + return ((status & METADATA_TRUNCATION_STATUS_MASK) + == TruncationStatus.ACTIVE.value); + } + + public long getLastSlotId() { + return lastDLSN.getSlotId(); + } + + public DLSN getLastDLSN() { + return lastDLSN; + } + + public DLSN getMinActiveDLSN() { + return minActiveDLSN; + } + + public DLSN getFirstDLSN() { + return new DLSN(getLogSegmentSequenceNumber(), 0, 0); + } + + public int getRecordCount() { + return recordCount; + } + + public int getRegionId() { + return regionId; + } + + public boolean isInProgress() { + return this.inprogress; + } + + @VisibleForTesting + public boolean isDLSNinThisSegment(DLSN dlsn) { + return dlsn.getLogSegmentSequenceNo() == getLogSegmentSequenceNumber(); + } + + @VisibleForTesting + public boolean isRecordPositionWithinSegmentScope(LogRecord record) { + return record.getLastPositionWithinLogSegment() <= getRecordCount(); + } + + @VisibleForTesting + public boolean isRecordLastPositioninThisSegment(LogRecord record) { + return record.getLastPositionWithinLogSegment() == getRecordCount(); + } + + /** + * complete current log segment. A new log segment metadata instance will be returned. + * + * @param zkPath + * zk path for the completed log segment. + * @param newLastTxId + * last tx id + * @param recordCount + * record count + * @param lastEntryId + * last entry id + * @param lastSlotId + * last slot id + * @return completed log segment. 
+ */ + LogSegmentMetadata completeLogSegment(String zkPath, + long newLastTxId, + int recordCount, + long lastEntryId, + long lastSlotId, + long startSequenceId) { + assert this.lastTxId == DistributedLogConstants.INVALID_TXID; + + return new Mutator(this) + .setZkPath(zkPath) + .setLastDLSN(new DLSN(this.lastDLSN.getLogSegmentSequenceNo(), lastEntryId, lastSlotId)) + .setLastTxId(newLastTxId) + .setInprogress(false) + .setCompletionTime(Utils.nowInMillis()) + .setRecordCount(recordCount) + .setStartSequenceId(startSequenceId) + .build(); + } + + public static Future<LogSegmentMetadata> read(ZooKeeperClient zkc, String path) { + return read(zkc, path, false); + } + + public static Future<LogSegmentMetadata> read(ZooKeeperClient zkc, String path, final boolean skipMinVersionCheck) { + final Promise<LogSegmentMetadata> result = new Promise<LogSegmentMetadata>(); + try { + zkc.get().getData(path, false, new AsyncCallback.DataCallback() { + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + if (KeeperException.Code.OK.intValue() != rc) { + if (KeeperException.Code.NONODE.intValue() == rc) { + FutureUtils.setException(result, new LogSegmentNotFoundException(path)); + } else { + FutureUtils.setException(result, + new ZKException("Failed to read log segment metadata from " + path, + KeeperException.Code.get(rc))); + } + return; + } + try { + LogSegmentMetadata metadata = parseData(path, data, skipMinVersionCheck); + FutureUtils.setValue(result, metadata); + } catch (IOException ie) { + LOG.error("Error on parsing log segment metadata from {} : ", path, ie); + result.setException(ie); + } + } + }, null); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + result.setException(FutureUtils.zkException(e, path)); + } catch (InterruptedException e) { + result.setException(FutureUtils.zkException(e, path)); + } + return result; + } + + static LogSegmentMetadata parseDataV1(String path, byte[] data, String[] parts) + 
throws IOException { + long versionStatusCount = Long.parseLong(parts[0]); + + long version = versionStatusCount & METADATA_VERSION_MASK; + assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE); + assert (1 == version); + + LogSegmentMetadataVersion llmv = LogSegmentMetadataVersion.VERSION_V1_ORIGINAL; + + int regionId = (int)(versionStatusCount & REGION_MASK) >> REGION_SHIFT; + assert (regionId >= 0 && regionId <= 0xf); + + long status = (versionStatusCount & STATUS_BITS_MASK) >> STATUS_BITS_SHIFT; + assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX); + + if (parts.length == 3) { + long logSegmentId = Long.parseLong(parts[1]); + long txId = Long.parseLong(parts[2]); + return new LogSegmentMetadataBuilder(path, llmv, logSegmentId, txId) + .setRegionId(regionId) + .setStatus(status) + .build(); + } else if (parts.length == 5) { + long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT; + assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE); + + long logSegmentId = Long.parseLong(parts[1]); + long firstTxId = Long.parseLong(parts[2]); + long lastTxId = Long.parseLong(parts[3]); + long completionTime = Long.parseLong(parts[4]); + return new LogSegmentMetadataBuilder(path, llmv, logSegmentId, firstTxId) + .setInprogress(false) + .setLastTxId(lastTxId) + .setCompletionTime(completionTime) + .setRecordCount((int) recordCount) + .setRegionId(regionId) + .setStatus(status) + .build(); + } else { + throw new IOException("Invalid log segment metadata : " + + new String(data, UTF_8)); + } + } + + static LogSegmentMetadata parseDataV2(String path, byte[] data, String[] parts) + throws IOException { + long versionStatusCount = Long.parseLong(parts[0]); + + long version = versionStatusCount & METADATA_VERSION_MASK; + assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE); + assert (2 == version); + + LogSegmentMetadataVersion llmv = LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO; + 
+ int regionId = (int)((versionStatusCount & REGION_MASK) >> REGION_SHIFT); + assert (regionId >= 0 && regionId <= 0xf); + + long status = (versionStatusCount & STATUS_BITS_MASK) >> STATUS_BITS_SHIFT; + assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX); + + if (parts.length == 4) { + long logSegmentId = Long.parseLong(parts[1]); + long txId = Long.parseLong(parts[2]); + long logSegmentSequenceNumber = Long.parseLong(parts[3]); + return new LogSegmentMetadataBuilder(path, llmv, logSegmentId, txId) + .setLogSegmentSequenceNo(logSegmentSequenceNumber) + .setRegionId(regionId) + .setStatus(status) + .build(); + } else if (parts.length == 8) { + long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT; + assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE); + + long logSegmentId = Long.parseLong(parts[1]); + long firstTxId = Long.parseLong(parts[2]); + long lastTxId = Long.parseLong(parts[3]); + long completionTime = Long.parseLong(parts[4]); + long logSegmentSequenceNumber = Long.parseLong(parts[5]); + long lastEntryId = Long.parseLong(parts[6]); + long lastSlotId = Long.parseLong(parts[7]); + return new LogSegmentMetadataBuilder(path, llmv, logSegmentId, firstTxId) + .setInprogress(false) + .setLastTxId(lastTxId) + .setCompletionTime(completionTime) + .setRecordCount((int) recordCount) + .setLogSegmentSequenceNo(logSegmentSequenceNumber) + .setLastEntryId(lastEntryId) + .setLastSlotId(lastSlotId) + .setRegionId(regionId) + .setStatus(status) + .build(); + } else { + throw new IOException("Invalid logsegment metadata : " + + new String(data, UTF_8)); + } + + } + + static LogSegmentMetadata parseDataVersionsWithMinActiveDLSN(String path, byte[] data, String[] parts) + throws IOException { + long versionStatusCount = Long.parseLong(parts[0]); + + long version = versionStatusCount & METADATA_VERSION_MASK; + assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE); + assert 
(LogSegmentMetadataVersion.VERSION_V3_MIN_ACTIVE_DLSN.value <= version && + LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value >= version); + + LogSegmentMetadataVersion llmv = LogSegmentMetadataVersion.of((int) version); + + int regionId = (int)((versionStatusCount & REGION_MASK) >> REGION_SHIFT); + assert (regionId >= 0 && regionId <= 0xf); + + long status = (versionStatusCount & STATUS_BITS_MASK) >> STATUS_BITS_SHIFT; + assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX); + + if (parts.length == 6) { + long logSegmentId = Long.parseLong(parts[1]); + long txId = Long.parseLong(parts[2]); + long logSegmentSequenceNumber = Long.parseLong(parts[3]); + long minActiveEntryId = Long.parseLong(parts[4]); + long minActiveSlotId = Long.parseLong(parts[5]); + + LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, logSegmentId, txId) + .setLogSegmentSequenceNo(logSegmentSequenceNumber) + .setMinActiveEntryId(minActiveEntryId) + .setMinActiveSlotId(minActiveSlotId) + .setRegionId(regionId) + .setStatus(status); + if (supportsEnvelopedEntries((int) version)) { + builder = builder.setEnvelopeEntries(true); + } + return builder.build(); + } else if (parts.length == 10) { + long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT; + assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE); + + long logSegmentId = Long.parseLong(parts[1]); + long firstTxId = Long.parseLong(parts[2]); + long lastTxId = Long.parseLong(parts[3]); + long completionTime = Long.parseLong(parts[4]); + long logSegmentSequenceNumber = Long.parseLong(parts[5]); + long lastEntryId = Long.parseLong(parts[6]); + long lastSlotId = Long.parseLong(parts[7]); + long minActiveEntryId = Long.parseLong(parts[8]); + long minActiveSlotId = Long.parseLong(parts[9]); + LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, logSegmentId, firstTxId) + .setInprogress(false) + .setLastTxId(lastTxId) + 
.setCompletionTime(completionTime) + .setRecordCount((int) recordCount) + .setLogSegmentSequenceNo(logSegmentSequenceNumber) + .setLastEntryId(lastEntryId) + .setLastSlotId(lastSlotId) + .setMinActiveEntryId(minActiveEntryId) + .setMinActiveSlotId(minActiveSlotId) + .setRegionId(regionId) + .setStatus(status); + if (supportsEnvelopedEntries((int) version)) { + builder = builder.setEnvelopeEntries(true); + } + return builder.build(); + } else { + throw new IOException("Invalid logsegment metadata : " + + new String(data, UTF_8)); + } + + } + + static LogSegmentMetadata parseDataVersionsWithSequenceId(String path, byte[] data, String[] parts) + throws IOException { + long versionStatusCount = Long.parseLong(parts[0]); + + long version = versionStatusCount & METADATA_VERSION_MASK; + assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE); + assert (LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value <= version && + LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION >= version); + + LogSegmentMetadataVersion llmv = LogSegmentMetadataVersion.of((int) version); + + int regionId = (int)((versionStatusCount & REGION_MASK) >> REGION_SHIFT); + assert (regionId >= 0 && regionId <= 0xf); + + long status = (versionStatusCount & STATUS_BITS_MASK) >> STATUS_BITS_SHIFT; + assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX); + + if (parts.length == 7) { + long logSegmentId = Long.parseLong(parts[1]); + long txId = Long.parseLong(parts[2]); + long logSegmentSequenceNumber = Long.parseLong(parts[3]); + long minActiveEntryId = Long.parseLong(parts[4]); + long minActiveSlotId = Long.parseLong(parts[5]); + long startSequenceId = Long.parseLong(parts[6]); + + LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, logSegmentId, txId) + .setLogSegmentSequenceNo(logSegmentSequenceNumber) + .setMinActiveEntryId(minActiveEntryId) + .setMinActiveSlotId(minActiveSlotId) + .setRegionId(regionId) + .setStatus(status) + 
.setStartSequenceId(startSequenceId) + .setEnvelopeEntries(true); + return builder.build(); + } else if (parts.length == 11) { + long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT; + assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE); + + long logSegmentId = Long.parseLong(parts[1]); + long firstTxId = Long.parseLong(parts[2]); + long lastTxId = Long.parseLong(parts[3]); + long completionTime = Long.parseLong(parts[4]); + long logSegmentSequenceNumber = Long.parseLong(parts[5]); + long lastEntryId = Long.parseLong(parts[6]); + long lastSlotId = Long.parseLong(parts[7]); + long minActiveEntryId = Long.parseLong(parts[8]); + long minActiveSlotId = Long.parseLong(parts[9]); + long startSequenceId = Long.parseLong(parts[10]); + LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, logSegmentId, firstTxId) + .setInprogress(false) + .setLastTxId(lastTxId) + .setCompletionTime(completionTime) + .setRecordCount((int) recordCount) + .setLogSegmentSequenceNo(logSegmentSequenceNumber) + .setLastEntryId(lastEntryId) + .setLastSlotId(lastSlotId) + .setMinActiveEntryId(minActiveEntryId) + .setMinActiveSlotId(minActiveSlotId) + .setRegionId(regionId) + .setStatus(status) + .setStartSequenceId(startSequenceId) + .setEnvelopeEntries(true); + return builder.build(); + } else { + throw new IOException("Invalid log segment metadata : " + + new String(data, UTF_8)); + } + } + + public static LogSegmentMetadata parseData(String path, byte[] data) + throws IOException { + return parseData(path, data, false); + } + + static LogSegmentMetadata parseData(String path, byte[] data, boolean skipMinVersionCheck) throws IOException { + String[] parts = new String(data, UTF_8).split(";"); + long version; + try { + version = Long.parseLong(parts[0]) & METADATA_VERSION_MASK; + } catch (Exception exc) { + throw new IOException("Invalid ledger entry, " + + new String(data, UTF_8)); + } + + if (!skipMinVersionCheck 
&& version < LogSegmentMetadata.LEDGER_METADATA_OLDEST_SUPPORTED_VERSION) { + throw new UnsupportedMetadataVersionException("Ledger metadata version '" + version + "' is no longer supported: " + + new String(data, UTF_8)); + } + + if (version > LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION) { + throw new UnsupportedMetadataVersionException("Metadata version '" + version + "' is higher than the highest supported version : " + + new String(data, UTF_8)); + } + + if (LogSegmentMetadataVersion.VERSION_V1_ORIGINAL.value == version) { + return parseDataV1(path, data, parts); + } else if (LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value == version) { + return parseDataV2(path, data, parts); + } else if (LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value >= version && + LogSegmentMetadataVersion.VERSION_V3_MIN_ACTIVE_DLSN.value <= version) { + return parseDataVersionsWithMinActiveDLSN(path, data, parts); + } else { + assert(version >= LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value); + return parseDataVersionsWithSequenceId(path, data, parts); + } + } + + public String getFinalisedData() { + return getFinalisedData(this.version); + } + + public String getFinalisedData(LogSegmentMetadataVersion version) { + String finalisedData; + final long logSegmentSeqNo = getLogSegmentSequenceNumber(); + final long lastEntryId = getLastEntryId(); + final long lastSlotId = getLastSlotId(); + final long minActiveEntryId = minActiveDLSN.getEntryId(); + final long minActiveSlotId = minActiveDLSN.getSlotId(); + + if (LogSegmentMetadataVersion.VERSION_V1_ORIGINAL == version) { + if (inprogress) { + finalisedData = String.format("%d;%d;%d", + version.value, logSegmentId, firstTxId); + } else { + long versionAndCount = ((long) version.value) | ((long)recordCount << LOGRECORD_COUNT_SHIFT); + finalisedData = String.format("%d;%d;%d;%d;%d", + versionAndCount, logSegmentId, firstTxId, lastTxId, completionTime); + } + } else { + long versionStatusCount = ((long) 
version.value); + versionStatusCount |= ((status & METADATA_STATUS_BIT_MAX) << STATUS_BITS_SHIFT); + versionStatusCount |= (((long) regionId & MAX_REGION_ID) << REGION_SHIFT); + if (!inprogress) { + versionStatusCount |= ((long)recordCount << LOGRECORD_COUNT_SHIFT); + } + if (LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO == version) { + if (inprogress) { + finalisedData = String.format("%d;%d;%d;%d", + versionStatusCount, logSegmentId, firstTxId, logSegmentSeqNo); + } else { + finalisedData = String.format("%d;%d;%d;%d;%d;%d;%d;%d", + versionStatusCount, logSegmentId, firstTxId, lastTxId, completionTime, + logSegmentSeqNo, lastEntryId, lastSlotId); + } + } else if (LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value >= version.value && + LogSegmentMetadataVersion.VERSION_V3_MIN_ACTIVE_DLSN.value <= version.value) { + if (inprogress) { + finalisedData = String.format("%d;%d;%d;%d;%d;%d", + versionStatusCount, logSegmentId, firstTxId, logSegmentSeqNo, minActiveEntryId, minActiveSlotId); + } else { + finalisedData = String.format("%d;%d;%d;%d;%d;%d;%d;%d;%d;%d", + versionStatusCount, logSegmentId, firstTxId, lastTxId, completionTime, + logSegmentSeqNo, lastEntryId, lastSlotId, minActiveEntryId, minActiveSlotId); + } + } else if (LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value <= version.value && + LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION >= version.value) { + if (inprogress) { + finalisedData = String.format("%d;%d;%d;%d;%d;%d;%d", + versionStatusCount, logSegmentId, firstTxId, logSegmentSeqNo, minActiveEntryId, minActiveSlotId, startSequenceId); + } else { + finalisedData = String.format("%d;%d;%d;%d;%d;%d;%d;%d;%d;%d;%d", + versionStatusCount, logSegmentId, firstTxId, lastTxId, completionTime, + logSegmentSeqNo, lastEntryId, lastSlotId, minActiveEntryId, minActiveSlotId, startSequenceId); + } + } else { + throw new IllegalStateException("Unsupported log segment ledger metadata version '" + version + "'"); + } + } + return 
finalisedData; + } + + String getSegmentName() { + String[] parts = this.zkPath.split("/"); + if (parts.length <= 0) { + throw new IllegalStateException("ZK Path is not valid"); + } + return parts[parts.length - 1]; + } + + public void write(ZooKeeperClient zkc) + throws IOException, KeeperException.NodeExistsException { + String finalisedData = getFinalisedData(version); + try { + zkc.get().create(zkPath, finalisedData.getBytes(UTF_8), + zkc.getDefaultACL(), CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException nee) { + throw nee; + } catch (InterruptedException ie) { + throw new DLInterruptedException("Interrupted on creating ledger znode " + zkPath, ie); + } catch (Exception e) { + LOG.error("Error creating ledger znode {}", zkPath, e); + throw new IOException("Error creating ledger znode " + zkPath); + } + } + + boolean checkEquivalence(ZooKeeperClient zkc, String path) { + try { + LogSegmentMetadata other = FutureUtils.result(read(zkc, path)); + if (LOG.isTraceEnabled()) { + LOG.trace("Verifying {} against {}", this, other); + } + + boolean retVal; + + // All fields may not be comparable so only compare the ones + // that can be compared + // completionTime is set when a node is finalized, so that + // cannot be compared + // if the node is inprogress, don't compare the lastTxId either + if (this.getLogSegmentSequenceNumber() != other.getLogSegmentSequenceNumber() || + this.logSegmentId != other.logSegmentId || + this.firstTxId != other.firstTxId) { + retVal = false; + } else if (this.inprogress) { + retVal = other.inprogress; + } else { + retVal = (!other.inprogress && (this.lastTxId == other.lastTxId)); + } + + if (!retVal) { + LOG.warn("Equivalence check failed between {} and {}", this, other); + } + + return retVal; + } catch (Exception e) { + LOG.error("Could not check equivalence between:" + this + " and data in " + path, e); + return false; + } + } + + public boolean equals(Object o) { + if (!(o instanceof LogSegmentMetadata)) { + 
return false; + } + LogSegmentMetadata ol = (LogSegmentMetadata) o; + return getLogSegmentSequenceNumber() == ol.getLogSegmentSequenceNumber() + && logSegmentId == ol.logSegmentId + && firstTxId == ol.firstTxId + && lastTxId == ol.lastTxId + && version == ol.version + && completionTime == ol.completionTime + && Objects.equal(lastDLSN, ol.lastDLSN) + && Objects.equal(minActiveDLSN, ol.minActiveDLSN) + && startSequenceId == ol.startSequenceId + && status == ol.status; + } + + public int hashCode() { + int hash = 1; + hash = hash * 31 + (int) logSegmentId; + hash = hash * 31 + (int) firstTxId; + hash = hash * 31 + (int) lastTxId; + hash = hash * 31 + version.value; + hash = hash * 31 + (int) completionTime; + hash = hash * 31 + (int) getLogSegmentSequenceNumber(); + return hash; + } + + public String toString() { + return "[LogSegmentId:" + logSegmentId + + ", firstTxId:" + firstTxId + + ", lastTxId:" + lastTxId + + ", version:" + version + + ", completionTime:" + completionTime + + ", recordCount:" + recordCount + + ", regionId:" + regionId + + ", status:" + status + + ", logSegmentSequenceNumber:" + getLogSegmentSequenceNumber() + + ", lastEntryId:" + getLastEntryId() + + ", lastSlotId:" + getLastSlotId() + + ", inprogress:" + inprogress + + ", minActiveDLSN:" + minActiveDLSN + + ", startSequenceId:" + startSequenceId + + "]"; + } + + public Mutator mutator() { + return new Mutator(this); + } + + + // + // Version Checking Utilities + // + + public boolean supportsLogSegmentSequenceNo() { + return supportsLogSegmentSequenceNo(version.value); + } + + /** + * Whether the provided version supports log segment sequence number. + * + * @param version + * log segment metadata version + * @return true if this log segment supports log segment sequence number. 
+ */ + public static boolean supportsLogSegmentSequenceNo(int version) { + return version >= LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value; + } + + /** + * Whether the provided version supports enveloping entries before writing to bookkeeper. + * + * @param version + * log segment metadata version + * @return true if this log segment supports enveloping entries + */ + public static boolean supportsEnvelopedEntries(int version) { + return version >= LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value; + } + + public boolean supportsSequenceId() { + return supportsSequenceId(version.value); + } + + /** + * Whether the provided version supports sequence id. + * + * @param version + * log segment metadata version + * @return true if the log segment support sequence id. + */ + public static boolean supportsSequenceId(int version) { + return version >= LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/LogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LogWriter.java new file mode 100644 index 0000000..8a4a30b --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LogWriter.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import org.apache.distributedlog.io.Abortable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +
/**
 * A generic interface to support writing log records into
 * a persistent distributed log.
 */
public interface LogWriter extends Closeable, Abortable {

    /**
     * Write a single log record to the stream.
     *
     * @param record the log record to write
     * @throws IOException if the write fails
     */
    void write(LogRecord record) throws IOException;

    /**
     * Write a list of log records to the stream.
     *
     * @param records the log records to write
     * @return an {@code int} result; NOTE(review): its meaning is not stated here, confirm
     *         against the implementations
     * @throws IOException if the write fails
     */
    @Deprecated
    int writeBulk(List<LogRecord> records) throws IOException;

    /**
     * Hand all data written to the stream so far over for transmission to
     * persistent storage.
     *
     * <p>The transmission is asynchronous; new data can still be written to the
     * stream while the flush is being performed.
     *
     * <p>TODO: rename this to flush()
     *
     * @return a {@code long} result; NOTE(review): its meaning is not stated here, confirm
     *         against the implementations
     * @throws IOException if the operation fails
     */
    long setReadyToFlush() throws IOException;

    /**
     * Flush and sync into the underlying persistent store all data that has been
     * made ready to flush by {@link #setReadyToFlush()}.
     *
     * <p>TODO: rename this to commit()
     *
     * @return a {@code long} result; NOTE(review): its meaning is not stated here, confirm
     *         against the implementations
     * @throws IOException if the operation fails
     */
    long flushAndSync() throws IOException;

    /**
     * Flush all the data up to this point, add the end of stream marker and
     * mark the stream as read-only in the metadata. No appends to the stream
     * are allowed after this point.
     *
     * @throws IOException if the operation fails
     */
    void markEndOfStream() throws IOException;

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/MaxLogSegmentSequenceNo.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/MaxLogSegmentSequenceNo.java b/distributedlog-core/src/main/java/org/apache/distributedlog/MaxLogSegmentSequenceNo.java new file mode 100644 index 0000000..a76f547 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/MaxLogSegmentSequenceNo.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import org.apache.distributedlog.util.DLUtils; +import org.apache.bookkeeper.versioning.Version; +import org.apache.bookkeeper.versioning.Versioned; +
/**
 * Utility class for storing and reading max ledger sequence number
 * together with the {@link Version} of the data it came from.
 */
class MaxLogSegmentSequenceNo {

    Version version;
    long maxSeqNo;

    /**
     * Build the holder from versioned raw bytes.
     *
     * <p>A missing value, or a value that fails to parse with a
     * {@link NumberFormatException}, leaves the sequence number at
     * {@code DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO}.
     *
     * @param logSegmentsData versioned bytes holding the serialized sequence number
     * @throws IllegalStateException if the data itself or its version is null
     */
    MaxLogSegmentSequenceNo(Versioned<byte[]> logSegmentsData) {
        // Without a version there is nothing usable to track.
        if (null == logSegmentsData || null == logSegmentsData.getVersion()) {
            throw new IllegalStateException("Invalid MaxLogSegmentSequenceNo found - " + logSegmentsData);
        }
        version = logSegmentsData.getVersion();

        final byte[] rawSeqNo = logSegmentsData.getValue();
        long parsedSeqNo = DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO;
        if (null != rawSeqNo) {
            try {
                parsedSeqNo = DLUtils.deserializeLogSegmentSequenceNumber(rawSeqNo);
            } catch (NumberFormatException nfe) {
                parsedSeqNo = DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO;
            }
        }
        maxSeqNo = parsedSeqNo;
    }

    /**
     * @return the version currently held
     */
    synchronized Version getVersion() {
        return version;
    }

    /**
     * @return the sequence number currently held
     */
    synchronized long getSequenceNumber() {
        return maxSeqNo;
    }

    /**
     * Replace the held version and sequence number, but only when the supplied
     * version compares as {@code AFTER} the one currently held.
     *
     * @param version version accompanying the new sequence number
     * @param logSegmentSeqNo the new sequence number
     * @return this instance
     */
    synchronized MaxLogSegmentSequenceNo update(Version version, long logSegmentSeqNo) {
        final boolean isNewer = Version.Occurred.AFTER == version.compare(this.version);
        if (isNewer) {
            this.version = version;
            this.maxSeqNo = logSegmentSeqNo;
        }
        return this;
    }

    /**
     * Pair the given sequence number with the version currently held.
     *
     * @param seqNo sequence number to wrap
     * @return a new {@link Versioned} carrying {@code seqNo} and the held version
     */
    public synchronized Versioned<Long> getVersionedData(long seqNo) {
        return new Versioned<Long>(seqNo, version);
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/MaxTxId.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/MaxTxId.java b/distributedlog-core/src/main/java/org/apache/distributedlog/MaxTxId.java new file mode 100644 index 0000000..8f077e2 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/MaxTxId.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog; + +import org.apache.distributedlog.util.DLUtils; +import org.apache.bookkeeper.versioning.Version; +import org.apache.bookkeeper.versioning.Versioned; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +
/**
 * Utility class for storing and reading
 * the max seen txid in zookeeper
 */
class MaxTxId {
    static final Logger LOG = LoggerFactory.getLogger(MaxTxId.class);

    private Version version;
    private long currentMax;

    /**
     * Build the holder from versioned raw bytes.
     *
     * <p>A missing value, or a value that fails to parse with a
     * {@link NumberFormatException}, leaves the max txid at
     * {@code DistributedLogConstants.INVALID_TXID}.
     *
     * @param maxTxIdData versioned bytes holding the serialized transaction id
     * @throws IllegalStateException if the data itself or its version is null
     */
    MaxTxId(Versioned<byte[]> maxTxIdData) {
        if (null != maxTxIdData
                && null != maxTxIdData.getValue()
                && null != maxTxIdData.getVersion()) {
            this.version = maxTxIdData.getVersion();
            try {
                this.currentMax = DLUtils.deserializeTransactionId(maxTxIdData.getValue());
            } catch (NumberFormatException e) {
                // The exception is passed last so SLF4J logs it as the throwable; the
                // placeholder is filled by a real argument instead of being left as "{}".
                LOG.warn("Invalid txn id stored in max txid data (version {})", this.version, e);
                this.currentMax = DistributedLogConstants.INVALID_TXID;
            }
        } else {
            this.currentMax = DistributedLogConstants.INVALID_TXID;
            if (null != maxTxIdData && null != maxTxIdData.getVersion()) {
                this.version = maxTxIdData.getVersion();
            } else {
                throw new IllegalStateException("Invalid MaxTxId found - " + maxTxIdData);
            }
        }
    }

    /**
     * Replace the held version and max txid, but only when the supplied version
     * compares as {@code AFTER} the one currently held.
     *
     * @param version version accompanying the new transaction id
     * @param txId the new max transaction id
     */
    synchronized void update(Version version, long txId) {
        if (version.compare(this.version) == Version.Occurred.AFTER) {
            this.version = version;
            this.currentMax = txId;
        }
    }

    /**
     * @return the max transaction id currently held
     */
    synchronized long get() {
        return currentMax;
    }

    /**
     * Pair the larger of {@code txId} and the held max txid with the held version.
     *
     * @param txId candidate transaction id
     * @return a new {@link Versioned} carrying {@code max(txId, get())} and the held version
     */
    public synchronized Versioned<Long> getVersionedData(long txId) {
        return new Versioned<Long>(Math.max(txId, get()), version);
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/MetadataAccessor.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/MetadataAccessor.java b/distributedlog-core/src/main/java/org/apache/distributedlog/MetadataAccessor.java new file mode 100644 index
0000000..3d1d601 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/MetadataAccessor.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import org.apache.distributedlog.io.AsyncCloseable; + +import java.io.Closeable; +import java.io.IOException; +
/**
 * Access to a stream's metadata blob: create or update it, read it and delete it.
 * The accessor can be closed both through {@link Closeable} and {@link AsyncCloseable}.
 */
public interface MetadataAccessor extends Closeable, AsyncCloseable {

    /**
     * Get the name of the stream managed by this log manager.
     *
     * @return streamName
     */
    String getStreamName();

    /**
     * Create the metadata, or update it, with the given bytes.
     *
     * @param metadata raw metadata bytes; NOTE(review): format and null handling are not
     *        specified here, confirm against the implementations
     * @throws IOException if the operation fails
     */
    void createOrUpdateMetadata(byte[] metadata) throws IOException;

    /**
     * Delete the metadata.
     *
     * @throws IOException if the operation fails
     */
    void deleteMetadata() throws IOException;

    /**
     * Read the metadata.
     *
     * @return the raw metadata bytes; NOTE(review): the result when no metadata exists
     *         is not specified here, confirm against the implementations
     * @throws IOException if the operation fails
     */
    byte[] getMetadata() throws IOException;

    /**
     * Close the distributed log metadata, freeing any resources it may hold.
     *
     * @throws IOException if closing fails
     */
    void close() throws IOException;

}