http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java new file mode 100644 index 0000000..f951991 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ImmutableList; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.ACL; + +import static com.google.common.base.Charsets.UTF_8; + +public class DistributedLogConstants { + public static final byte[] EMPTY_BYTES = new byte[0]; + public static final String SCHEME_PREFIX = "distributedlog"; + public static final String BACKEND_BK = "bk"; + public static final long INVALID_TXID = -999; + public static final long EMPTY_LOGSEGMENT_TX_ID = -99; + public static final long MAX_TXID = Long.MAX_VALUE; + public static final long SMALL_LOGSEGMENT_THRESHOLD = 10; + public static final int LOGSEGMENT_NAME_VERSION = 1; + public static final int FUTURE_TIMEOUT_IMMEDIATE = 0; + public static final int FUTURE_TIMEOUT_INFINITE = -1; + public static final long LOCK_IMMEDIATE = FUTURE_TIMEOUT_IMMEDIATE; + public static final long LOCK_TIMEOUT_INFINITE = FUTURE_TIMEOUT_INFINITE; + public static final long LOCK_OP_TIMEOUT_DEFAULT = 120; + public static final long LOCK_REACQUIRE_TIMEOUT_DEFAULT = 120; + public static final String UNKNOWN_CLIENT_ID = "Unknown-ClientId"; + public static final int LOCAL_REGION_ID = 0; + public static final long LOGSEGMENT_DEFAULT_STATUS = 0; + public static final long UNASSIGNED_LOGSEGMENT_SEQNO = 0; + public static final long UNASSIGNED_SEQUENCE_ID = -1L; + public static final long FIRST_LOGSEGMENT_SEQNO = 1; + public static final long UNRESOLVED_LEDGER_ID = -1; + public static final long LATENCY_WARN_THRESHOLD_IN_MILLIS = TimeUnit.SECONDS.toMillis(1); + public static final int DL_INTERRUPTED_EXCEPTION_RESULT_CODE = Integer.MIN_VALUE + 1; + public static final int ZK_CONNECTION_EXCEPTION_RESULT_CODE = Integer.MIN_VALUE + 2; + + public static final String ALLOCATION_POOL_NODE = ".allocation_pool"; + // log segment prefix + public static final String INPROGRESS_LOGSEGMENT_PREFIX = "inprogress"; + 
public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs"; + public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement"; + static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8); + static final byte[] KEEPALIVE_RECORD_CONTENT = "keepalive".getBytes(UTF_8); + + // An ACL that gives all permissions to node creators and read permissions only to everyone else. + public static final List<ACL> EVERYONE_READ_CREATOR_ALL = + ImmutableList.<ACL>builder() + .addAll(Ids.CREATOR_ALL_ACL) + .addAll(Ids.READ_ACL_UNSAFE) + .build(); +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java new file mode 100644 index 0000000..7d33e9c --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import org.apache.distributedlog.callback.LogSegmentListener; +import org.apache.distributedlog.io.AsyncCloseable; +import org.apache.distributedlog.namespace.NamespaceDriver; +import org.apache.distributedlog.subscription.SubscriptionStateStore; +import org.apache.distributedlog.subscription.SubscriptionsStore; +import com.twitter.util.Future; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * A DistributedLogManager is responsible for managing a single place of storing + * edit logs. 
It may correspond to multiple files, a backup node, etc. + * Even when the actual underlying storage is rolled, or failed and restored, + * each conceptual place of storage corresponds to exactly one instance of + * this class, which is created when the EditLog is first opened. + */ +public interface DistributedLogManager extends AsyncCloseable, Closeable { + + /** + * Get the name of the stream managed by this log manager + * @return streamName + */ + public String getStreamName(); + + /** + * Get the namespace driver used by this manager. + * + * @return the namespace driver + */ + public NamespaceDriver getNamespaceDriver(); + + /** + * Get log segments. + * + * @return log segments + * @throws IOException + */ + public List<LogSegmentMetadata> getLogSegments() throws IOException; + + /** + * Register <i>listener</i> on log segment updates of this stream. + * + * @param listener + * listener to receive update log segment list. + */ + public void registerListener(LogSegmentListener listener) throws IOException ; + + /** + * Unregister <i>listener</i> on log segment updates from this stream. + * + * @param listener + * listener to receive update log segment list. + */ + public void unregisterListener(LogSegmentListener listener); + + /** + * Open async log writer to write records to the log stream. 
+ * + * @return result represents the open result + */ + public Future<AsyncLogWriter> openAsyncLogWriter(); + + /** + * Begin writing to the log stream identified by the name + * + * @return the writer interface to generate log records + */ + public LogWriter startLogSegmentNonPartitioned() throws IOException; + + /** + * Begin writing to the log stream identified by the name + * + * @return the writer interface to generate log records + */ + // @Deprecated + public AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException; + + /** + * Begin appending to the end of the log stream which is being treated as a sequence of bytes + * + * @return the writer interface to generate log records + */ + public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException; + + /** + * Get a reader to read a log stream as a sequence of bytes + * + * @return the writer interface to generate log records + */ + public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException; + + /** + * Get the input stream starting with fromTxnId for the specified log + * + * @param fromTxnId - the first transaction id we want to read + * @return the stream starting with transaction fromTxnId + * @throws IOException if a stream cannot be found. + */ + public LogReader getInputStream(long fromTxnId) + throws IOException; + + public LogReader getInputStream(DLSN fromDLSN) throws IOException; + + /** + * Open an async log reader to read records from a log starting from <code>fromTxnId</code>. 
+ * + * @param fromTxnId + * transaction id to start reading from + * @return async log reader + */ + public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId); + + /** + * Open an async log reader to read records from a log starting from <code>fromDLSN</code> + * + * @param fromDLSN + * dlsn to start reading from + * @return async log reader + */ + public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN); + + // @Deprecated + public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException; + + // @Deprecated + public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException; + + public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN); + + /** + * Get a log reader with lock starting from <i>fromDLSN</i> and using <i>subscriberId</i>. + * If two readers tried to open using same subscriberId, one would succeed, while the other + * will be blocked until it gets the lock. + * + * @param fromDLSN + * start dlsn + * @param subscriberId + * subscriber id + * @return async log reader + */ + public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId); + + /** + * Get a log reader using <i>subscriberId</i> with lock. The reader will start reading from + * its last commit position recorded in subscription store. If no last commit position found + * in subscription store, it would start reading from head of the stream. + * + * If the two readers tried to open using same subscriberId, one would succeed, while the other + * will be blocked until it gets the lock. + * + * @param subscriberId + * subscriber id + * @return async log reader + */ + public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId); + + /** + * Get the {@link DLSN} of first log record whose transaction id is not less than <code>transactionId</code>. + * + * @param transactionId + * transaction id + * @return dlsn of first log record whose transaction id is not less than transactionId. 
+ */ + public Future<DLSN> getDLSNNotLessThanTxId(long transactionId); + + /** + * Get the last log record in the stream + * + * @return the last log record in the stream + * @throws IOException if a stream cannot be found. + */ + public LogRecordWithDLSN getLastLogRecord() + throws IOException; + + /** + * Get the earliest Transaction Id available in the log + * + * @return earliest transaction id + * @throws IOException + */ + public long getFirstTxId() throws IOException; + + /** + * Get Latest Transaction Id in the log + * + * @return latest transaction id + * @throws IOException + */ + public long getLastTxId() throws IOException; + + /** + * Get Latest DLSN in the log + * + * @return last dlsn + * @throws IOException + */ + public DLSN getLastDLSN() throws IOException; + + /** + * Get Latest log record with DLSN in the log - async + * + * @return latest log record with DLSN + */ + public Future<LogRecordWithDLSN> getLastLogRecordAsync(); + + /** + * Get Latest Transaction Id in the log - async + * + * @return latest transaction id + */ + public Future<Long> getLastTxIdAsync(); + + /** + * Get first DLSN in the log. + * + * @return first dlsn in the stream + */ + public Future<DLSN> getFirstDLSNAsync(); + + /** + * Get Latest DLSN in the log - async + * + * @return latest transaction id + */ + public Future<DLSN> getLastDLSNAsync(); + + /** + * Get the number of log records in the active portion of the log + * Any log segments that have already been truncated will not be included + * + * @return number of log records + * @throws IOException + */ + public long getLogRecordCount() throws IOException; + + /** + * Get the number of log records in the active portion of the log - async. + * Any log segments that have already been truncated will not be included + * + * @return future number of log records + * @throws IOException + */ + public Future<Long> getLogRecordCountAsync(final DLSN beginDLSN); + + /** + * Run recovery on the log. 
+ * + * @throws IOException + */ + public void recover() throws IOException; + + /** + * Check if an end of stream marker was added to the stream + * A stream with an end of stream marker cannot be appended to + * + * @return true if the marker was added to the stream, false otherwise + * @throws IOException + */ + public boolean isEndOfStreamMarked() throws IOException; + + /** + * Delete the log. + * + * @throws IOException if the deletion fails + */ + public void delete() throws IOException; + + /** + * The DistributedLogManager may archive/purge any logs for transactionId + * less than or equal to minImageTxId. + * This is to be used only when the client explicitly manages deletion. If + * the cleanup policy is based on sliding time window, then this method need + * not be called. + * + * @param minTxIdToKeep the earliest txid that must be retained + * @throws IOException if purging fails + */ + public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException; + + /** + * Get the subscriptions store provided by the distributedlog manager. + * + * @return subscriptions store manages subscriptions for current stream. + */ + public SubscriptionsStore getSubscriptionsStore(); + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java b/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java new file mode 100644 index 0000000..617282c --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java @@ -0,0 +1,403 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import org.apache.distributedlog.exceptions.LogRecordTooLongException; +import org.apache.distributedlog.exceptions.WriteException; +import org.apache.distributedlog.io.CompressionCodec; +import com.twitter.util.Promise; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; + +import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * A set of {@link LogRecord}s. + */ +public class Entry { + + /** + * Create a new log record set. + * + * @param logName + * name of the log + * @param initialBufferSize + * initial buffer size + * @param envelopeBeforeTransmit + * if envelope the buffer before transmit + * @param codec + * compression codec + * @param statsLogger + * stats logger to receive stats + * @return writer to build a log record set. 
+ */ + public static Writer newEntry( + String logName, + int initialBufferSize, + boolean envelopeBeforeTransmit, + CompressionCodec.Type codec, + StatsLogger statsLogger) { + return new EnvelopedEntryWriter( + logName, + initialBufferSize, + envelopeBeforeTransmit, + codec, + statsLogger); + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Build the record set object. + */ + public static class Builder { + + private long logSegmentSequenceNumber = -1; + private long entryId = -1; + private long startSequenceId = Long.MIN_VALUE; + private boolean envelopeEntry = true; + // input stream + private InputStream in = null; + // or bytes array + private byte[] data = null; + private int offset = -1; + private int length = -1; + private Optional<Long> txidToSkipTo = Optional.absent(); + private Optional<DLSN> dlsnToSkipTo = Optional.absent(); + private boolean deserializeRecordSet = true; + + private Builder() {} + + /** + * Reset the builder. + * + * @return builder + */ + public Builder reset() { + logSegmentSequenceNumber = -1; + entryId = -1; + startSequenceId = Long.MIN_VALUE; + envelopeEntry = true; + // input stream + in = null; + // or bytes array + data = null; + offset = -1; + length = -1; + txidToSkipTo = Optional.absent(); + dlsnToSkipTo = Optional.absent(); + return this; + } + + /** + * Set the segment info of the log segment that this record + * set belongs to. + * + * @param lssn + * log segment sequence number + * @param startSequenceId + * start sequence id of this log segment + * @return builder + */ + public Builder setLogSegmentInfo(long lssn, long startSequenceId) { + this.logSegmentSequenceNumber = lssn; + this.startSequenceId = startSequenceId; + return this; + } + + /** + * Set the entry id of this log record set. + * + * @param entryId + * entry id assigned for this log record set. 
+ * @return builder + */ + public Builder setEntryId(long entryId) { + this.entryId = entryId; + return this; + } + + /** + * Set whether this record set is enveloped or not. + * + * @param enabled + * flag indicates whether this record set is enveloped or not. + * @return builder + */ + public Builder setEnvelopeEntry(boolean enabled) { + this.envelopeEntry = enabled; + return this; + } + + /** + * Set the serialized bytes data of this record set. + * + * @param data + * serialized bytes data of this record set. + * @param offset + * offset of the bytes data + * @param length + * length of the bytes data + * @return builder + */ + public Builder setData(byte[] data, int offset, int length) { + this.data = data; + this.offset = offset; + this.length = length; + return this; + } + + /** + * Set the input stream of the serialized bytes data of this record set. + * + * @param in + * input stream + * @return builder + */ + public Builder setInputStream(InputStream in) { + this.in = in; + return this; + } + + /** + * Set the record set starts from <code>dlsn</code>. + * + * @param dlsn + * dlsn to skip to + * @return builder + */ + public Builder skipTo(@Nullable DLSN dlsn) { + this.dlsnToSkipTo = Optional.fromNullable(dlsn); + return this; + } + + /** + * Set the record set starts from <code>txid</code>. + * + * @param txid + * txid to skip to + * @return builder + */ + public Builder skipTo(long txid) { + this.txidToSkipTo = Optional.of(txid); + return this; + } + + /** + * Enable/disable deserialize record set. + * + * @param enabled + * flag to enable/disable dserialize record set. 
+ * @return builder + */ + public Builder deserializeRecordSet(boolean enabled) { + this.deserializeRecordSet = enabled; + return this; + } + + public Entry build() { + Preconditions.checkNotNull(data, "Serialized data isn't provided"); + Preconditions.checkArgument(offset >= 0 && length >= 0 + && (offset + length) <= data.length, + "Invalid offset or length of serialized data"); + return new Entry( + logSegmentSequenceNumber, + entryId, + startSequenceId, + envelopeEntry, + deserializeRecordSet, + data, + offset, + length, + txidToSkipTo, + dlsnToSkipTo); + } + + public Entry.Reader buildReader() throws IOException { + Preconditions.checkArgument(data != null || in != null, + "Serialized data or input stream isn't provided"); + InputStream in; + if (null != this.in) { + in = this.in; + } else { + Preconditions.checkArgument(offset >= 0 && length >= 0 + && (offset + length) <= data.length, + "Invalid offset or length of serialized data"); + in = new ByteArrayInputStream(data, offset, length); + } + return new EnvelopedEntryReader( + logSegmentSequenceNumber, + entryId, + startSequenceId, + in, + envelopeEntry, + deserializeRecordSet, + NullStatsLogger.INSTANCE); + } + + } + + private final long logSegmentSequenceNumber; + private final long entryId; + private final long startSequenceId; + private final boolean envelopedEntry; + private final boolean deserializeRecordSet; + private final byte[] data; + private final int offset; + private final int length; + private final Optional<Long> txidToSkipTo; + private final Optional<DLSN> dlsnToSkipTo; + + private Entry(long logSegmentSequenceNumber, + long entryId, + long startSequenceId, + boolean envelopedEntry, + boolean deserializeRecordSet, + byte[] data, + int offset, + int length, + Optional<Long> txidToSkipTo, + Optional<DLSN> dlsnToSkipTo) { + this.logSegmentSequenceNumber = logSegmentSequenceNumber; + this.entryId = entryId; + this.startSequenceId = startSequenceId; + this.envelopedEntry = envelopedEntry; + 
this.deserializeRecordSet = deserializeRecordSet; + this.data = data; + this.offset = offset; + this.length = length; + this.txidToSkipTo = txidToSkipTo; + this.dlsnToSkipTo = dlsnToSkipTo; + } + + /** + * Get raw data of this record set. + * + * @return raw data representation of this record set. + */ + public byte[] getRawData() { + return data; + } + + /** + * Create reader to iterate over this record set. + * + * @return reader to iterate over this record set. + * @throws IOException if the record set is invalid record set. + */ + public Reader reader() throws IOException { + InputStream in = new ByteArrayInputStream(data, offset, length); + Reader reader = new EnvelopedEntryReader( + logSegmentSequenceNumber, + entryId, + startSequenceId, + in, + envelopedEntry, + deserializeRecordSet, + NullStatsLogger.INSTANCE); + if (txidToSkipTo.isPresent()) { + reader.skipTo(txidToSkipTo.get()); + } + if (dlsnToSkipTo.isPresent()) { + reader.skipTo(dlsnToSkipTo.get()); + } + return reader; + } + + /** + * Writer to append {@link LogRecord}s to {@link Entry}. + */ + public interface Writer extends EntryBuffer { + + /** + * Write a {@link LogRecord} to this record set. + * + * @param record + * record to write + * @param transmitPromise + * callback for transmit result. the promise is only + * satisfied when this record set is transmitted. + * @throws LogRecordTooLongException if the record is too long + * @throws WriteException when encountered exception writing the record + */ + void writeRecord(LogRecord record, Promise<DLSN> transmitPromise) + throws LogRecordTooLongException, WriteException; + + /** + * Reset the writer to write records. + */ + void reset(); + + } + + /** + * Reader to read {@link LogRecord}s from this record set. + */ + public interface Reader { + + /** + * Get the log segment sequence number. + * + * @return the log segment sequence number. + */ + long getLSSN(); + + /** + * Return the entry id. + * + * @return the entry id. 
+ */ + long getEntryId(); + + /** + * Read next log record from this record set. + * + * @return next log record from this record set. + */ + LogRecordWithDLSN nextRecord() throws IOException; + + /** + * Skip the reader to the record whose transaction id is <code>txId</code>. + * + * @param txId + * transaction id to skip to. + * @return true if skip succeeds, otherwise false. + * @throws IOException + */ + boolean skipTo(long txId) throws IOException; + + /** + * Skip the reader to the record whose DLSN is <code>dlsn</code>. + * + * @param dlsn + * DLSN to skip to. + * @return true if skip succeeds, otherwise false. + * @throws IOException + */ + boolean skipTo(DLSN dlsn) throws IOException; + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java new file mode 100644 index 0000000..c695420 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException; +import org.apache.distributedlog.io.Buffer; +import org.apache.distributedlog.io.TransmitListener; + +import java.io.IOException; + +/** + * Write representation of a {@link Entry}. + * It is a buffer of log record set, used for transmission. + */ +public interface EntryBuffer extends TransmitListener { + + /** + * Return if this record set contains user records. + * + * @return true if this record set contains user records, otherwise + * return false. + */ + boolean hasUserRecords(); + + /** + * Return number of records in current record set. + * + * @return number of records in current record set. + */ + int getNumRecords(); + + /** + * Return number of bytes in current record set. + * + * @return number of bytes in current record set. + */ + int getNumBytes(); + + /** + * Return max tx id in current record set. + * + * @return max tx id. + */ + long getMaxTxId(); + + /** + * Get the buffer to transmit. + * + * @return the buffer to transmit. 
+ * @throws InvalidEnvelopedEntryException if the record set buffer is invalid + * @throws IOException when encountered IOException during serialization + */ + Buffer getBuffer() throws InvalidEnvelopedEntryException, IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/EntryPosition.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EntryPosition.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EntryPosition.java new file mode 100644 index 0000000..218662c --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EntryPosition.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +/** + * The position of an entry, identified by log segment sequence number and entry id. 
+ */ +class EntryPosition { + + private long lssn; + private long entryId; + + EntryPosition(long lssn, long entryId) { + this.lssn = lssn; + this.entryId = entryId; + } + + public synchronized long getLogSegmentSequenceNumber() { + return lssn; + } + + public synchronized long getEntryId() { + return entryId; + } + + public synchronized boolean advance(long lssn, long entryId) { + if (lssn == this.lssn) { + if (entryId <= this.entryId) { + return false; + } + this.entryId = entryId; + return true; + } else if (lssn > this.lssn) { + this.lssn = lssn; + this.entryId = entryId; + return true; + } else { + return false; + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("(").append(lssn).append(", ").append(entryId).append(")"); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java new file mode 100644 index 0000000..eb1e9af --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import com.google.common.base.Preconditions; + +import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; + +import org.apache.distributedlog.annotations.DistributedLogAnnotations.Compression; +import org.apache.distributedlog.io.CompressionCodec; +import org.apache.distributedlog.io.CompressionUtils; +import org.apache.distributedlog.util.BitMaskUtils; + +/** + * An enveloped entry written to BookKeeper. + * + * Data type in brackets. Interpretation should be on the basis of data types and not individual + * bytes to honor Endianness. + * + * Entry Structure: + * --------------- + * Bytes 0 : Version (Byte) + * Bytes 1 - (DATA = 1+Header.length-1) : Header (Integer) + * Bytes DATA - DATA+3 : Payload Length (Integer) + * BYTES DATA+4 - DATA+4+payload.length-1 : Payload (Byte[]) + * + * V1 Header Structure: // Offsets relative to the start of the header. + * ------------------- + * Bytes 0 - 3 : Flags (Integer) + * Bytes 4 - 7 : Original payload size before compression (Integer) + * + * Flags: // 32 Bits + * ----- + * 0 ... 
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 + * |_| + * | + * Compression Type + * + * Compression Type: // 2 Bits (Least significant) + * ---------------- + * 00 : No Compression + * 01 : LZ4 Compression + * 10 : Unused + * 11 : Unused + */ +public class EnvelopedEntry { + + public static final int VERSION_LENGTH = 1; // One byte long + public static final byte VERSION_ONE = 1; + + public static final byte LOWEST_SUPPORTED_VERSION = VERSION_ONE; + public static final byte HIGHEST_SUPPORTED_VERSION = VERSION_ONE; + public static final byte CURRENT_VERSION = VERSION_ONE; + + private final OpStatsLogger compressionStat; + private final OpStatsLogger decompressionStat; + private final Counter compressedEntryBytes; + private final Counter decompressedEntryBytes; + private final byte version; + + private Header header = new Header(); + private Payload payloadCompressed = new Payload(); + private Payload payloadDecompressed = new Payload(); + + public EnvelopedEntry(byte version, + StatsLogger statsLogger) throws InvalidEnvelopedEntryException { + Preconditions.checkNotNull(statsLogger); + if (version < LOWEST_SUPPORTED_VERSION || version > HIGHEST_SUPPORTED_VERSION) { + throw new InvalidEnvelopedEntryException("Invalid enveloped entry version " + version + ", expected to be in [ " + + LOWEST_SUPPORTED_VERSION + " ~ " + HIGHEST_SUPPORTED_VERSION + " ]"); + } + this.version = version; + this.compressionStat = statsLogger.getOpStatsLogger("compression_time"); + this.decompressionStat = statsLogger.getOpStatsLogger("decompression_time"); + this.compressedEntryBytes = statsLogger.getCounter("compressed_bytes"); + this.decompressedEntryBytes = statsLogger.getCounter("decompressed_bytes"); + } + + /** + * @param statsLogger + * Used for getting stats for (de)compression time + * @param compressionType + * The compression type to use + * @param decompressed + * The decompressed payload + * NOTE: The size of the byte array passed as the decompressed payload can be larger + * than the actual 
contents to be compressed. + */ + public EnvelopedEntry(byte version, + CompressionCodec.Type compressionType, + byte[] decompressed, + int length, + StatsLogger statsLogger) + throws InvalidEnvelopedEntryException { + this(version, statsLogger); + Preconditions.checkNotNull(compressionType); + Preconditions.checkNotNull(decompressed); + Preconditions.checkArgument(length >= 0, "Invalid bytes length " + length); + + this.header = new Header(compressionType, length); + this.payloadDecompressed = new Payload(length, decompressed); + } + + private boolean isReady() { + return (header.ready && payloadDecompressed.ready); + } + + @Compression + public void writeFully(DataOutputStream out) throws IOException { + Preconditions.checkNotNull(out); + if (!isReady()) { + throw new IOException("Entry not writable"); + } + // Version + out.writeByte(version); + // Header + header.write(out); + // Compress + CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType); + byte[] compressed = codec.compress( + payloadDecompressed.payload, + 0, + payloadDecompressed.length, + compressionStat); + this.payloadCompressed = new Payload(compressed.length, compressed); + this.compressedEntryBytes.add(payloadCompressed.length); + this.decompressedEntryBytes.add(payloadDecompressed.length); + payloadCompressed.write(out); + } + + @Compression + public void readFully(DataInputStream in) throws IOException { + Preconditions.checkNotNull(in); + // Make sure we're reading the right versioned entry. + byte version = in.readByte(); + if (version != this.version) { + throw new IOException(String.format("Version mismatch while reading. 
Received: %d," + + " Required: %d", version, this.version)); + } + header.read(in); + payloadCompressed.read(in); + // Decompress + CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType); + byte[] decompressed = codec.decompress( + payloadCompressed.payload, + 0, + payloadCompressed.length, + header.decompressedSize, + decompressionStat); + this.payloadDecompressed = new Payload(decompressed.length, decompressed); + this.compressedEntryBytes.add(payloadCompressed.length); + this.decompressedEntryBytes.add(payloadDecompressed.length); + } + + public byte[] getDecompressedPayload() throws IOException { + if (!isReady()) { + throw new IOException("Decompressed payload is not initialized"); + } + return payloadDecompressed.payload; + } + + public static class Header { + public static final int COMPRESSION_CODEC_MASK = 0x3; + public static final int COMPRESSION_CODEC_NONE = 0x0; + public static final int COMPRESSION_CODEC_LZ4 = 0x1; + + private int flags = 0; + private int decompressedSize = 0; + private CompressionCodec.Type compressionType = CompressionCodec.Type.UNKNOWN; + + // Whether this struct is ready for reading/writing. + private boolean ready = false; + + // Used while reading. + public Header() { + } + + public Header(CompressionCodec.Type compressionType, + int decompressedSize) { + this.compressionType = compressionType; + this.decompressedSize = decompressedSize; + this.flags = 0; + switch (compressionType) { + case NONE: + this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK, + COMPRESSION_CODEC_NONE); + break; + case LZ4: + this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK, + COMPRESSION_CODEC_LZ4); + break; + default: + throw new RuntimeException(String.format("Unknown Compression Type: %s", + compressionType)); + } + // This can now be written. 
+ this.ready = true; + } + + private void write(DataOutputStream out) throws IOException { + out.writeInt(flags); + out.writeInt(decompressedSize); + } + + private void read(DataInputStream in) throws IOException { + this.flags = in.readInt(); + int compressionType = (int) BitMaskUtils.get(flags, COMPRESSION_CODEC_MASK); + if (compressionType == COMPRESSION_CODEC_NONE) { + this.compressionType = CompressionCodec.Type.NONE; + } else if (compressionType == COMPRESSION_CODEC_LZ4) { + this.compressionType = CompressionCodec.Type.LZ4; + } else { + throw new IOException(String.format("Unsupported Compression Type: %s", + compressionType)); + } + this.decompressedSize = in.readInt(); + // Values can now be read. + this.ready = true; + } + } + + public static class Payload { + private int length = 0; + private byte[] payload = null; + + // Whether this struct is ready for reading/writing. + private boolean ready = false; + + // Used for reading + Payload() { + } + + Payload(int length, byte[] payload) { + this.length = length; + this.payload = payload; + this.ready = true; + } + + private void write(DataOutputStream out) throws IOException { + out.writeInt(length); + out.write(payload, 0, length); + } + + private void read(DataInputStream in) throws IOException { + this.length = in.readInt(); + this.payload = new byte[length]; + in.readFully(payload); + this.ready = true; + } + } + + /** + * Return an InputStream that reads from the provided InputStream, decompresses the data + * and returns a new InputStream wrapping the underlying payload. + * + * Note that src is modified by this call. + * + * @return + * New Input stream with the underlying payload. 
+ * @throws Exception + */ + public static InputStream fromInputStream(InputStream src, + StatsLogger statsLogger) throws IOException { + src.mark(VERSION_LENGTH); + byte version = new DataInputStream(src).readByte(); + src.reset(); + EnvelopedEntry entry = new EnvelopedEntry(version, statsLogger); + entry.readFully(new DataInputStream(src)); + return new ByteArrayInputStream(entry.getDecompressedPayload()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java new file mode 100644 index 0000000..1761de5 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog; + +import org.apache.bookkeeper.stats.StatsLogger; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Record reader to read records from an enveloped entry buffer. + */ +class EnvelopedEntryReader implements Entry.Reader, RecordStream { + + private final long logSegmentSeqNo; + private final long entryId; + private final LogRecord.Reader reader; + + // slot id + private long slotId = 0; + + EnvelopedEntryReader(long logSegmentSeqNo, + long entryId, + long startSequenceId, + InputStream in, + boolean envelopedEntry, + boolean deserializeRecordSet, + StatsLogger statsLogger) + throws IOException { + this.logSegmentSeqNo = logSegmentSeqNo; + this.entryId = entryId; + InputStream src = in; + if (envelopedEntry) { + src = EnvelopedEntry.fromInputStream(in, statsLogger); + } + this.reader = new LogRecord.Reader( + this, + new DataInputStream(src), + startSequenceId, + deserializeRecordSet); + } + + @Override + public long getLSSN() { + return logSegmentSeqNo; + } + + @Override + public long getEntryId() { + return entryId; + } + + @Override + public LogRecordWithDLSN nextRecord() throws IOException { + return reader.readOp(); + } + + @Override + public boolean skipTo(long txId) throws IOException { + return reader.skipTo(txId, true); + } + + @Override + public boolean skipTo(DLSN dlsn) throws IOException { + return reader.skipTo(dlsn); + } + + // + // Record Stream + // + + @Override + public void advance(int numRecords) { + slotId += numRecords; + } + + @Override + public DLSN getCurrentPosition() { + return new DLSN(logSegmentSeqNo, entryId, slotId); + } + + @Override + public String getName() { + return "EnvelopedReader"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java ---------------------------------------------------------------------- diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java new file mode 100644 index 0000000..54858d7 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog; + +import org.apache.distributedlog.Entry.Writer; +import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException; +import org.apache.distributedlog.exceptions.LogRecordTooLongException; +import org.apache.distributedlog.exceptions.WriteCancelledException; +import org.apache.distributedlog.exceptions.WriteException; +import org.apache.distributedlog.io.Buffer; +import org.apache.distributedlog.io.CompressionCodec; +import com.twitter.util.Promise; +import org.apache.bookkeeper.stats.StatsLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; + +/** + * {@link org.apache.distributedlog.io.Buffer} based log record set writer. + */ +class EnvelopedEntryWriter implements Writer { + + static final Logger logger = LoggerFactory.getLogger(EnvelopedEntryWriter.class); + + private static class WriteRequest { + + private final int numRecords; + private final Promise<DLSN> promise; + + WriteRequest(int numRecords, Promise<DLSN> promise) { + this.numRecords = numRecords; + this.promise = promise; + } + + } + + private final String logName; + private final Buffer buffer; + private final LogRecord.Writer writer; + private final List<WriteRequest> writeRequests; + private final boolean envelopeBeforeTransmit; + private final CompressionCodec.Type codec; + private final StatsLogger statsLogger; + private int count = 0; + private boolean hasUserData = false; + private long maxTxId = Long.MIN_VALUE; + + EnvelopedEntryWriter(String logName, + int initialBufferSize, + boolean envelopeBeforeTransmit, + CompressionCodec.Type codec, + StatsLogger statsLogger) { + this.logName = logName; + this.buffer = new Buffer(initialBufferSize * 6 / 5); + this.writer = new LogRecord.Writer(new DataOutputStream(buffer)); + this.writeRequests = new 
LinkedList<WriteRequest>(); + this.envelopeBeforeTransmit = envelopeBeforeTransmit; + this.codec = codec; + this.statsLogger = statsLogger; + } + + @Override + public synchronized void reset() { + cancelPromises(new WriteCancelledException(logName, "Record Set is reset")); + count = 0; + this.buffer.reset(); + } + + @Override + public synchronized void writeRecord(LogRecord record, + Promise<DLSN> transmitPromise) + throws LogRecordTooLongException, WriteException { + int logRecordSize = record.getPersistentSize(); + if (logRecordSize > MAX_LOGRECORD_SIZE) { + throw new LogRecordTooLongException( + "Log Record of size " + logRecordSize + " written when only " + + MAX_LOGRECORD_SIZE + " is allowed"); + } + + try { + this.writer.writeOp(record); + int numRecords = 1; + if (!record.isControl()) { + hasUserData = true; + } + if (record.isRecordSet()) { + numRecords = LogRecordSet.numRecords(record); + } + count += numRecords; + writeRequests.add(new WriteRequest(numRecords, transmitPromise)); + maxTxId = Math.max(maxTxId, record.getTransactionId()); + } catch (IOException e) { + logger.error("Failed to append record to record set of {} : ", + logName, e); + throw new WriteException(logName, "Failed to append record to record set of " + + logName); + } + } + + private synchronized void satisfyPromises(long lssn, long entryId) { + long nextSlotId = 0; + for (WriteRequest request : writeRequests) { + request.promise.setValue(new DLSN(lssn, entryId, nextSlotId)); + nextSlotId += request.numRecords; + } + writeRequests.clear(); + } + + private synchronized void cancelPromises(Throwable reason) { + for (WriteRequest request : writeRequests) { + request.promise.setException(reason); + } + writeRequests.clear(); + } + + @Override + public synchronized long getMaxTxId() { + return maxTxId; + } + + @Override + public synchronized boolean hasUserRecords() { + return hasUserData; + } + + @Override + public int getNumBytes() { + return buffer.size(); + } + + @Override + public 
synchronized int getNumRecords() { + return count; + } + + @Override + public synchronized Buffer getBuffer() throws InvalidEnvelopedEntryException, IOException { + if (!envelopeBeforeTransmit) { + return buffer; + } + // We can't escape this allocation because things need to be read from one byte array + // and then written to another. This is the destination. + Buffer toSend = new Buffer(buffer.size()); + byte[] decompressed = buffer.getData(); + int length = buffer.size(); + EnvelopedEntry entry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION, + codec, + decompressed, + length, + statsLogger); + // This will cause an allocation of a byte[] for compression. This can be avoided + // but we can do that later only if needed. + entry.writeFully(new DataOutputStream(toSend)); + return toSend; + } + + @Override + public synchronized DLSN finalizeTransmit(long lssn, long entryId) { + return new DLSN(lssn, entryId, count - 1); + } + + @Override + public void completeTransmit(long lssn, long entryId) { + satisfyPromises(lssn, entryId); + } + + @Override + public void abortTransmit(Throwable reason) { + cancelPromises(reason); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/LedgerReadPosition.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LedgerReadPosition.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LedgerReadPosition.java new file mode 100644 index 0000000..f94495f --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LedgerReadPosition.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import java.io.Serializable; +import java.util.Comparator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LedgerReadPosition { + static final Logger LOG = LoggerFactory.getLogger(LedgerReadPosition.class); + + private static enum PartialOrderingComparisonResult { + NotComparable, + GreaterThan, + LessThan, + EqualTo + } + + long ledgerId = DistributedLogConstants.UNRESOLVED_LEDGER_ID; + long logSegmentSequenceNo; + long entryId; + + public LedgerReadPosition(long ledgerId, long logSegmentSequenceNo, long entryId) { + this.ledgerId = ledgerId; + this.logSegmentSequenceNo = logSegmentSequenceNo; + this.entryId = entryId; + } + + public LedgerReadPosition(LedgerReadPosition that) { + this.ledgerId = that.ledgerId; + this.logSegmentSequenceNo = that.logSegmentSequenceNo; + this.entryId = that.entryId; + } + + + public LedgerReadPosition(final DLSN dlsn) { + this(dlsn.getLogSegmentSequenceNo(), dlsn.getEntryId()); + } + + public LedgerReadPosition(long logSegmentSequenceNo, long entryId) { + this.logSegmentSequenceNo = logSegmentSequenceNo; + this.entryId = entryId; + } + + public long getLedgerId() { + if (DistributedLogConstants.UNRESOLVED_LEDGER_ID == ledgerId) { + LOG.trace("Ledger Id is not initialized"); + throw new IllegalStateException("Ledger Id is not initialized"); + } + return ledgerId; + } + + public long 
getLogSegmentSequenceNumber() { + return logSegmentSequenceNo; + } + + public long getEntryId() { + return entryId; + } + + public void advance() { + entryId++; + } + + public void positionOnNewLogSegment(long ledgerId, long logSegmentSequenceNo) { + this.ledgerId = ledgerId; + this.logSegmentSequenceNo = logSegmentSequenceNo; + this.entryId = 0L; + } + + @Override + public String toString() { + return String.format("(lid=%d, lseqNo=%d, eid=%d)", ledgerId, logSegmentSequenceNo, entryId); + } + + public boolean definitelyLessThanOrEqualTo(LedgerReadPosition threshold) { + PartialOrderingComparisonResult result = comparePartiallyOrdered(threshold); + return ((result == PartialOrderingComparisonResult.LessThan) || + (result == PartialOrderingComparisonResult.EqualTo)); + } + + public boolean definitelyLessThan(LedgerReadPosition threshold) { + PartialOrderingComparisonResult result = comparePartiallyOrdered(threshold); + return result == PartialOrderingComparisonResult.LessThan; + } + + private PartialOrderingComparisonResult comparePartiallyOrdered(LedgerReadPosition threshold) { + // If no threshold is passed we cannot make a definitive comparison + if (null == threshold) { + return PartialOrderingComparisonResult.NotComparable; + } + + if (this.logSegmentSequenceNo != threshold.logSegmentSequenceNo) { + if (this.logSegmentSequenceNo < threshold.logSegmentSequenceNo) { + return PartialOrderingComparisonResult.LessThan; + } else { + return PartialOrderingComparisonResult.GreaterThan; + } + } else if (this.ledgerId != threshold.ledgerId) { + // When logSegmentSequenceNo is equal we cannot definitely say that this + // position is less than the threshold unless ledgerIds are equal + // since LogSegmentSequenceNumber maybe inferred from transactionIds in older + // versions of the metadata. 
+ return PartialOrderingComparisonResult.NotComparable; + } else if (this.getEntryId() < threshold.getEntryId()) { + return PartialOrderingComparisonResult.LessThan; + } else if (this.getEntryId() > threshold.getEntryId()) { + return PartialOrderingComparisonResult.GreaterThan; + } else { + return PartialOrderingComparisonResult.EqualTo; + } + } + + /** + * Comparator for the key portion + */ + public static final ReadAheadCacheKeyComparator COMPARATOR = new ReadAheadCacheKeyComparator(); + + // Only compares the key portion + @Override + public boolean equals(Object other) { + if (!(other instanceof LedgerReadPosition)) { + return false; + } + LedgerReadPosition key = (LedgerReadPosition) other; + return ledgerId == key.ledgerId && + entryId == key.entryId; + } + + @Override + public int hashCode() { + return (int) (ledgerId * 13 ^ entryId * 17); + } + + /** + * Compare EntryKey. + */ + protected static class ReadAheadCacheKeyComparator implements Comparator<LedgerReadPosition>, Serializable { + + private static final long serialVersionUID = 0L; + + @Override + public int compare(LedgerReadPosition left, LedgerReadPosition right) { + long ret = left.ledgerId - right.ledgerId; + if (ret == 0) { + ret = left.entryId - right.entryId; + } + return (ret < 0) ? -1 : ((ret > 0) ? 
1 : 0); + } + } + +} + + http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java new file mode 100644 index 0000000..5623525 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java @@ -0,0 +1,364 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog; + +import com.google.common.base.Optional; +import org.apache.distributedlog.impl.metadata.BKDLConfig; +import org.apache.distributedlog.metadata.DLMetadata; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.proto.BookieServer; +import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim; +import org.apache.bookkeeper.util.IOUtils; +import org.apache.bookkeeper.util.LocalBookKeeper; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.BindException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Utility class for setting up bookkeeper ensembles + * and bringing individual bookies up and down + */ +public class LocalDLMEmulator { + private static final Logger LOG = LoggerFactory.getLogger(LocalDLMEmulator.class); + + public static final String DLOG_NAMESPACE = "/messaging/distributedlog"; + + private static final int DEFAULT_BOOKIE_INITIAL_PORT = 0; // Use ephemeral ports + private static final int DEFAULT_ZK_TIMEOUT_SEC = 10; + private static final int DEFAULT_ZK_PORT = 2181; + private static final String DEFAULT_ZK_HOST = "127.0.0.1"; + private static final String DEFAULT_ZK_ENSEMBLE = DEFAULT_ZK_HOST + ":" + DEFAULT_ZK_PORT; + private static final int DEFAULT_NUM_BOOKIES = 3; + private static final ServerConfiguration DEFAULT_SERVER_CONFIGURATION = new ServerConfiguration(); + + private final String zkEnsemble; + private final URI uri; + private final List<File> tmpDirs = new ArrayList<File>(); + private final int 
zkTimeoutSec; + private final Thread bkStartupThread; + private final String zkHost; + private final int zkPort; + private final int numBookies; + + public static class Builder { + private int zkTimeoutSec = DEFAULT_ZK_TIMEOUT_SEC; + private int numBookies = DEFAULT_NUM_BOOKIES; + private String zkHost = DEFAULT_ZK_HOST; + private int zkPort = DEFAULT_ZK_PORT; + private int initialBookiePort = DEFAULT_BOOKIE_INITIAL_PORT; + private boolean shouldStartZK = true; + private Optional<ServerConfiguration> serverConf = Optional.absent(); + + public Builder numBookies(int numBookies) { + this.numBookies = numBookies; + return this; + } + public Builder zkHost(String zkHost) { + this.zkHost = zkHost; + return this; + } + public Builder zkPort(int zkPort) { + this.zkPort = zkPort; + return this; + } + public Builder zkTimeoutSec(int zkTimeoutSec) { + this.zkTimeoutSec = zkTimeoutSec; + return this; + } + public Builder initialBookiePort(int initialBookiePort) { + this.initialBookiePort = initialBookiePort; + return this; + } + public Builder shouldStartZK(boolean shouldStartZK) { + this.shouldStartZK = shouldStartZK; + return this; + } + public Builder serverConf(ServerConfiguration serverConf) { + this.serverConf = Optional.of(serverConf); + return this; + } + + public LocalDLMEmulator build() throws Exception { + ServerConfiguration conf = null; + if (serverConf.isPresent()) { + conf = serverConf.get(); + } else { + conf = (ServerConfiguration) DEFAULT_SERVER_CONFIGURATION.clone(); + conf.setZkTimeout(zkTimeoutSec * 1000); + } + ServerConfiguration newConf = new ServerConfiguration(); + newConf.loadConf(conf); + newConf.setAllowLoopback(true); + + return new LocalDLMEmulator(numBookies, shouldStartZK, zkHost, zkPort, + initialBookiePort, zkTimeoutSec, newConf); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + private LocalDLMEmulator(final int numBookies, final boolean shouldStartZK, final String zkHost, final int zkPort, final int 
initialBookiePort, final int zkTimeoutSec, final ServerConfiguration serverConf) throws Exception { + this.numBookies = numBookies; + this.zkHost = zkHost; + this.zkPort = zkPort; + this.zkEnsemble = zkHost + ":" + zkPort; + this.uri = URI.create("distributedlog://" + zkEnsemble + DLOG_NAMESPACE); + this.zkTimeoutSec = zkTimeoutSec; + this.bkStartupThread = new Thread() { + public void run() { + try { + LOG.info("Starting {} bookies : allowLoopback = {}", numBookies, serverConf.getAllowLoopback()); + LocalBookKeeper.startLocalBookies(zkHost, zkPort, numBookies, shouldStartZK, initialBookiePort, serverConf); + LOG.info("{} bookies are started."); + } catch (InterruptedException e) { + // go away quietly + } catch (Exception e) { + LOG.error("Error starting local bk", e); + } + } + }; + } + + public void start() throws Exception { + bkStartupThread.start(); + if (!LocalBookKeeper.waitForServerUp(zkEnsemble, zkTimeoutSec*1000)) { + throw new Exception("Error starting zookeeper/bookkeeper"); + } + int bookiesUp = checkBookiesUp(numBookies, zkTimeoutSec); + assert (numBookies == bookiesUp); + // Provision "/messaging/distributedlog" namespace + DLMetadata.create(new BKDLConfig(zkEnsemble, "/ledgers")).create(uri); + } + + public void teardown() throws Exception { + if (bkStartupThread != null) { + bkStartupThread.interrupt(); + bkStartupThread.join(); + } + for (File dir : tmpDirs) { + FileUtils.deleteDirectory(dir); + } + } + + public String getZkServers() { + return zkEnsemble; + } + + public URI getUri() { + return uri; + } + + public BookieServer newBookie() throws Exception { + ServerConfiguration bookieConf = new ServerConfiguration(); + bookieConf.setZkTimeout(zkTimeoutSec * 1000); + bookieConf.setBookiePort(0); + bookieConf.setAllowLoopback(true); + File tmpdir = File.createTempFile("bookie" + UUID.randomUUID() + "_", + "test"); + if (!tmpdir.delete()) { + LOG.debug("Fail to delete tmpdir " + tmpdir); + } + if (!tmpdir.mkdir()) { + throw new IOException("Fail to 
create tmpdir " + tmpdir); + } + tmpDirs.add(tmpdir); + + bookieConf.setZkServers(zkEnsemble); + bookieConf.setJournalDirName(tmpdir.getPath()); + bookieConf.setLedgerDirNames(new String[]{tmpdir.getPath()}); + + BookieServer b = new BookieServer(bookieConf); + b.start(); + for (int i = 0; i < 10 && !b.isRunning(); i++) { + Thread.sleep(10000); + } + if (!b.isRunning()) { + throw new IOException("Bookie would not start"); + } + return b; + } + + /** + * Check that a number of bookies are available + * + * @param count number of bookies required + * @param timeout number of seconds to wait for bookies to start + * @throws java.io.IOException if bookies are not started by the time the timeout hits + */ + public int checkBookiesUp(int count, int timeout) throws Exception { + ZooKeeper zkc = connectZooKeeper(zkHost, zkPort, zkTimeoutSec); + try { + int mostRecentSize = 0; + for (int i = 0; i < timeout; i++) { + try { + List<String> children = zkc.getChildren("/ledgers/available", + false); + children.remove("readonly"); + mostRecentSize = children.size(); + if ((mostRecentSize > count) || LOG.isDebugEnabled()) { + LOG.info("Found " + mostRecentSize + " bookies up, " + + "waiting for " + count); + if ((mostRecentSize > count) || LOG.isTraceEnabled()) { + for (String child : children) { + LOG.info(" server: " + child); + } + } + } + if (mostRecentSize == count) { + break; + } + } catch (KeeperException e) { + // ignore + } + Thread.sleep(1000); + } + return mostRecentSize; + } finally { + zkc.close(); + } + } + + public static String getBkLedgerPath() { + return "/ledgers"; + } + + public static ZooKeeper connectZooKeeper(String zkHost, int zkPort) + throws IOException, KeeperException, InterruptedException { + return connectZooKeeper(zkHost, zkPort, DEFAULT_ZK_TIMEOUT_SEC); + } + + public static ZooKeeper connectZooKeeper(String zkHost, int zkPort, int zkTimeoutSec) + throws IOException, KeeperException, InterruptedException { + final CountDownLatch latch = new 
CountDownLatch(1);
+        final String zkHostPort = zkHost + ":" + zkPort;
+
+        ZooKeeper zkc = new ZooKeeper(zkHostPort, zkTimeoutSec * 1000, new Watcher() {
+            public void process(WatchedEvent event) {
+                if (event.getState() == Event.KeeperState.SyncConnected) {
+                    latch.countDown();
+                }
+            }
+        });
+        boolean connected = false;
+        try {
+            connected = latch.await(zkTimeoutSec, TimeUnit.SECONDS);
+            if (!connected) {
+                throw new IOException("Zookeeper took too long to connect");
+            }
+        } finally {
+            if (!connected) {
+                // timed out or interrupted: nobody will receive this handle, so close it here
+                zkc.close();
+            }
+        }
+        return zkc;
+    }
+
+    /**
+     * Build a DistributedLog URI for {@code path} on the default zk ensemble
+     * ({@code DEFAULT_ZK_ENSEMBLE}).
+     *
+     * @param path path appended after the namespace prefix
+     * @return the resulting {@code distributedlog://} URI
+     */
+    public static URI createDLMURI(String path) throws Exception {
+        return createDLMURI(DEFAULT_ZK_ENSEMBLE, path);
+    }
+
+    /**
+     * Build a DistributedLog URI of the form
+     * {@code distributedlog://<zkServers><DLOG_NAMESPACE><path>}.
+     *
+     * @param zkServers zookeeper connect string used as the URI authority
+     * @param path path appended after the namespace prefix
+     * @return the resulting URI
+     */
+    public static URI createDLMURI(String zkServers, String path) throws Exception {
+        return URI.create("distributedlog://" + zkServers + DLOG_NAMESPACE + path);
+    }
+
+    /**
+     * Try to start zookeeper locally, beginning with a random base port in the
+     * range [7000, 17000).
+     */
+    public static Pair<ZooKeeperServerShim, Integer> runZookeeperOnAnyPort(File zkDir) throws Exception {
+        return runZookeeperOnAnyPort((int) (Math.random() * 10000 + 7000), zkDir);
+    }
+
+    /**
+     * Try to start zookeeper locally on any port beginning with some base port.
+     * When binding fails, the next higher port is tried (wrapping around to 1025
+     * after 65535) until the server starts or 20 retries have failed.
+     *
+     * @param basePort first port to try
+     * @param zkDir directory handed to the zookeeper server
+     * @return the started server together with the port it was started on
+     * @throws java.net.BindException if every attempted port is already in use
+     */
+    public static Pair<ZooKeeperServerShim, Integer> runZookeeperOnAnyPort(int basePort, File zkDir) throws Exception {
+
+        final int MAX_RETRIES = 20;
+        final int MIN_PORT = 1025;
+        final int MAX_PORT = 65535;
+        int zkPort = basePort;
+        int retries = 0;
+
+        // return from inside the loop on the first successful bind; only a
+        // BindException leads to another attempt
+        while (true) {
+            try {
+                LOG.info("zk trying to bind to port " + zkPort);
+                ZooKeeperServerShim zks = LocalBookKeeper.runZookeeper(1000, zkPort, zkDir);
+                return Pair.of(zks, zkPort);
+            } catch (BindException be) {
+                retries++;
+                if (retries > MAX_RETRIES) {
+                    throw be;
+                }
+                zkPort++;
+                if (zkPort > MAX_PORT) {
+                    zkPort = MIN_PORT;
+                }
+            }
+        }
+    }
+
+    /**
+     * Command line entry point: builds an emulator for the given zookeeper port,
+     * registers a shutdown hook that tears it down, and starts it.
+     *
+     * @param args {@code args[0]} is the zookeeper port
+     */
+    public static void main(String[] args) throws Exception {
+        try {
+            if (args.length < 1) {
+                // usage text now names the actual class (was "LocalDLEmulator")
+                System.out.println("Usage: LocalDLMEmulator <zk_port>");
+                System.exit(-1);
+            }
+
+            final int zkPort = Integer.parseInt(args[0]);
+            final File zkDir = IOUtils.createTempDir("distrlog", "zookeeper");
+            final LocalDLMEmulator localDlm = LocalDLMEmulator.newBuilder()
+                .zkPort(zkPort)
+                .build();
+
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        localDlm.teardown();
+                        FileUtils.deleteDirectory(zkDir);
+                        System.out.println("ByeBye!");
+                    } catch (Exception e) {
+                        // do nothing
+                    }
+                }
+            });
+            localDlm.start();
+
+            System.out.println(String.format(
+                "DistributedLog Sandbox is running now. 
You could access distributedlog://%s:%s",
+                DEFAULT_ZK_HOST,
+                zkPort));
+        } catch (Exception ex) {
+            System.out.println("Exception occurred running emulator " + ex);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java
new file mode 100644
index 0000000..75a32ef
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.io.AsyncCloseable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * <i>LogReader</i> is a `synchronous` reader reading records from a DL log.
+ *
+ * <h3>Lifecycle of a Reader</h3>
+ *
+ * A reader is a <i>sequential</i> reader that reads records from a DL log starting
+ * from a given position. 
The position could be a <i>DLSN</i> (via {@link DistributedLogManager#getInputStream(DLSN)})
+ * or a <i>Transaction ID</i> (via {@link DistributedLogManager#getInputStream(long)}).
+ * <p>
+ * After the reader is open, it could call {@link #readNext(boolean)} or {@link #readBulk(boolean, int)}
+ * to read records out of the log from the provided position.
+ * <p>
+ * Closing the reader (via {@link #close()}) will release all the resources occupied
+ * by this reader instance.
+ * <p>
+ * Exceptions could be thrown while reading records. Once an exception is thrown,
+ * the reader is set to an error state and it isn't usable anymore. It is the application's
+ * responsibility to handle the exceptions and re-create readers if necessary.
+ * <p>
+ * Example:
+ * <pre>
+ * DistributedLogManager dlm = ...;
+ * long nextTxId = ...;
+ * LogReader reader = dlm.getInputStream(nextTxId);
+ *
+ * while (true) { // keep reading & processing records
+ *     LogRecord record;
+ *     try {
+ *         record = reader.readNext(false);
+ *         nextTxId = record.getTransactionId();
+ *         // process the record
+ *         ...
+ *     } catch (IOException ioe) {
+ *         // handle the exception
+ *         ...
+ *         reader = dlm.getInputStream(nextTxId + 1);
+ *     }
+ * }
+ *
+ * </pre>
+ *
+ * <h3>Read Records</h3>
+ *
+ * Reading records from an <i>endless</i> log in a `synchronous` way isn't as
+ * trivial as in an `asynchronous` way (via {@link AsyncLogReader}), because it
+ * lacks a callback mechanism. LogReader introduces a flag `nonBlocking` for
+ * controlling the <i>waiting</i> behavior on `synchronous` reads.
+ *
+ * <h4>Blocking vs NonBlocking</h4>
+ *
+ * <i>Blocking</i> (nonBlocking = false) means the reads will wait for records
+ * before returning read calls, while <i>NonBlocking</i> (nonBlocking = true)
+ * means the reads will only check the readahead cache and return whatever records
+ * are available in the readahead cache.
+ * <p>
+ * The <i>waiting</i> period varies in <i>blocking</i> mode. 
If the reader is
+ * catching up with writer (there are records in the log), the read call will
+ * wait until records are read and returned. If the reader is caught up with
+ * writer (there are no more records in the log at read time), the read call
+ * will wait for a small period of time (defined in
+ * {@link DistributedLogConfiguration#getReadAheadWaitTime()}) and return whatever
+ * records are available in the readahead cache. In other words, if a reader sees
+ * no record on blocking reads, it means the reader is `caught-up` with the
+ * writer.
+ * <p>
+ * <i>Blocking</i> and <i>NonBlocking</i> modes are useful for building replicated
+ * state machines. Applications could use <i>blocking</i> reads till caught up
+ * with latest data. Once they are caught up with latest data, they could start
+ * serving their service and turn to <i>non-blocking</i> read mode and tail read
+ * data from the logs.
+ * <p>
+ * See examples below.
+ *
+ * <h4>Read Single Record</h4>
+ *
+ * {@link #readNext(boolean)} reads individual records from a DL log.
+ *
+ * <pre>
+ * LogReader reader = ...
+ *
+ * // keep reading records in blocking way until no records available in the log
+ * LogRecord record = reader.readNext(false);
+ * while (null != record) {
+ *     // process the record
+ *     ...
+ *     // read next record
+ *     record = reader.readNext(false);
+ * }
+ *
+ * ...
+ *
+ * // reader is caught up with writer, doing non-blocking reads to tail the log
+ * while (true) {
+ *     record = reader.readNext(true);
+ *     // process the new records
+ *     ...
+ * }
+ * </pre>
+ *
+ * <h4>Read Batch of Records</h4>
+ *
+ * {@link #readBulk(boolean, int)} is a convenient way to read a batch of records
+ * from a DL log.
+ *
+ * <pre>
+ * LogReader reader = ...
+ * int N = 10;
+ *
+ * // keep reading N records in blocking way until no records available in the log
+ * List&lt;LogRecordWithDLSN&gt; records = reader.readBulk(false, N);
+ * while (!records.isEmpty()) {
+ *     // process the list of records
+ *     ... 
+ *     if (records.size() &lt; N) { // no more records available in the log
+ *         break;
+ *     }
+ *     // read next N records
+ *     records = reader.readBulk(false, N);
+ * }
+ *
+ * ...
+ *
+ * // reader is caught up with writer, doing non-blocking reads to tail the log
+ * while (true) {
+ *     records = reader.readBulk(true, N);
+ *     // process the new records
+ *     ...
+ * }
+ *
+ * </pre>
+ *
+ * <p>
+ * NOTE:
+ * 1. Extending {@link AsyncCloseable}: BKSyncLogReader is implemented based on BKAsyncLogReader, exposing
+ *    the {@link AsyncCloseable} interface so the reader could be closed asynchronously
+ *
+ * @see AsyncLogReader
+ */
+public interface LogReader extends Closeable, AsyncCloseable {
+
+    /**
+     * Read the next log record from the stream.
+     * <p>
+     * If <i>nonBlocking</i> is set to true, the call returns immediately by just polling
+     * records from the read ahead cache. It returns <i>null</i> if there aren't any records
+     * available in the read ahead cache.
+     * <p>
+     * If <i>nonBlocking</i> is set to false, it makes a blocking call. The call blocks
+     * until a record can be returned if there are records in the stream (aka catching up).
+     * Otherwise it waits up to {@link DistributedLogConfiguration#getReadAheadWaitTime()}
+     * milliseconds and returns null if there aren't any more records in the stream.
+     *
+     * @param nonBlocking should the read make blocking calls to the backend or rely on the
+     * readAhead cache
+     * @return the next record from the stream or null if at end of stream
+     * @throws IOException if there is an error reading from the stream
+     */
+    LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException;
+
+    /**
+     * Read the next <i>numLogRecords</i> log records from the stream.
+     *
+     * @param nonBlocking should the read make blocking calls to the backend or rely on the
+     * readAhead cache
+     * @param numLogRecords maximum number of log records returned by this call.
+     * @return a list of at most <i>numLogRecords</i> records from the stream, or an empty
+     * list if at end of stream
+     * @throws IOException if there is an error reading from the stream
+     * @see #readNext(boolean)
+     */
+    List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords) throws IOException;
+}