http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java deleted file mode 100644 index e798a0f..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -import com.google.common.collect.ImmutableList; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.ACL; - -import static com.google.common.base.Charsets.UTF_8; - -public class DistributedLogConstants { - public static final byte[] EMPTY_BYTES = new byte[0]; - public static final String SCHEME_PREFIX = "distributedlog"; - public static final String BACKEND_BK = "bk"; - public static final long INVALID_TXID = -999; - public static final long EMPTY_LOGSEGMENT_TX_ID = -99; - public static final long MAX_TXID = Long.MAX_VALUE; - public static final long SMALL_LOGSEGMENT_THRESHOLD = 10; - public static final int LOGSEGMENT_NAME_VERSION = 1; - public static final int FUTURE_TIMEOUT_IMMEDIATE = 0; - public static final int FUTURE_TIMEOUT_INFINITE = -1; - public static final long LOCK_IMMEDIATE = FUTURE_TIMEOUT_IMMEDIATE; - public static final long LOCK_TIMEOUT_INFINITE = FUTURE_TIMEOUT_INFINITE; - public static final long LOCK_OP_TIMEOUT_DEFAULT = 120; - public static final long LOCK_REACQUIRE_TIMEOUT_DEFAULT = 120; - public static final String UNKNOWN_CLIENT_ID = "Unknown-ClientId"; - public static final int LOCAL_REGION_ID = 0; - public static final long LOGSEGMENT_DEFAULT_STATUS = 0; - public static final long UNASSIGNED_LOGSEGMENT_SEQNO = 0; - public static final long UNASSIGNED_SEQUENCE_ID = -1L; - public static final long FIRST_LOGSEGMENT_SEQNO = 1; - public static final long UNRESOLVED_LEDGER_ID = -1; - public static final long LATENCY_WARN_THRESHOLD_IN_MILLIS = TimeUnit.SECONDS.toMillis(1); - public static final int DL_INTERRUPTED_EXCEPTION_RESULT_CODE = Integer.MIN_VALUE + 1; - public static final int ZK_CONNECTION_EXCEPTION_RESULT_CODE = Integer.MIN_VALUE + 2; - - public static final String ALLOCATION_POOL_NODE = ".allocation_pool"; - // log segment prefix - public static final String INPROGRESS_LOGSEGMENT_PREFIX = "inprogress"; - public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs"; - public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement"; - static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8); - static final byte[] KEEPALIVE_RECORD_CONTENT = "keepalive".getBytes(UTF_8); - - // An ACL that gives all permissions to node creators and read permissions only to everyone else. - public static final List<ACL> EVERYONE_READ_CREATOR_ALL = - ImmutableList.<ACL>builder() - .addAll(Ids.CREATOR_ALL_ACL) - .addAll(Ids.READ_ACL_UNSAFE) - .build(); -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java deleted file mode 100644 index 34cfb65..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java +++ /dev/null @@ -1,308 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.twitter.distributedlog.callback.LogSegmentListener; -import com.twitter.distributedlog.io.AsyncCloseable; -import com.twitter.distributedlog.namespace.NamespaceDriver; -import com.twitter.distributedlog.subscription.SubscriptionStateStore; -import com.twitter.distributedlog.subscription.SubscriptionsStore; -import com.twitter.util.Future; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; - -/** - * A DistributedLogManager is responsible for managing a single place of storing - * edit logs. It may correspond to multiple files, a backup node, etc. - * Even when the actual underlying storage is rolled, or failed and restored, - * each conceptual place of storage corresponds to exactly one instance of - * this class, which is created when the EditLog is first opened. - */ -public interface DistributedLogManager extends AsyncCloseable, Closeable { - - /** - * Get the name of the stream managed by this log manager - * @return streamName - */ - public String getStreamName(); - - /** - * Get the namespace driver used by this manager. - * - * @return the namespace driver - */ - public NamespaceDriver getNamespaceDriver(); - - /** - * Get log segments. - * - * @return log segments - * @throws IOException - */ - public List<LogSegmentMetadata> getLogSegments() throws IOException; - - /** - * Register <i>listener</i> on log segment updates of this stream. - * - * @param listener - * listener to receive update log segment list. - */ - public void registerListener(LogSegmentListener listener) throws IOException ; - - /** - * Unregister <i>listener</i> on log segment updates from this stream. - * - * @param listener - * listener to receive update log segment list. - */ - public void unregisterListener(LogSegmentListener listener); - - /** - * Open async log writer to write records to the log stream. - * - * @return result represents the open result - */ - public Future<AsyncLogWriter> openAsyncLogWriter(); - - /** - * Begin writing to the log stream identified by the name - * - * @return the writer interface to generate log records - */ - public LogWriter startLogSegmentNonPartitioned() throws IOException; - - /** - * Begin writing to the log stream identified by the name - * - * @return the writer interface to generate log records - */ - // @Deprecated - public AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException; - - /** - * Begin appending to the end of the log stream which is being treated as a sequence of bytes - * - * @return the writer interface to generate log records - */ - public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException; - - /** - * Get a reader to read a log stream as a sequence of bytes - * - * @return the writer interface to generate log records - */ - public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException; - - /** - * Get the input stream starting with fromTxnId for the specified log - * - * @param fromTxnId - the first transaction id we want to read - * @return the stream starting with transaction fromTxnId - * @throws IOException if a stream cannot be found. - */ - public LogReader getInputStream(long fromTxnId) - throws IOException; - - public LogReader getInputStream(DLSN fromDLSN) throws IOException; - - /** - * Open an async log reader to read records from a log starting from <code>fromTxnId</code>. - * - * @param fromTxnId - * transaction id to start reading from - * @return async log reader - */ - public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId); - - /** - * Open an async log reader to read records from a log starting from <code>fromDLSN</code> - * - * @param fromDLSN - * dlsn to start reading from - * @return async log reader - */ - public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN); - - // @Deprecated - public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException; - - // @Deprecated - public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException; - - public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN); - - /** - * Get a log reader with lock starting from <i>fromDLSN</i> and using <i>subscriberId</i>. - * If two readers tried to open using same subscriberId, one would succeed, while the other - * will be blocked until it gets the lock. - * - * @param fromDLSN - * start dlsn - * @param subscriberId - * subscriber id - * @return async log reader - */ - public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId); - - /** - * Get a log reader using <i>subscriberId</i> with lock. The reader will start reading from - * its last commit position recorded in subscription store. If no last commit position found - * in subscription store, it would start reading from head of the stream. - * - * If the two readers tried to open using same subscriberId, one would succeed, while the other - * will be blocked until it gets the lock. - * - * @param subscriberId - * subscriber id - * @return async log reader - */ - public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId); - - /** - * Get the {@link DLSN} of first log record whose transaction id is not less than <code>transactionId</code>. - * - * @param transactionId - * transaction id - * @return dlsn of first log record whose transaction id is not less than transactionId. - */ - public Future<DLSN> getDLSNNotLessThanTxId(long transactionId); - - /** - * Get the last log record in the stream - * - * @return the last log record in the stream - * @throws IOException if a stream cannot be found. - */ - public LogRecordWithDLSN getLastLogRecord() - throws IOException; - - /** - * Get the earliest Transaction Id available in the log - * - * @return earliest transaction id - * @throws IOException - */ - public long getFirstTxId() throws IOException; - - /** - * Get Latest Transaction Id in the log - * - * @return latest transaction id - * @throws IOException - */ - public long getLastTxId() throws IOException; - - /** - * Get Latest DLSN in the log - * - * @return last dlsn - * @throws IOException - */ - public DLSN getLastDLSN() throws IOException; - - /** - * Get Latest log record with DLSN in the log - async - * - * @return latest log record with DLSN - */ - public Future<LogRecordWithDLSN> getLastLogRecordAsync(); - - /** - * Get Latest Transaction Id in the log - async - * - * @return latest transaction id - */ - public Future<Long> getLastTxIdAsync(); - - /** - * Get first DLSN in the log. - * - * @return first dlsn in the stream - */ - public Future<DLSN> getFirstDLSNAsync(); - - /** - * Get Latest DLSN in the log - async - * - * @return latest transaction id - */ - public Future<DLSN> getLastDLSNAsync(); - - /** - * Get the number of log records in the active portion of the log - * Any log segments that have already been truncated will not be included - * - * @return number of log records - * @throws IOException - */ - public long getLogRecordCount() throws IOException; - - /** - * Get the number of log records in the active portion of the log - async. - * Any log segments that have already been truncated will not be included - * - * @return future number of log records - * @throws IOException - */ - public Future<Long> getLogRecordCountAsync(final DLSN beginDLSN); - - /** - * Run recovery on the log. - * - * @throws IOException - */ - public void recover() throws IOException; - - /** - * Check if an end of stream marker was added to the stream - * A stream with an end of stream marker cannot be appended to - * - * @return true if the marker was added to the stream, false otherwise - * @throws IOException - */ - public boolean isEndOfStreamMarked() throws IOException; - - /** - * Delete the log. - * - * @throws IOException if the deletion fails - */ - public void delete() throws IOException; - - /** - * The DistributedLogManager may archive/purge any logs for transactionId - * less than or equal to minImageTxId. - * This is to be used only when the client explicitly manages deletion. If - * the cleanup policy is based on sliding time window, then this method need - * not be called. - * - * @param minTxIdToKeep the earliest txid that must be retained - * @throws IOException if purging fails - */ - public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException; - - /** - * Get the subscriptions store provided by the distributedlog manager. - * - * @return subscriptions store manages subscriptions for current stream. - */ - public SubscriptionsStore getSubscriptionsStore(); - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java deleted file mode 100644 index bf315fc..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java +++ /dev/null @@ -1,403 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.twitter.distributedlog.exceptions.LogRecordTooLongException; -import com.twitter.distributedlog.exceptions.WriteException; -import com.twitter.distributedlog.io.CompressionCodec; -import com.twitter.util.Promise; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; - -import javax.annotation.Nullable; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; - -/** - * A set of {@link LogRecord}s. - */ -public class Entry { - - /** - * Create a new log record set. - * - * @param logName - * name of the log - * @param initialBufferSize - * initial buffer size - * @param envelopeBeforeTransmit - * if envelope the buffer before transmit - * @param codec - * compression codec - * @param statsLogger - * stats logger to receive stats - * @return writer to build a log record set. - */ - public static Writer newEntry( - String logName, - int initialBufferSize, - boolean envelopeBeforeTransmit, - CompressionCodec.Type codec, - StatsLogger statsLogger) { - return new EnvelopedEntryWriter( - logName, - initialBufferSize, - envelopeBeforeTransmit, - codec, - statsLogger); - } - - public static Builder newBuilder() { - return new Builder(); - } - - /** - * Build the record set object. - */ - public static class Builder { - - private long logSegmentSequenceNumber = -1; - private long entryId = -1; - private long startSequenceId = Long.MIN_VALUE; - private boolean envelopeEntry = true; - // input stream - private InputStream in = null; - // or bytes array - private byte[] data = null; - private int offset = -1; - private int length = -1; - private Optional<Long> txidToSkipTo = Optional.absent(); - private Optional<DLSN> dlsnToSkipTo = Optional.absent(); - private boolean deserializeRecordSet = true; - - private Builder() {} - - /** - * Reset the builder. - * - * @return builder - */ - public Builder reset() { - logSegmentSequenceNumber = -1; - entryId = -1; - startSequenceId = Long.MIN_VALUE; - envelopeEntry = true; - // input stream - in = null; - // or bytes array - data = null; - offset = -1; - length = -1; - txidToSkipTo = Optional.absent(); - dlsnToSkipTo = Optional.absent(); - return this; - } - - /** - * Set the segment info of the log segment that this record - * set belongs to. - * - * @param lssn - * log segment sequence number - * @param startSequenceId - * start sequence id of this log segment - * @return builder - */ - public Builder setLogSegmentInfo(long lssn, long startSequenceId) { - this.logSegmentSequenceNumber = lssn; - this.startSequenceId = startSequenceId; - return this; - } - - /** - * Set the entry id of this log record set. - * - * @param entryId - * entry id assigned for this log record set. - * @return builder - */ - public Builder setEntryId(long entryId) { - this.entryId = entryId; - return this; - } - - /** - * Set whether this record set is enveloped or not. - * - * @param enabled - * flag indicates whether this record set is enveloped or not. - * @return builder - */ - public Builder setEnvelopeEntry(boolean enabled) { - this.envelopeEntry = enabled; - return this; - } - - /** - * Set the serialized bytes data of this record set. - * - * @param data - * serialized bytes data of this record set. - * @param offset - * offset of the bytes data - * @param length - * length of the bytes data - * @return builder - */ - public Builder setData(byte[] data, int offset, int length) { - this.data = data; - this.offset = offset; - this.length = length; - return this; - } - - /** - * Set the input stream of the serialized bytes data of this record set. - * - * @param in - * input stream - * @return builder - */ - public Builder setInputStream(InputStream in) { - this.in = in; - return this; - } - - /** - * Set the record set starts from <code>dlsn</code>. - * - * @param dlsn - * dlsn to skip to - * @return builder - */ - public Builder skipTo(@Nullable DLSN dlsn) { - this.dlsnToSkipTo = Optional.fromNullable(dlsn); - return this; - } - - /** - * Set the record set starts from <code>txid</code>. - * - * @param txid - * txid to skip to - * @return builder - */ - public Builder skipTo(long txid) { - this.txidToSkipTo = Optional.of(txid); - return this; - } - - /** - * Enable/disable deserialize record set. - * - * @param enabled - * flag to enable/disable dserialize record set. - * @return builder - */ - public Builder deserializeRecordSet(boolean enabled) { - this.deserializeRecordSet = enabled; - return this; - } - - public Entry build() { - Preconditions.checkNotNull(data, "Serialized data isn't provided"); - Preconditions.checkArgument(offset >= 0 && length >= 0 - && (offset + length) <= data.length, - "Invalid offset or length of serialized data"); - return new Entry( - logSegmentSequenceNumber, - entryId, - startSequenceId, - envelopeEntry, - deserializeRecordSet, - data, - offset, - length, - txidToSkipTo, - dlsnToSkipTo); - } - - public Entry.Reader buildReader() throws IOException { - Preconditions.checkArgument(data != null || in != null, - "Serialized data or input stream isn't provided"); - InputStream in; - if (null != this.in) { - in = this.in; - } else { - Preconditions.checkArgument(offset >= 0 && length >= 0 - && (offset + length) <= data.length, - "Invalid offset or length of serialized data"); - in = new ByteArrayInputStream(data, offset, length); - } - return new EnvelopedEntryReader( - logSegmentSequenceNumber, - entryId, - startSequenceId, - in, - envelopeEntry, - deserializeRecordSet, - NullStatsLogger.INSTANCE); - } - - } - - private final long logSegmentSequenceNumber; - private final long entryId; - private final long startSequenceId; - private final boolean envelopedEntry; - private final boolean deserializeRecordSet; - private final byte[] data; - private final int offset; - private final int length; - private final Optional<Long> txidToSkipTo; - private final Optional<DLSN> dlsnToSkipTo; - - private Entry(long logSegmentSequenceNumber, - long entryId, - long startSequenceId, - boolean envelopedEntry, - boolean deserializeRecordSet, - byte[] data, - int offset, - int length, - Optional<Long> txidToSkipTo, - Optional<DLSN> dlsnToSkipTo) { - this.logSegmentSequenceNumber = logSegmentSequenceNumber; - this.entryId = entryId; - this.startSequenceId = startSequenceId; - this.envelopedEntry = envelopedEntry; - this.deserializeRecordSet = deserializeRecordSet; - this.data = data; - this.offset = offset; - this.length = length; - this.txidToSkipTo = txidToSkipTo; - this.dlsnToSkipTo = dlsnToSkipTo; - } - - /** - * Get raw data of this record set. - * - * @return raw data representation of this record set. - */ - public byte[] getRawData() { - return data; - } - - /** - * Create reader to iterate over this record set. - * - * @return reader to iterate over this record set. - * @throws IOException if the record set is invalid record set. - */ - public Reader reader() throws IOException { - InputStream in = new ByteArrayInputStream(data, offset, length); - Reader reader = new EnvelopedEntryReader( - logSegmentSequenceNumber, - entryId, - startSequenceId, - in, - envelopedEntry, - deserializeRecordSet, - NullStatsLogger.INSTANCE); - if (txidToSkipTo.isPresent()) { - reader.skipTo(txidToSkipTo.get()); - } - if (dlsnToSkipTo.isPresent()) { - reader.skipTo(dlsnToSkipTo.get()); - } - return reader; - } - - /** - * Writer to append {@link LogRecord}s to {@link Entry}. - */ - public interface Writer extends EntryBuffer { - - /** - * Write a {@link LogRecord} to this record set. - * - * @param record - * record to write - * @param transmitPromise - * callback for transmit result. the promise is only - * satisfied when this record set is transmitted. - * @throws LogRecordTooLongException if the record is too long - * @throws WriteException when encountered exception writing the record - */ - void writeRecord(LogRecord record, Promise<DLSN> transmitPromise) - throws LogRecordTooLongException, WriteException; - - /** - * Reset the writer to write records. - */ - void reset(); - - } - - /** - * Reader to read {@link LogRecord}s from this record set. - */ - public interface Reader { - - /** - * Get the log segment sequence number. - * - * @return the log segment sequence number. - */ - long getLSSN(); - - /** - * Return the entry id. - * - * @return the entry id. - */ - long getEntryId(); - - /** - * Read next log record from this record set. - * - * @return next log record from this record set. - */ - LogRecordWithDLSN nextRecord() throws IOException; - - /** - * Skip the reader to the record whose transaction id is <code>txId</code>. - * - * @param txId - * transaction id to skip to. - * @return true if skip succeeds, otherwise false. - * @throws IOException - */ - boolean skipTo(long txId) throws IOException; - - /** - * Skip the reader to the record whose DLSN is <code>dlsn</code>. - * - * @param dlsn - * DLSN to skip to. - * @return true if skip succeeds, otherwise false. - * @throws IOException - */ - boolean skipTo(DLSN dlsn) throws IOException; - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryBuffer.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryBuffer.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryBuffer.java deleted file mode 100644 index 394fbad..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryBuffer.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException; -import com.twitter.distributedlog.io.Buffer; -import com.twitter.distributedlog.io.TransmitListener; - -import java.io.IOException; - -/** - * Write representation of a {@link Entry}. - * It is a buffer of log record set, used for transmission. - */ -public interface EntryBuffer extends TransmitListener { - - /** - * Return if this record set contains user records. - * - * @return true if this record set contains user records, otherwise - * return false. - */ - boolean hasUserRecords(); - - /** - * Return number of records in current record set. - * - * @return number of records in current record set. - */ - int getNumRecords(); - - /** - * Return number of bytes in current record set. - * - * @return number of bytes in current record set. - */ - int getNumBytes(); - - /** - * Return max tx id in current record set. - * - * @return max tx id. - */ - long getMaxTxId(); - - /** - * Get the buffer to transmit. - * - * @return the buffer to transmit. - * @throws InvalidEnvelopedEntryException if the record set buffer is invalid - * @throws IOException when encountered IOException during serialization - */ - Buffer getBuffer() throws InvalidEnvelopedEntryException, IOException; - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java deleted file mode 100644 index 0a15d29..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -/** - * The position of an entry, identified by log segment sequence number and entry id. - */ -class EntryPosition { - - private long lssn; - private long entryId; - - EntryPosition(long lssn, long entryId) { - this.lssn = lssn; - this.entryId = entryId; - } - - public synchronized long getLogSegmentSequenceNumber() { - return lssn; - } - - public synchronized long getEntryId() { - return entryId; - } - - public synchronized boolean advance(long lssn, long entryId) { - if (lssn == this.lssn) { - if (entryId <= this.entryId) { - return false; - } - this.entryId = entryId; - return true; - } else if (lssn > this.lssn) { - this.lssn = lssn; - this.entryId = entryId; - return true; - } else { - return false; - } - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("(").append(lssn).append(", ").append(entryId).append(")"); - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntry.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntry.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntry.java deleted file mode 100644 index 55d3be9..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntry.java +++ /dev/null @@ -1,296 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; - -import com.google.common.base.Preconditions; - -import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; - -import com.twitter.distributedlog.annotations.DistributedLogAnnotations.Compression; -import com.twitter.distributedlog.io.CompressionCodec; -import com.twitter.distributedlog.io.CompressionUtils; -import com.twitter.distributedlog.util.BitMaskUtils; - -/** - * An enveloped entry written to BookKeeper. - * - * Data type in brackets. Interpretation should be on the basis of data types and not individual - * bytes to honor Endianness. - * - * Entry Structure: - * --------------- - * Bytes 0 : Version (Byte) - * Bytes 1 - (DATA = 1+Header.length-1) : Header (Integer) - * Bytes DATA - DATA+3 : Payload Length (Integer) - * BYTES DATA+4 - DATA+4+payload.length-1 : Payload (Byte[]) - * - * V1 Header Structure: // Offsets relative to the start of the header. - * ------------------- - * Bytes 0 - 3 : Flags (Integer) - * Bytes 4 - 7 : Original payload size before compression (Integer) - * - * Flags: // 32 Bits - * ----- - * 0 ... 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 - * |_| - * | - * Compression Type - * - * Compression Type: // 2 Bits (Least significant) - * ---------------- - * 00 : No Compression - * 01 : LZ4 Compression - * 10 : Unused - * 11 : Unused - */ -public class EnvelopedEntry { - - public static final int VERSION_LENGTH = 1; // One byte long - public static final byte VERSION_ONE = 1; - - public static final byte LOWEST_SUPPORTED_VERSION = VERSION_ONE; - public static final byte HIGHEST_SUPPORTED_VERSION = VERSION_ONE; - public static final byte CURRENT_VERSION = VERSION_ONE; - - private final OpStatsLogger compressionStat; - private final OpStatsLogger decompressionStat; - private final Counter compressedEntryBytes; - private final Counter decompressedEntryBytes; - private final byte version; - - private Header header = new Header(); - private Payload payloadCompressed = new Payload(); - private Payload payloadDecompressed = new Payload(); - - public EnvelopedEntry(byte version, - StatsLogger statsLogger) throws InvalidEnvelopedEntryException { - Preconditions.checkNotNull(statsLogger); - if (version < LOWEST_SUPPORTED_VERSION || version > HIGHEST_SUPPORTED_VERSION) { - throw new InvalidEnvelopedEntryException("Invalid enveloped entry version " + version + ", expected to be in [ " - + LOWEST_SUPPORTED_VERSION + " ~ " + HIGHEST_SUPPORTED_VERSION + " ]"); - } - this.version = version; - this.compressionStat = statsLogger.getOpStatsLogger("compression_time"); - this.decompressionStat = statsLogger.getOpStatsLogger("decompression_time"); - this.compressedEntryBytes = statsLogger.getCounter("compressed_bytes"); - this.decompressedEntryBytes = statsLogger.getCounter("decompressed_bytes"); - } - - /** - * @param statsLogger - * Used for getting stats for (de)compression time - * @param compressionType - * The compression type to use - * @param decompressed - * The decompressed payload - * NOTE: The size of the byte array passed as the decompressed payload can be larger - * than the actual contents to be compressed. - */ - public EnvelopedEntry(byte version, - CompressionCodec.Type compressionType, - byte[] decompressed, - int length, - StatsLogger statsLogger) - throws InvalidEnvelopedEntryException { - this(version, statsLogger); - Preconditions.checkNotNull(compressionType); - Preconditions.checkNotNull(decompressed); - Preconditions.checkArgument(length >= 0, "Invalid bytes length " + length); - - this.header = new Header(compressionType, length); - this.payloadDecompressed = new Payload(length, decompressed); - } - - private boolean isReady() { - return (header.ready && payloadDecompressed.ready); - } - - @Compression - public void writeFully(DataOutputStream out) throws IOException { - Preconditions.checkNotNull(out); - if (!isReady()) { - throw new IOException("Entry not writable"); - } - // Version - out.writeByte(version); - // Header - header.write(out); - // Compress - CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType); - byte[] compressed = codec.compress( - payloadDecompressed.payload, - 0, - payloadDecompressed.length, - compressionStat); - this.payloadCompressed = new Payload(compressed.length, compressed); - this.compressedEntryBytes.add(payloadCompressed.length); - this.decompressedEntryBytes.add(payloadDecompressed.length); - payloadCompressed.write(out); - } - - @Compression - public void readFully(DataInputStream in) throws IOException { - Preconditions.checkNotNull(in); - // Make sure we're reading the right versioned entry. - byte version = in.readByte(); - if (version != this.version) { - throw new IOException(String.format("Version mismatch while reading. Received: %d," + - " Required: %d", version, this.version)); - } - header.read(in); - payloadCompressed.read(in); - // Decompress - CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType); - byte[] decompressed = codec.decompress( - payloadCompressed.payload, - 0, - payloadCompressed.length, - header.decompressedSize, - decompressionStat); - this.payloadDecompressed = new Payload(decompressed.length, decompressed); - this.compressedEntryBytes.add(payloadCompressed.length); - this.decompressedEntryBytes.add(payloadDecompressed.length); - } - - public byte[] getDecompressedPayload() throws IOException { - if (!isReady()) { - throw new IOException("Decompressed payload is not initialized"); - } - return payloadDecompressed.payload; - } - - public static class Header { - public static final int COMPRESSION_CODEC_MASK = 0x3; - public static final int COMPRESSION_CODEC_NONE = 0x0; - public static final int COMPRESSION_CODEC_LZ4 = 0x1; - - private int flags = 0; - private int decompressedSize = 0; - private CompressionCodec.Type compressionType = CompressionCodec.Type.UNKNOWN; - - // Whether this struct is ready for reading/writing. - private boolean ready = false; - - // Used while reading. - public Header() { - } - - public Header(CompressionCodec.Type compressionType, - int decompressedSize) { - this.compressionType = compressionType; - this.decompressedSize = decompressedSize; - this.flags = 0; - switch (compressionType) { - case NONE: - this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK, - COMPRESSION_CODEC_NONE); - break; - case LZ4: - this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK, - COMPRESSION_CODEC_LZ4); - break; - default: - throw new RuntimeException(String.format("Unknown Compression Type: %s", - compressionType)); - } - // This can now be written. - this.ready = true; - } - - private void write(DataOutputStream out) throws IOException { - out.writeInt(flags); - out.writeInt(decompressedSize); - } - - private void read(DataInputStream in) throws IOException { - this.flags = in.readInt(); - int compressionType = (int) BitMaskUtils.get(flags, COMPRESSION_CODEC_MASK); - if (compressionType == COMPRESSION_CODEC_NONE) { - this.compressionType = CompressionCodec.Type.NONE; - } else if (compressionType == COMPRESSION_CODEC_LZ4) { - this.compressionType = CompressionCodec.Type.LZ4; - } else { - throw new IOException(String.format("Unsupported Compression Type: %s", - compressionType)); - } - this.decompressedSize = in.readInt(); - // Values can now be read. - this.ready = true; - } - } - - public static class Payload { - private int length = 0; - private byte[] payload = null; - - // Whether this struct is ready for reading/writing. - private boolean ready = false; - - // Used for reading - Payload() { - } - - Payload(int length, byte[] payload) { - this.length = length; - this.payload = payload; - this.ready = true; - } - - private void write(DataOutputStream out) throws IOException { - out.writeInt(length); - out.write(payload, 0, length); - } - - private void read(DataInputStream in) throws IOException { - this.length = in.readInt(); - this.payload = new byte[length]; - in.readFully(payload); - this.ready = true; - } - } - - /** - * Return an InputStream that reads from the provided InputStream, decompresses the data - * and returns a new InputStream wrapping the underlying payload. - * - * Note that src is modified by this call. - * - * @return - * New Input stream with the underlying payload. - * @throws Exception - */ - public static InputStream fromInputStream(InputStream src, - StatsLogger statsLogger) throws IOException { - src.mark(VERSION_LENGTH); - byte version = new DataInputStream(src).readByte(); - src.reset(); - EnvelopedEntry entry = new EnvelopedEntry(version, statsLogger); - entry.readFully(new DataInputStream(src)); - return new ByteArrayInputStream(entry.getDecompressedPayload()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java deleted file mode 100644 index 038bb18..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import org.apache.bookkeeper.stats.StatsLogger; - -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; - -/** - * Record reader to read records from an enveloped entry buffer. - */ -class EnvelopedEntryReader implements Entry.Reader, RecordStream { - - private final long logSegmentSeqNo; - private final long entryId; - private final LogRecord.Reader reader; - - // slot id - private long slotId = 0; - - EnvelopedEntryReader(long logSegmentSeqNo, - long entryId, - long startSequenceId, - InputStream in, - boolean envelopedEntry, - boolean deserializeRecordSet, - StatsLogger statsLogger) - throws IOException { - this.logSegmentSeqNo = logSegmentSeqNo; - this.entryId = entryId; - InputStream src = in; - if (envelopedEntry) { - src = EnvelopedEntry.fromInputStream(in, statsLogger); - } - this.reader = new LogRecord.Reader( - this, - new DataInputStream(src), - startSequenceId, - deserializeRecordSet); - } - - @Override - public long getLSSN() { - return logSegmentSeqNo; - } - - @Override - public long getEntryId() { - return entryId; - } - - @Override - public LogRecordWithDLSN nextRecord() throws IOException { - return reader.readOp(); - } - - @Override - public boolean skipTo(long txId) throws IOException { - return reader.skipTo(txId, true); - } - - @Override - public boolean skipTo(DLSN dlsn) throws IOException { - return reader.skipTo(dlsn); - } - - // - // Record Stream - // - - @Override - public void advance(int numRecords) { - slotId += numRecords; - } - - @Override - public DLSN getCurrentPosition() { - return new DLSN(logSegmentSeqNo, entryId, slotId); - } - - @Override - public String getName() { - return "EnvelopedReader"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryWriter.java deleted file mode 100644 index 01a91ab..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryWriter.java +++ /dev/null @@ -1,192 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.twitter.distributedlog.Entry.Writer; -import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException; -import com.twitter.distributedlog.exceptions.LogRecordTooLongException; -import com.twitter.distributedlog.exceptions.WriteCancelledException; -import com.twitter.distributedlog.exceptions.WriteException; -import com.twitter.distributedlog.io.Buffer; -import com.twitter.distributedlog.io.CompressionCodec; -import com.twitter.util.Promise; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; - -import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; - -/** - * {@link com.twitter.distributedlog.io.Buffer} based log record set writer. - */ -class EnvelopedEntryWriter implements Writer { - - static final Logger logger = LoggerFactory.getLogger(EnvelopedEntryWriter.class); - - private static class WriteRequest { - - private final int numRecords; - private final Promise<DLSN> promise; - - WriteRequest(int numRecords, Promise<DLSN> promise) { - this.numRecords = numRecords; - this.promise = promise; - } - - } - - private final String logName; - private final Buffer buffer; - private final LogRecord.Writer writer; - private final List<WriteRequest> writeRequests; - private final boolean envelopeBeforeTransmit; - private final CompressionCodec.Type codec; - private final StatsLogger statsLogger; - private int count = 0; - private boolean hasUserData = false; - private long maxTxId = Long.MIN_VALUE; - - EnvelopedEntryWriter(String logName, - int initialBufferSize, - boolean envelopeBeforeTransmit, - CompressionCodec.Type codec, - StatsLogger statsLogger) { - this.logName = logName; - this.buffer = new Buffer(initialBufferSize * 6 / 5); - this.writer = new LogRecord.Writer(new DataOutputStream(buffer)); - this.writeRequests = new LinkedList<WriteRequest>(); - this.envelopeBeforeTransmit = envelopeBeforeTransmit; - this.codec = codec; - this.statsLogger = statsLogger; - } - - @Override - public synchronized void reset() { - cancelPromises(new WriteCancelledException(logName, "Record Set is reset")); - count = 0; - this.buffer.reset(); - } - - @Override - public synchronized void writeRecord(LogRecord record, - Promise<DLSN> transmitPromise) - throws LogRecordTooLongException, WriteException { - int logRecordSize = record.getPersistentSize(); - if (logRecordSize > MAX_LOGRECORD_SIZE) { - throw new LogRecordTooLongException( - "Log Record of size " + logRecordSize + " written when only " - + MAX_LOGRECORD_SIZE + " is allowed"); - } - - try { - this.writer.writeOp(record); - int numRecords = 1; - if (!record.isControl()) { - hasUserData = true; - } - if (record.isRecordSet()) { - numRecords = LogRecordSet.numRecords(record); - } - count += numRecords; - writeRequests.add(new WriteRequest(numRecords, transmitPromise)); - maxTxId = Math.max(maxTxId, record.getTransactionId()); - } catch (IOException e) { - logger.error("Failed to append record to record set of {} : ", - logName, e); - throw new WriteException(logName, "Failed to append record to record set of " - + logName); - } - } - - private synchronized void satisfyPromises(long lssn, long entryId) { - long nextSlotId = 0; - for (WriteRequest request : writeRequests) { - request.promise.setValue(new DLSN(lssn, entryId, nextSlotId)); - nextSlotId += request.numRecords; - } - writeRequests.clear(); - } - - private synchronized void cancelPromises(Throwable reason) { - for (WriteRequest request : writeRequests) { - request.promise.setException(reason); - } - writeRequests.clear(); - } - - @Override - public synchronized long getMaxTxId() { - return maxTxId; - } - - @Override - public synchronized boolean hasUserRecords() { - return hasUserData; - } - - @Override - public int getNumBytes() { - return buffer.size(); - } - - @Override - public synchronized int getNumRecords() { - return count; - } - - @Override - public synchronized Buffer getBuffer() throws InvalidEnvelopedEntryException, IOException { - if (!envelopeBeforeTransmit) { - return buffer; - } - // We can't escape this allocation because things need to be read from one byte array - // and then written to another. This is the destination. - Buffer toSend = new Buffer(buffer.size()); - byte[] decompressed = buffer.getData(); - int length = buffer.size(); - EnvelopedEntry entry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION, - codec, - decompressed, - length, - statsLogger); - // This will cause an allocation of a byte[] for compression. This can be avoided - // but we can do that later only if needed. - entry.writeFully(new DataOutputStream(toSend)); - return toSend; - } - - @Override - public synchronized DLSN finalizeTransmit(long lssn, long entryId) { - return new DLSN(lssn, entryId, count - 1); - } - - @Override - public void completeTransmit(long lssn, long entryId) { - satisfyPromises(lssn, entryId); - } - - @Override - public void abortTransmit(Throwable reason) { - cancelPromises(reason); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerReadPosition.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerReadPosition.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerReadPosition.java deleted file mode 100644 index 550d314..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerReadPosition.java +++ /dev/null @@ -1,171 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import java.io.Serializable; -import java.util.Comparator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class LedgerReadPosition { - static final Logger LOG = LoggerFactory.getLogger(LedgerReadPosition.class); - - private static enum PartialOrderingComparisonResult { - NotComparable, - GreaterThan, - LessThan, - EqualTo - } - - long ledgerId = DistributedLogConstants.UNRESOLVED_LEDGER_ID; - long logSegmentSequenceNo; - long entryId; - - public LedgerReadPosition(long ledgerId, long logSegmentSequenceNo, long entryId) { - this.ledgerId = ledgerId; - this.logSegmentSequenceNo = logSegmentSequenceNo; - this.entryId = entryId; - } - - public LedgerReadPosition(LedgerReadPosition that) { - this.ledgerId = that.ledgerId; - this.logSegmentSequenceNo = that.logSegmentSequenceNo; - this.entryId = that.entryId; - } - - - public LedgerReadPosition(final DLSN dlsn) { - this(dlsn.getLogSegmentSequenceNo(), dlsn.getEntryId()); - } - - public LedgerReadPosition(long logSegmentSequenceNo, long entryId) { - this.logSegmentSequenceNo = logSegmentSequenceNo; - this.entryId = entryId; - } - - public long getLedgerId() { - if (DistributedLogConstants.UNRESOLVED_LEDGER_ID == ledgerId) { - LOG.trace("Ledger Id is not initialized"); - throw new IllegalStateException("Ledger Id is not initialized"); - } - return ledgerId; - } - - public long getLogSegmentSequenceNumber() { - return logSegmentSequenceNo; - } - - public long getEntryId() { - return entryId; - } - - public void advance() { - entryId++; - } - - public void positionOnNewLogSegment(long ledgerId, long logSegmentSequenceNo) { - this.ledgerId = ledgerId; - this.logSegmentSequenceNo = logSegmentSequenceNo; - this.entryId = 0L; - } - - @Override - public String toString() { - return String.format("(lid=%d, lseqNo=%d, eid=%d)", ledgerId, logSegmentSequenceNo, entryId); - } - - public boolean definitelyLessThanOrEqualTo(LedgerReadPosition threshold) { - PartialOrderingComparisonResult result = comparePartiallyOrdered(threshold); - return ((result == PartialOrderingComparisonResult.LessThan) || - (result == PartialOrderingComparisonResult.EqualTo)); - } - - public boolean definitelyLessThan(LedgerReadPosition threshold) { - PartialOrderingComparisonResult result = comparePartiallyOrdered(threshold); - return result == PartialOrderingComparisonResult.LessThan; - } - - private PartialOrderingComparisonResult comparePartiallyOrdered(LedgerReadPosition threshold) { - // If no threshold is passed we cannot make a definitive comparison - if (null == threshold) { - return PartialOrderingComparisonResult.NotComparable; - } - - if (this.logSegmentSequenceNo != threshold.logSegmentSequenceNo) { - if (this.logSegmentSequenceNo < threshold.logSegmentSequenceNo) { - return PartialOrderingComparisonResult.LessThan; - } else { - return PartialOrderingComparisonResult.GreaterThan; - } - } else if (this.ledgerId != threshold.ledgerId) { - // When logSegmentSequenceNo is equal we cannot definitely say that this - // position is less than the threshold unless ledgerIds are equal - // since LogSegmentSequenceNumber maybe inferred from transactionIds in older - // versions of the metadata. - return PartialOrderingComparisonResult.NotComparable; - } else if (this.getEntryId() < threshold.getEntryId()) { - return PartialOrderingComparisonResult.LessThan; - } else if (this.getEntryId() > threshold.getEntryId()) { - return PartialOrderingComparisonResult.GreaterThan; - } else { - return PartialOrderingComparisonResult.EqualTo; - } - } - - /** - * Comparator for the key portion - */ - public static final ReadAheadCacheKeyComparator COMPARATOR = new ReadAheadCacheKeyComparator(); - - // Only compares the key portion - @Override - public boolean equals(Object other) { - if (!(other instanceof LedgerReadPosition)) { - return false; - } - LedgerReadPosition key = (LedgerReadPosition) other; - return ledgerId == key.ledgerId && - entryId == key.entryId; - } - - @Override - public int hashCode() { - return (int) (ledgerId * 13 ^ entryId * 17); - } - - /** - * Compare EntryKey. - */ - protected static class ReadAheadCacheKeyComparator implements Comparator<LedgerReadPosition>, Serializable { - - private static final long serialVersionUID = 0L; - - @Override - public int compare(LedgerReadPosition left, LedgerReadPosition right) { - long ret = left.ledgerId - right.ledgerId; - if (ret == 0) { - ret = left.entryId - right.entryId; - } - return (ret < 0) ? -1 : ((ret > 0) ? 1 : 0); - } - } - -} - - http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java deleted file mode 100644 index f4a1e41..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java +++ /dev/null @@ -1,364 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.google.common.base.Optional; -import com.twitter.distributedlog.impl.metadata.BKDLConfig; -import com.twitter.distributedlog.metadata.DLMetadata; -import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.proto.BookieServer; -import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim; -import org.apache.bookkeeper.util.IOUtils; -import org.apache.bookkeeper.util.LocalBookKeeper; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.net.BindException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * Utility class for setting up bookkeeper ensembles - * and bringing individual bookies up and down - */ -public class LocalDLMEmulator { - private static final Logger LOG = LoggerFactory.getLogger(LocalDLMEmulator.class); - - public static final String DLOG_NAMESPACE = "/messaging/distributedlog"; - - private static final int DEFAULT_BOOKIE_INITIAL_PORT = 0; // Use ephemeral ports - private static final int DEFAULT_ZK_TIMEOUT_SEC = 10; - private static final int DEFAULT_ZK_PORT = 2181; - private static final String DEFAULT_ZK_HOST = "127.0.0.1"; - private static final String DEFAULT_ZK_ENSEMBLE = DEFAULT_ZK_HOST + ":" + DEFAULT_ZK_PORT; - private static final int DEFAULT_NUM_BOOKIES = 3; - private static final ServerConfiguration DEFAULT_SERVER_CONFIGURATION = new ServerConfiguration(); - - private final String zkEnsemble; - private final URI uri; - private final List<File> tmpDirs = new ArrayList<File>(); - private final int zkTimeoutSec; - private final Thread bkStartupThread; - private final String zkHost; - private final int zkPort; - private final int numBookies; - - public static class Builder { - private int zkTimeoutSec = DEFAULT_ZK_TIMEOUT_SEC; - private int numBookies = DEFAULT_NUM_BOOKIES; - private String zkHost = DEFAULT_ZK_HOST; - private int zkPort = DEFAULT_ZK_PORT; - private int initialBookiePort = DEFAULT_BOOKIE_INITIAL_PORT; - private boolean shouldStartZK = true; - private Optional<ServerConfiguration> serverConf = Optional.absent(); - - public Builder numBookies(int numBookies) { - this.numBookies = numBookies; - return this; - } - public Builder zkHost(String zkHost) { - this.zkHost = zkHost; - return this; - } - public Builder zkPort(int zkPort) { - this.zkPort = zkPort; - return this; - } - public Builder zkTimeoutSec(int zkTimeoutSec) { - this.zkTimeoutSec = zkTimeoutSec; - return this; - } - public Builder initialBookiePort(int initialBookiePort) { - this.initialBookiePort = initialBookiePort; - return this; - } - public Builder shouldStartZK(boolean shouldStartZK) { - this.shouldStartZK = shouldStartZK; - return this; - } - public Builder serverConf(ServerConfiguration serverConf) { - this.serverConf = Optional.of(serverConf); - return this; - } - - public LocalDLMEmulator build() throws Exception { - ServerConfiguration conf = null; - if (serverConf.isPresent()) { - conf = serverConf.get(); - } else { - conf = (ServerConfiguration) DEFAULT_SERVER_CONFIGURATION.clone(); - conf.setZkTimeout(zkTimeoutSec * 1000); - } - ServerConfiguration newConf = new ServerConfiguration(); - newConf.loadConf(conf); - newConf.setAllowLoopback(true); - - return new LocalDLMEmulator(numBookies, shouldStartZK, zkHost, zkPort, - initialBookiePort, zkTimeoutSec, newConf); - } - } - - public static Builder newBuilder() { - return new Builder(); - } - - private LocalDLMEmulator(final int numBookies, final boolean shouldStartZK, final String zkHost, final int zkPort, final int initialBookiePort, final int zkTimeoutSec, final ServerConfiguration serverConf) throws Exception { - this.numBookies = numBookies; - this.zkHost = zkHost; - this.zkPort = zkPort; - this.zkEnsemble = zkHost + ":" + zkPort; - this.uri = URI.create("distributedlog://" + zkEnsemble + DLOG_NAMESPACE); - this.zkTimeoutSec = zkTimeoutSec; - this.bkStartupThread = new Thread() { - public void run() { - try { - LOG.info("Starting {} bookies : allowLoopback = {}", numBookies, serverConf.getAllowLoopback()); - LocalBookKeeper.startLocalBookies(zkHost, zkPort, numBookies, shouldStartZK, initialBookiePort, serverConf); - LOG.info("{} bookies are started."); - } catch (InterruptedException e) { - // go away quietly - } catch (Exception e) { - LOG.error("Error starting local bk", e); - } - } - }; - } - - public void start() throws Exception { - bkStartupThread.start(); - if (!LocalBookKeeper.waitForServerUp(zkEnsemble, zkTimeoutSec*1000)) { - throw new Exception("Error starting zookeeper/bookkeeper"); - } - int bookiesUp = checkBookiesUp(numBookies, zkTimeoutSec); - assert (numBookies == bookiesUp); - // Provision "/messaging/distributedlog" namespace - DLMetadata.create(new BKDLConfig(zkEnsemble, "/ledgers")).create(uri); - } - - public void teardown() throws Exception { - if (bkStartupThread != null) { - bkStartupThread.interrupt(); - bkStartupThread.join(); - } - for (File dir : tmpDirs) { - FileUtils.deleteDirectory(dir); - } - } - - public String getZkServers() { - return zkEnsemble; - } - - public URI getUri() { - return uri; - } - - public BookieServer newBookie() throws Exception { - ServerConfiguration bookieConf = new ServerConfiguration(); - bookieConf.setZkTimeout(zkTimeoutSec * 1000); - bookieConf.setBookiePort(0); - bookieConf.setAllowLoopback(true); - File tmpdir = File.createTempFile("bookie" + UUID.randomUUID() + "_", - "test"); - if (!tmpdir.delete()) { - LOG.debug("Fail to delete tmpdir " + tmpdir); - } - if (!tmpdir.mkdir()) { - throw new IOException("Fail to create tmpdir " + tmpdir); - } - tmpDirs.add(tmpdir); - - bookieConf.setZkServers(zkEnsemble); - bookieConf.setJournalDirName(tmpdir.getPath()); - bookieConf.setLedgerDirNames(new String[]{tmpdir.getPath()}); - - BookieServer b = new BookieServer(bookieConf); - b.start(); - for (int i = 0; i < 10 && !b.isRunning(); i++) { - Thread.sleep(10000); - } - if (!b.isRunning()) { - throw new IOException("Bookie would not start"); - } - return b; - } - - /** - * Check that a number of bookies are available - * - * @param count number of bookies required - * @param timeout number of seconds to wait for bookies to start - * @throws java.io.IOException if bookies are not started by the time the timeout hits - */ - public int checkBookiesUp(int count, int timeout) throws Exception { - ZooKeeper zkc = connectZooKeeper(zkHost, zkPort, zkTimeoutSec); - try { - int mostRecentSize = 0; - for (int i = 0; i < timeout; i++) { - try { - List<String> children = zkc.getChildren("/ledgers/available", - false); - children.remove("readonly"); - mostRecentSize = children.size(); - if ((mostRecentSize > count) || LOG.isDebugEnabled()) { - LOG.info("Found " + mostRecentSize + " bookies up, " - + "waiting for " + count); - if ((mostRecentSize > count) || LOG.isTraceEnabled()) { - for (String child : children) { - LOG.info(" server: " + child); - } - } - } - if (mostRecentSize == count) { - break; - } - } catch (KeeperException e) { - // ignore - } - Thread.sleep(1000); - } - return mostRecentSize; - } finally { - zkc.close(); - } - } - - public static String getBkLedgerPath() { - return "/ledgers"; - } - - public static ZooKeeper connectZooKeeper(String zkHost, int zkPort) - throws IOException, KeeperException, InterruptedException { - return connectZooKeeper(zkHost, zkPort, DEFAULT_ZK_TIMEOUT_SEC); - } - - public static ZooKeeper connectZooKeeper(String zkHost, int zkPort, int zkTimeoutSec) - throws IOException, KeeperException, InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - final String zkHostPort = zkHost + ":" + zkPort; - - ZooKeeper zkc = new ZooKeeper(zkHostPort, zkTimeoutSec * 1000, new Watcher() { - public void process(WatchedEvent event) { - if (event.getState() == Event.KeeperState.SyncConnected) { - latch.countDown(); - } - } - }); - if (!latch.await(zkTimeoutSec, TimeUnit.SECONDS)) { - throw new IOException("Zookeeper took too long to connect"); - } - return zkc; - } - - public static URI createDLMURI(String path) throws Exception { - return createDLMURI(DEFAULT_ZK_ENSEMBLE, path); - } - - public static URI createDLMURI(String zkServers, String path) throws Exception { - return URI.create("distributedlog://" + zkServers + DLOG_NAMESPACE + path); - } - - /** - * Try to start zookkeeper locally on any port. - */ - public static Pair<ZooKeeperServerShim, Integer> runZookeeperOnAnyPort(File zkDir) throws Exception { - return runZookeeperOnAnyPort((int) (Math.random()*10000+7000), zkDir); - } - - /** - * Try to start zookkeeper locally on any port beginning with some base port. - * Dump some socket info when bind fails. - */ - public static Pair<ZooKeeperServerShim, Integer> runZookeeperOnAnyPort(int basePort, File zkDir) throws Exception { - - final int MAX_RETRIES = 20; - final int MIN_PORT = 1025; - final int MAX_PORT = 65535; - ZooKeeperServerShim zks = null; - int zkPort = basePort; - boolean success = false; - int retries = 0; - - while (!success) { - try { - LOG.info("zk trying to bind to port " + zkPort); - zks = LocalBookKeeper.runZookeeper(1000, zkPort, zkDir); - success = true; - } catch (BindException be) { - retries++; - if (retries > MAX_RETRIES) { - throw be; - } - zkPort++; - if (zkPort > MAX_PORT) { - zkPort = MIN_PORT; - } - } - } - - return Pair.of(zks, zkPort); - } - - public static void main(String[] args) throws Exception { - try { - if (args.length < 1) { - System.out.println("Usage: LocalDLEmulator <zk_port>"); - System.exit(-1); - } - - final int zkPort = Integer.parseInt(args[0]); - final File zkDir = IOUtils.createTempDir("distrlog", "zookeeper"); - final LocalDLMEmulator localDlm = LocalDLMEmulator.newBuilder() - .zkPort(zkPort) - .build(); - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - localDlm.teardown(); - FileUtils.deleteDirectory(zkDir); - System.out.println("ByeBye!"); - } catch (Exception e) { - // do nothing - } - } - }); - localDlm.start(); - - System.out.println(String.format( - "DistributedLog Sandbox is running now. You could access distributedlog://%s:%s", - DEFAULT_ZK_HOST, - zkPort)); - } catch (Exception ex) { - System.out.println("Exception occurred running emulator " + ex); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/LogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LogReader.java deleted file mode 100644 index c12de29..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogReader.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.twitter.distributedlog.io.AsyncCloseable; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; - -/** - * <i>LogReader</i> is a `synchronous` reader reading records from a DL log. - * - * <h3>Lifecycle of a Reader</h3> - * - * A reader is a <i>sequential</i> reader that read records from a DL log starting - * from a given position. The position could be a <i>DLSN</i> (via {@link DistributedLogManager#getInputStream(DLSN)} - * or a <i>Transaction ID</i> (via {@link DistributedLogManager#getInputStream(long)}. - * <p> - * After the reader is open, it could call {@link #readNext(boolean)} or {@link #readBulk(boolean, int)} - * to read records out the log from provided position. - * <p> - * Closing the reader (via {@link #close()} will release all the resources occupied - * by this reader instance. - * <p> - * Exceptions could be thrown during reading records. Once the exception is thrown, - * the reader is set to an error state and it isn't usable anymore. It is the application's - * responsibility to handle the exceptions and re-create readers if necessary. - * <p> - * Example: - * <pre> - * DistributedLogManager dlm = ...; - * long nextTxId = ...; - * LogReader reader = dlm.getInputStream(nextTxId); - * - * while (true) { // keep reading & processing records - * LogRecord record; - * try { - * record = reader.readNext(false); - * nextTxId = record.getTransactionId(); - * // process the record - * ... - * } catch (IOException ioe) { - * // handle the exception - * ... - * reader = dlm.getInputStream(nextTxId + 1); - * } - * } - * - * </pre> - * - * <h3>Read Records</h3> - * - * Reading records from an <i>endless</i> log in `synchronous` way isn't as - * trivial as in `asynchronous` way (via {@link AsyncLogReader}. Because it - * lacks of callback mechanism. LogReader introduces a flag `nonBlocking` on - * controlling the <i>waiting</i> behavior on `synchronous` reads. - * - * <h4>Blocking vs NonBlocking</h4> - * - * <i>Blocking</i> (nonBlocking = false) means the reads will wait for records - * before returning read calls. While <i>NonBlocking</i> (nonBlocking = true) - * means the reads will only check readahead cache and return whatever records - * available in the readahead cache. - * <p> - * The <i>waiting</i> period varies in <i>blocking</i> mode. If the reader is - * catching up with writer (there are records in the log), the read call will - * wait until records are read and returned. If the reader is caught up with - * writer (there are no more records in the log at read time), the read call - * will wait for a small period of time (defined in - * {@link DistributedLogConfiguration#getReadAheadWaitTime()} and return whatever - * records available in the readahead cache. In other words, if a reader sees - * no record on blocking reads, it means the reader is `caught-up` with the - * writer. - * <p> - * <i>Blocking</i> and <i>NonBlocking</i> modes are useful for building replicated - * state machines. Applications could use <i>blocking</i> reads till caught up - * with latest data. Once they are caught up with latest data, they could start - * serving their service and turn to <i>non-blocking</i> read mode and tail read - * data from the logs. - * <p> - * See examples below. - * - * <h4>Read Single Record</h4> - * - * {@link #readNext(boolean)} is reading individual records from a DL log. - * - * <pre> - * LogReader reader = ... - * - * // keep reading records in blocking way until no records available in the log - * LogRecord record = reader.readNext(false); - * while (null != record) { - * // process the record - * ... - * // read next record - * records = reader.readNext(false); - * } - * - * ... - * - * // reader is caught up with writer, doing non-blocking reads to tail the log - * while (true) { - * record = reader.readNext(true) - * // process the new records - * ... - * } - * </pre> - * - * <h4>Read Batch of Records</h4> - * - * {@link #readBulk(boolean, int)} is a convenient way to read a batch of records - * from a DL log. - * - * <pre> - * LogReader reader = ... - * int N = 10; - * - * // keep reading N records in blocking way until no records available in the log - * List<LogRecord> records = reader.readBulk(false, N); - * while (!records.isEmpty()) { - * // process the list of records - * ... - * if (records.size() < N) { // no more records available in the log - * break; - * } - * // read next N records - * records = reader.readBulk(false, N); - * } - * - * ... - * - * // reader is caught up with writer, doing non-blocking reads to tail the log - * while (true) { - * records = reader.readBulk(true, N) - * // process the new records - * ... - * } - * - * </pre> - * - * @see AsyncLogReader - * - * NOTE: - * 1. Extending {@link AsyncCloseable}: BKSyncLogReader is implemented based on BKAsyncLogReader, exposing - * the {@link AsyncCloseable} interface so the reader could be closed asynchronously - */ -public interface LogReader extends Closeable, AsyncCloseable { - - /** - * Read the next log record from the stream. - * <p> - * If <i>nonBlocking</i> is set to true, the call returns immediately by just polling - * records from read ahead cache. It would return <i>null</i> if there isn't any records - * available in the read ahead cache. - * <p> - * If <i>nonBlocking</i> is set to false, it would does blocking call. The call will - * block until return a record if there are records in the stream (aka catching up). - * Otherwise it would wait up to {@link DistributedLogConfiguration#getReadAheadWaitTime()} - * milliseconds and return null if there isn't any more records in the stream. - * - * @param nonBlocking should the read make blocking calls to the backend or rely on the - * readAhead cache - * @return an operation from the stream or null if at end of stream - * @throws IOException if there is an error reading from the stream - */ - public LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException; - - /** - * Read the next <i>numLogRecords</i> log records from the stream - * - * @param nonBlocking should the read make blocking calls to the backend or rely on the - * readAhead cache - * @param numLogRecords maximum number of log records returned by this call. - * @return an operation from the stream or empty list if at end of stream - * @throws IOException if there is an error reading from the stream - * @see #readNext(boolean) - */ - public List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords) throws IOException; -}