http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
index 4a4d569..a9af651 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
@@ -20,10 +20,14 @@ import java.io.IOException;
 /**
  * An abstraction between an underlying input stream and record iterators, a LogInputStream
- * returns only the shallow log entries, depending on {@link org.apache.kafka.common.record.RecordsIterator.DeepRecordsIterator}
- * for the deep iteration.
+ * returns only the shallow log entries, depending on {@link RecordsIterator.DeepRecordsIterator}
+ * for the deep iteration. The generic typing allows for implementations which present only
+ * a view of the log entries, which enables more efficient iteration when the record data is
+ * not actually needed. See for example {@link org.apache.kafka.common.record.FileLogInputStream.FileChannelLogEntry}
+ * in which the record is not brought into memory until needed.
+ * @param <T> Type parameter of the log entry
  */
-interface LogInputStream {
+interface LogInputStream<T extends LogEntry> {

     /**
      * Get the next log entry from the underlying input stream.
@@ -31,5 +35,5 @@ interface LogInputStream {
      * @return The next log entry or null if there is none
      * @throws IOException for any IO errors
      */
-    LogEntry nextEntry() throws IOException;
+    T nextEntry() throws IOException;
 }
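For illustration of what the new type parameter enables, below is a minimal sketch of an implementation that serves pre-built entries from an in-memory list. The class name and the list-backed design are hypothetical (and such a class would have to live in org.apache.kafka.common.record, since the interface is package-private); only the LogInputStream<T> contract itself comes from this patch. Real implementations such as FileLogInputStream declare a more specific entry type (FileChannelLogEntry) so that callers get lazy record access without casting.

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.List;

    // Hypothetical example only: a LogInputStream whose entry type is the base LogEntry.
    class ListLogInputStream implements LogInputStream<LogEntry> {
        private final Iterator<LogEntry> iterator;

        ListLogInputStream(List<LogEntry> entries) {
            this.iterator = entries.iterator();
        }

        @Override
        public LogEntry nextEntry() throws IOException {
            // Per the interface contract, return null once there are no more entries
            return iterator.hasNext() ? iterator.next() : null;
        }
    }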
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 65ccf98..b945062 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -12,197 +12,185 @@ */ package org.apache.kafka.common.record; -import java.io.DataInputStream; +import org.apache.kafka.common.record.ByteBufferLogInputStream.ByteBufferLogEntry; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; +import java.util.List; /** - * A {@link Records} implementation backed by a ByteBuffer. + * A {@link Records} implementation backed by a ByteBuffer. This is used only for reading or + * modifying in-place an existing buffer of log entries. To create a new buffer see {@link MemoryRecordsBuilder}, + * or one of the {@link #builder(ByteBuffer, byte, CompressionType, TimestampType) builder} variants. */ -public class MemoryRecords implements Records { +public class MemoryRecords extends AbstractRecords { public final static MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0)); - private final static int WRITE_LIMIT_FOR_READABLE_ONLY = -1; - - // the compressor used for appends-only - private final Compressor compressor; - - // the write limit for writable buffer, which may be smaller than the buffer capacity - private final int writeLimit; - - // the capacity of the initial buffer, which is only used for de-allocation of writable records - private final int initialCapacity; - // the underlying buffer used for read; while the records are still writable it is null private ByteBuffer buffer; - - // indicate if the memory records is writable or not (i.e. 
used for appends or read-only) - private boolean writable; + private int validBytes = -1; // Construct a writable memory records - private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int writeLimit) { - this.writable = writable; - this.writeLimit = writeLimit; - this.initialCapacity = buffer.capacity(); - if (this.writable) { - this.buffer = null; - this.compressor = new Compressor(buffer, type); - } else { - this.buffer = buffer; - this.compressor = null; - } - } - - public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int writeLimit) { - return new MemoryRecords(buffer, type, true, writeLimit); - } - - public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) { - // use the buffer capacity as the default write limit - return emptyRecords(buffer, type, buffer.capacity()); + private MemoryRecords(ByteBuffer buffer) { + this.buffer = buffer; } - public static MemoryRecords readableRecords(ByteBuffer buffer) { - return new MemoryRecords(buffer, CompressionType.NONE, false, WRITE_LIMIT_FOR_READABLE_ONLY); + @Override + public int sizeInBytes() { + return buffer.limit(); } - /** - * Append the given record and offset to the buffer - */ - public void append(long offset, Record record) { - if (!writable) - throw new IllegalStateException("Memory records is not writable"); - - int size = record.size(); - compressor.putLong(offset); - compressor.putInt(size); - compressor.put(record.buffer()); - compressor.recordWritten(size + Records.LOG_OVERHEAD); - record.buffer().rewind(); + @Override + public long writeTo(GatheringByteChannel channel, long position, int length) throws IOException { + ByteBuffer dup = buffer.duplicate(); + int pos = (int) position; + dup.position(pos); + dup.limit(pos + length); + return channel.write(dup); } /** - * Append a new record and offset to the buffer - * @return crc of the record + * Write all records to the given channel (including partial records). + * @param channel The channel to write to + * @return The number of bytes written + * @throws IOException For any IO errors writing to the channel */ - public long append(long offset, long timestamp, byte[] key, byte[] value) { - if (!writable) - throw new IllegalStateException("Memory records is not writable"); - - int size = Record.recordSize(key, value); - compressor.putLong(offset); - compressor.putInt(size); - long crc = compressor.putRecord(timestamp, key, value); - compressor.recordWritten(size + Records.LOG_OVERHEAD); - return crc; + public int writeFullyTo(GatheringByteChannel channel) throws IOException { + buffer.mark(); + int written = 0; + while (written < sizeInBytes()) + written += channel.write(buffer); + buffer.reset(); + return written; } /** - * Check if we have room for a new record containing the given key/value pair - * - * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be - * accurate if compression is really used. When this happens, the following append may cause dynamic buffer - * re-allocation in the underlying byte buffer stream. - * - * There is an exceptional case when appending a single message whose size is larger than the batch size, the - * capacity will be the message size which is larger than the write limit, i.e. the batch size. In this case - * the checking should be based on the capacity of the initialized buffer rather than the write limit in order - * to accept this single record. 
+ * The total number of bytes in this message set not including any partial, trailing messages. This + * may be smaller than what is returned by {@link #sizeInBytes()}. + * @return The number of valid bytes */ - public boolean hasRoomFor(byte[] key, byte[] value) { - if (!this.writable) - return false; + public int validBytes() { + if (validBytes >= 0) + return validBytes; - return this.compressor.numRecordsWritten() == 0 ? - this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) : - this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value); - } + int bytes = 0; + Iterator<ByteBufferLogEntry> iterator = shallowIterator(); + while (iterator.hasNext()) + bytes += iterator.next().sizeInBytes(); - public boolean isFull() { - return !this.writable || this.writeLimit <= this.compressor.estimatedBytesWritten(); + this.validBytes = bytes; + return bytes; } /** - * Close this batch for no more appends + * Filter the records into the provided ByteBuffer. + * @param filter The filter function + * @param buffer The byte buffer to write the filtered records to + * @return A FilterResult with a summary of the output (for metrics) */ - public void close() { - if (writable) { - // close the compressor to fill-in wrapper message metadata if necessary - compressor.close(); - - // flip the underlying buffer to be ready for reads - buffer = compressor.buffer(); - buffer.flip(); - - // reset the writable flag - writable = false; + public FilterResult filterTo(LogEntryFilter filter, ByteBuffer buffer) { + long maxTimestamp = Record.NO_TIMESTAMP; + long shallowOffsetOfMaxTimestamp = -1L; + int messagesRead = 0; + int bytesRead = 0; + int messagesRetained = 0; + int bytesRetained = 0; + + Iterator<ByteBufferLogEntry> shallowIterator = shallowIterator(); + while (shallowIterator.hasNext()) { + ByteBufferLogEntry shallowEntry = shallowIterator.next(); + bytesRead += shallowEntry.sizeInBytes(); + + // We use the absolute offset to decide whether to retain the message or not (this is handled by the + // deep iterator). Because of KAFKA-4298, we have to allow for the possibility that a previous version + // corrupted the log by writing a compressed message set with a wrapper magic value not matching the magic + // of the inner messages. This will be fixed as we recopy the messages to the destination buffer. + + Record shallowRecord = shallowEntry.record(); + byte shallowMagic = shallowRecord.magic(); + boolean writeOriginalEntry = true; + List<LogEntry> retainedEntries = new ArrayList<>(); + + for (LogEntry deepEntry : shallowEntry) { + Record deepRecord = deepEntry.record(); + messagesRead += 1; + + if (filter.shouldRetain(deepEntry)) { + // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite + // the corrupted entry with correct data. 
+ if (shallowMagic != deepRecord.magic()) + writeOriginalEntry = false; + + retainedEntries.add(deepEntry); + } else { + writeOriginalEntry = false; + } + } + + if (writeOriginalEntry) { + // There are no messages compacted out and no message format conversion, write the original message set back + shallowEntry.writeTo(buffer); + messagesRetained += retainedEntries.size(); + bytesRetained += shallowEntry.sizeInBytes(); + + if (shallowRecord.timestamp() > maxTimestamp) { + maxTimestamp = shallowRecord.timestamp(); + shallowOffsetOfMaxTimestamp = shallowEntry.offset(); + } + } else if (!retainedEntries.isEmpty()) { + ByteBuffer slice = buffer.slice(); + MemoryRecordsBuilder builder = builderWithEntries(slice, shallowRecord.timestampType(), shallowRecord.compressionType(), + shallowRecord.timestamp(), retainedEntries); + MemoryRecords records = builder.build(); + buffer.position(buffer.position() + slice.position()); + messagesRetained += retainedEntries.size(); + bytesRetained += records.sizeInBytes(); + + MemoryRecordsBuilder.RecordsInfo info = builder.info(); + if (info.maxTimestamp > maxTimestamp) { + maxTimestamp = info.maxTimestamp; + shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp; + } + } } + + return new FilterResult(messagesRead, bytesRead, messagesRetained, bytesRetained, maxTimestamp, shallowOffsetOfMaxTimestamp); } /** - * The size of this record set + * Get the byte buffer that backs this instance for reading. */ - @Override - public int sizeInBytes() { - if (writable) { - return compressor.buffer().position(); - } else { - return buffer.limit(); - } + public ByteBuffer buffer() { + return buffer.duplicate(); } @Override - public long writeTo(GatheringByteChannel channel, long offset, int length) throws IOException { - ByteBuffer dup = buffer.duplicate(); - int position = (int) offset; - dup.position(position); - dup.limit(position + length); - return channel.write(dup); + public Iterator<ByteBufferLogEntry> shallowIterator() { + return RecordsIterator.shallowIterator(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE)); } - /** - * The compression rate of this record set - */ - public double compressionRate() { - if (compressor == null) - return 1.0; - else - return compressor.compressionRate(); + @Override + public Iterator<LogEntry> deepIterator() { + return deepIterator(false); } - /** - * Return the capacity of the initial buffer, for writable records - * it may be different from the current buffer's capacity - */ - public int initialCapacity() { - return this.initialCapacity; + public Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic) { + return deepIterator(ensureMatchingMagic, Integer.MAX_VALUE); } - /** - * Get the byte buffer that backs this records instance for reading - */ - public ByteBuffer buffer() { - if (writable) - throw new IllegalStateException("The memory records must not be writable any more before getting its underlying buffer"); - - return buffer.duplicate(); + public Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic, int maxMessageSize) { + return new RecordsIterator(new ByteBufferLogInputStream(buffer.duplicate(), maxMessageSize), false, + ensureMatchingMagic, maxMessageSize); } @Override - public Iterator<LogEntry> iterator() { - ByteBuffer input = this.buffer.duplicate(); - if (writable) - // flip on a duplicate buffer for reading - input.flip(); - return new RecordsIterator(new ByteBufferLogInputStream(input), false); - } - - @Override public String toString() { - Iterator<LogEntry> iter = 
iterator(); + Iterator<LogEntry> iter = deepIterator(); StringBuilder builder = new StringBuilder(); builder.append('['); while (iter.hasNext()) { @@ -214,16 +202,13 @@ public class MemoryRecords implements Records { builder.append("record="); builder.append(entry.record()); builder.append(")"); + if (iter.hasNext()) + builder.append(", "); } builder.append(']'); return builder.toString(); } - /** Visible for testing */ - public boolean isWritable() { - return writable; - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -232,7 +217,6 @@ public class MemoryRecords implements Records { MemoryRecords that = (MemoryRecords) o; return buffer.equals(that.buffer); - } @Override @@ -240,28 +224,153 @@ public class MemoryRecords implements Records { return buffer.hashCode(); } - private static class ByteBufferLogInputStream implements LogInputStream { - private final DataInputStream stream; - private final ByteBuffer buffer; + public interface LogEntryFilter { + boolean shouldRetain(LogEntry entry); + } - private ByteBufferLogInputStream(ByteBuffer buffer) { - this.stream = new DataInputStream(new ByteBufferInputStream(buffer)); - this.buffer = buffer; + public static class FilterResult { + public final int messagesRead; + public final int bytesRead; + public final int messagesRetained; + public final int bytesRetained; + public final long maxTimestamp; + public final long shallowOffsetOfMaxTimestamp; + + public FilterResult(int messagesRead, + int bytesRead, + int messagesRetained, + int bytesRetained, + long maxTimestamp, + long shallowOffsetOfMaxTimestamp) { + this.messagesRead = messagesRead; + this.bytesRead = bytesRead; + this.messagesRetained = messagesRetained; + this.bytesRetained = bytesRetained; + this.maxTimestamp = maxTimestamp; + this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp; } + } - public LogEntry nextEntry() throws IOException { - long offset = stream.readLong(); - int size = stream.readInt(); - if (size < 0) - throw new IllegalStateException("Record with size " + size); - - ByteBuffer slice = buffer.slice(); - int newPos = buffer.position() + size; - if (newPos > buffer.limit()) - return null; - buffer.position(newPos); - slice.limit(size); - return new LogEntry(offset, new Record(slice)); - } + public static MemoryRecordsBuilder builder(ByteBuffer buffer, + CompressionType compressionType, + TimestampType timestampType, + int writeLimit) { + return new MemoryRecordsBuilder(buffer, Record.CURRENT_MAGIC_VALUE, compressionType, timestampType, 0L, System.currentTimeMillis(), writeLimit); + } + + public static MemoryRecordsBuilder builder(ByteBuffer buffer, + byte magic, + CompressionType compressionType, + TimestampType timestampType, + long baseOffset, + long logAppendTime) { + return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime, buffer.capacity()); + } + + public static MemoryRecordsBuilder builder(ByteBuffer buffer, + CompressionType compressionType, + TimestampType timestampType) { + // use the buffer capacity as the default write limit + return builder(buffer, compressionType, timestampType, buffer.capacity()); + } + + public static MemoryRecordsBuilder builder(ByteBuffer buffer, + byte magic, + CompressionType compressionType, + TimestampType timestampType) { + return builder(buffer, magic, compressionType, timestampType, 0L); + } + + public static MemoryRecordsBuilder builder(ByteBuffer buffer, + byte magic, + CompressionType compressionType, + TimestampType timestampType, + 
long baseOffset) { + return builder(buffer, magic, compressionType, timestampType, baseOffset, System.currentTimeMillis()); + } + + public static MemoryRecords readableRecords(ByteBuffer buffer) { + return new MemoryRecords(buffer); + } + + public static MemoryRecords withLogEntries(CompressionType compressionType, List<LogEntry> entries) { + return withLogEntries(TimestampType.CREATE_TIME, compressionType, System.currentTimeMillis(), entries); + } + + public static MemoryRecords withLogEntries(LogEntry ... entries) { + return withLogEntries(CompressionType.NONE, Arrays.asList(entries)); + } + + public static MemoryRecords withRecords(CompressionType compressionType, long initialOffset, List<Record> records) { + return withRecords(initialOffset, TimestampType.CREATE_TIME, compressionType, System.currentTimeMillis(), records); } + + public static MemoryRecords withRecords(Record ... records) { + return withRecords(CompressionType.NONE, 0L, Arrays.asList(records)); + } + + public static MemoryRecords withRecords(long initialOffset, Record ... records) { + return withRecords(CompressionType.NONE, initialOffset, Arrays.asList(records)); + } + + public static MemoryRecords withRecords(CompressionType compressionType, Record ... records) { + return withRecords(compressionType, 0L, Arrays.asList(records)); + } + + public static MemoryRecords withRecords(TimestampType timestampType, CompressionType compressionType, Record ... records) { + return withRecords(0L, timestampType, compressionType, System.currentTimeMillis(), Arrays.asList(records)); + } + + public static MemoryRecords withRecords(long initialOffset, + TimestampType timestampType, + CompressionType compressionType, + long logAppendTime, + List<Record> records) { + return withLogEntries(timestampType, compressionType, logAppendTime, buildLogEntries(initialOffset, records)); + } + + private static MemoryRecords withLogEntries(TimestampType timestampType, + CompressionType compressionType, + long logAppendTime, + List<LogEntry> entries) { + if (entries.isEmpty()) + return MemoryRecords.EMPTY; + return builderWithEntries(timestampType, compressionType, logAppendTime, entries).build(); + } + + private static List<LogEntry> buildLogEntries(long initialOffset, List<Record> records) { + List<LogEntry> entries = new ArrayList<>(); + for (Record record : records) + entries.add(LogEntry.create(initialOffset++, record)); + return entries; + } + + public static MemoryRecordsBuilder builderWithEntries(TimestampType timestampType, + CompressionType compressionType, + long logAppendTime, + List<LogEntry> entries) { + ByteBuffer buffer = ByteBuffer.allocate(estimatedSize(compressionType, entries)); + return builderWithEntries(buffer, timestampType, compressionType, logAppendTime, entries); + } + + private static MemoryRecordsBuilder builderWithEntries(ByteBuffer buffer, + TimestampType timestampType, + CompressionType compressionType, + long logAppendTime, + List<LogEntry> entries) { + if (entries.isEmpty()) + throw new IllegalArgumentException(); + + LogEntry firstEntry = entries.iterator().next(); + long firstOffset = firstEntry.offset(); + byte magic = firstEntry.record().magic(); + + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, + firstOffset, logAppendTime); + for (LogEntry entry : entries) + builder.append(entry); + + return builder; + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java 
---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java new file mode 100644 index 0000000..b90a9e6 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -0,0 +1,461 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + +import org.apache.kafka.common.KafkaException; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Constructor; +import java.nio.ByteBuffer; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +/** + * This class is used to write new log data in memory, i.e. this is the write path for {@link MemoryRecords}. + * It transparently handles compression and exposes methods for appending new entries, possibly with message + * format conversion. 
+ */ +public class MemoryRecordsBuilder { + + static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f; + static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f; + static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024; + + private static final float[] TYPE_TO_RATE; + + static { + int maxTypeId = -1; + for (CompressionType type : CompressionType.values()) + maxTypeId = Math.max(maxTypeId, type.id); + TYPE_TO_RATE = new float[maxTypeId + 1]; + for (CompressionType type : CompressionType.values()) { + TYPE_TO_RATE[type.id] = type.rate; + } + } + + // dynamically load the snappy and lz4 classes to avoid runtime dependency if we are not using compression + // caching constructors to avoid invoking of Class.forName method for each batch + private static MemoizingConstructorSupplier snappyOutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() { + @Override + public Constructor get() throws ClassNotFoundException, NoSuchMethodException { + return Class.forName("org.xerial.snappy.SnappyOutputStream") + .getConstructor(OutputStream.class, Integer.TYPE); + } + }); + + private static MemoizingConstructorSupplier lz4OutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() { + @Override + public Constructor get() throws ClassNotFoundException, NoSuchMethodException { + return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream") + .getConstructor(OutputStream.class, Boolean.TYPE); + } + }); + + private static MemoizingConstructorSupplier snappyInputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() { + @Override + public Constructor get() throws ClassNotFoundException, NoSuchMethodException { + return Class.forName("org.xerial.snappy.SnappyInputStream") + .getConstructor(InputStream.class); + } + }); + + private static MemoizingConstructorSupplier lz4InputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() { + @Override + public Constructor get() throws ClassNotFoundException, NoSuchMethodException { + return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream") + .getConstructor(InputStream.class, Boolean.TYPE); + } + }); + + private final TimestampType timestampType; + private final CompressionType compressionType; + private final DataOutputStream appendStream; + private final ByteBufferOutputStream bufferStream; + private final byte magic; + private final int initPos; + private final long baseOffset; + private final long logAppendTime; + private final int writeLimit; + private final int initialCapacity; + + private MemoryRecords builtRecords; + private long writtenUncompressed; + private long numRecords; + private float compressionRate; + private long maxTimestamp; + private long offsetOfMaxTimestamp; + private long lastOffset = -1; + + public MemoryRecordsBuilder(ByteBuffer buffer, + byte magic, + CompressionType compressionType, + TimestampType timestampType, + long baseOffset, + long logAppendTime, + int writeLimit) { + this.magic = magic; + this.timestampType = timestampType; + this.compressionType = compressionType; + this.baseOffset = baseOffset; + this.logAppendTime = logAppendTime; + this.initPos = buffer.position(); + this.numRecords = 0; + this.writtenUncompressed = 0; + this.compressionRate = 1; + this.maxTimestamp = Record.NO_TIMESTAMP; + this.writeLimit = writeLimit; + this.initialCapacity = buffer.capacity(); + + if (compressionType != CompressionType.NONE) { + // for compressed records, leave space for the header 
and the shallow message metadata + // and move the starting position to the value payload offset + buffer.position(initPos + Records.LOG_OVERHEAD + Record.recordOverhead(magic)); + } + + // create the stream + bufferStream = new ByteBufferOutputStream(buffer); + appendStream = wrapForOutput(bufferStream, compressionType, magic, COMPRESSION_DEFAULT_BUFFER_SIZE); + } + + public ByteBuffer buffer() { + return bufferStream.buffer(); + } + + public int initialCapacity() { + return initialCapacity; + } + + public double compressionRate() { + return compressionRate; + } + + /** + * Close this builder and return the resulting buffer. + * @return The built log buffer + */ + public MemoryRecords build() { + close(); + return builtRecords; + } + + /** + * Get the max timestamp and its offset. If the log append time is used, then the offset will + * be either the first offset in the set if no compression is used or the last offset otherwise. + * @return The max timestamp and its offset + */ + public RecordsInfo info() { + if (timestampType == TimestampType.LOG_APPEND_TIME) + return new RecordsInfo(logAppendTime, lastOffset); + else + return new RecordsInfo(maxTimestamp, compressionType == CompressionType.NONE ? offsetOfMaxTimestamp : lastOffset); + } + + public void close() { + if (builtRecords != null) + return; + + try { + appendStream.close(); + } catch (IOException e) { + throw new KafkaException(e); + } + + if (compressionType != CompressionType.NONE) + writerCompressedWrapperHeader(); + + ByteBuffer buffer = buffer().duplicate(); + buffer.flip(); + buffer.position(initPos); + builtRecords = MemoryRecords.readableRecords(buffer.slice()); + } + + private void writerCompressedWrapperHeader() { + ByteBuffer buffer = bufferStream.buffer(); + int pos = buffer.position(); + buffer.position(initPos); + + int wrapperSize = pos - initPos - Records.LOG_OVERHEAD; + int writtenCompressed = wrapperSize - Record.recordOverhead(magic); + LogEntry.writeHeader(buffer, lastOffset, wrapperSize); + + long timestamp = timestampType == TimestampType.LOG_APPEND_TIME ? 
logAppendTime : maxTimestamp; + Record.writeCompressedRecordHeader(buffer, magic, wrapperSize, timestamp, compressionType, timestampType); + + buffer.position(pos); + + // update the compression ratio + this.compressionRate = (float) writtenCompressed / this.writtenUncompressed; + TYPE_TO_RATE[compressionType.id] = TYPE_TO_RATE[compressionType.id] * COMPRESSION_RATE_DAMPING_FACTOR + + compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR); + } + + /** + * Append a new record and offset to the buffer + * @param offset The absolute offset of the record in the log buffer + * @param timestamp The record timestamp + * @param key The record key + * @param value The record value + * @return crc of the record + */ + public long append(long offset, long timestamp, byte[] key, byte[] value) { + try { + if (lastOffset > 0 && offset <= lastOffset) + throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset)); + + int size = Record.recordSize(magic, key, value); + LogEntry.writeHeader(appendStream, toInnerOffset(offset), size); + + if (timestampType == TimestampType.LOG_APPEND_TIME) + timestamp = logAppendTime; + long crc = Record.write(appendStream, magic, timestamp, key, value, CompressionType.NONE, timestampType); + recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD); + return crc; + } catch (IOException e) { + throw new KafkaException("I/O exception when writing to the append stream, closing", e); + } + } + + /** + * Add the record, converting to the desired magic value if necessary. + * @param offset The offset of the record + * @param record The record to add + */ + public void convertAndAppend(long offset, Record record) { + if (magic == record.magic()) { + append(offset, record); + return; + } + + if (lastOffset > 0 && offset <= lastOffset) + throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset)); + + try { + int size = record.convertedSize(magic); + LogEntry.writeHeader(appendStream, toInnerOffset(offset), size); + long timestamp = timestampType == TimestampType.LOG_APPEND_TIME ? logAppendTime : record.timestamp(); + record.convertTo(appendStream, magic, timestamp, timestampType); + recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD); + } catch (IOException e) { + throw new KafkaException("I/O exception when writing to the append stream, closing", e); + } + } + + /** + * Add a record without doing offset/magic validation (this should only be used in testing). + * @param offset The offset of the record + * @param record The record to add + */ + public void appendUnchecked(long offset, Record record) { + try { + int size = record.sizeInBytes(); + LogEntry.writeHeader(appendStream, toInnerOffset(offset), size); + + ByteBuffer buffer = record.buffer().duplicate(); + appendStream.write(buffer.array(), buffer.arrayOffset(), buffer.limit()); + + recordWritten(offset, record.timestamp(), size + Records.LOG_OVERHEAD); + } catch (IOException e) { + throw new KafkaException("I/O exception when writing to the append stream, closing", e); + } + } + + /** + * Append the given log entry. The entry's record must have a magic which matches the magic use to + * construct this builder and the offset must be greater than the last appended entry. 
+ * @param entry The entry to append + */ + public void append(LogEntry entry) { + append(entry.offset(), entry.record()); + } + + /** + * Add a record with a given offset. The record must have a magic which matches the magic use to + * construct this builder and the offset must be greater than the last appended entry. + * @param offset The offset of the record + * @param record The record to add + */ + public void append(long offset, Record record) { + if (record.magic() != magic) + throw new IllegalArgumentException("Inner log entries must have matching magic values as the wrapper"); + if (lastOffset > 0 && offset <= lastOffset) + throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset)); + appendUnchecked(offset, record); + } + + private long toInnerOffset(long offset) { + // use relative offsets for compressed messages with magic v1 + if (magic > 0 && compressionType != CompressionType.NONE) + return offset - baseOffset; + return offset; + } + + private void recordWritten(long offset, long timestamp, int size) { + numRecords += 1; + writtenUncompressed += size; + lastOffset = offset; + + if (timestamp > maxTimestamp) { + maxTimestamp = timestamp; + offsetOfMaxTimestamp = offset; + } + } + + /** + * Get an estimate of the number of bytes written (based on the estimation factor hard-coded in {@link CompressionType}. + * @return The estimated number of bytes written + */ + private int estimatedBytesWritten() { + if (compressionType == CompressionType.NONE) { + return buffer().position(); + } else { + // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes + return (int) (writtenUncompressed * TYPE_TO_RATE[compressionType.id] * COMPRESSION_RATE_ESTIMATION_FACTOR); + } + } + + /** + * Check if we have room for a new record containing the given key/value pair + * + * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be + * accurate if compression is really used. When this happens, the following append may cause dynamic buffer + * re-allocation in the underlying byte buffer stream. + * + * There is an exceptional case when appending a single message whose size is larger than the batch size, the + * capacity will be the message size which is larger than the write limit, i.e. the batch size. In this case + * the checking should be based on the capacity of the initialized buffer rather than the write limit in order + * to accept this single record. + */ + public boolean hasRoomFor(byte[] key, byte[] value) { + return !isFull() && (numRecords == 0 ? + this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(magic, key, value) : + this.writeLimit >= estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(magic, key, value)); + } + + public boolean isClosed() { + return builtRecords != null; + } + + public boolean isFull() { + return isClosed() || this.writeLimit <= estimatedBytesWritten(); + } + + public int sizeInBytes() { + return builtRecords != null ? 
builtRecords.sizeInBytes() : estimatedBytesWritten(); + } + + private static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, byte messageVersion, int bufferSize) { + try { + switch (type) { + case NONE: + return buffer; + case GZIP: + return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize)); + case SNAPPY: + try { + OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize); + return new DataOutputStream(stream); + } catch (Exception e) { + throw new KafkaException(e); + } + case LZ4: + try { + OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer, + messageVersion == Record.MAGIC_VALUE_V0); + return new DataOutputStream(stream); + } catch (Exception e) { + throw new KafkaException(e); + } + default: + throw new IllegalArgumentException("Unknown compression type: " + type); + } + } catch (IOException e) { + throw new KafkaException(e); + } + } + + public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) { + try { + switch (type) { + case NONE: + return buffer; + case GZIP: + return new DataInputStream(new GZIPInputStream(buffer)); + case SNAPPY: + try { + InputStream stream = (InputStream) snappyInputStreamSupplier.get().newInstance(buffer); + return new DataInputStream(stream); + } catch (Exception e) { + throw new KafkaException(e); + } + case LZ4: + try { + InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer, + messageVersion == Record.MAGIC_VALUE_V0); + return new DataInputStream(stream); + } catch (Exception e) { + throw new KafkaException(e); + } + default: + throw new IllegalArgumentException("Unknown compression type: " + type); + } + } catch (IOException e) { + throw new KafkaException(e); + } + } + + private interface ConstructorSupplier { + Constructor get() throws ClassNotFoundException, NoSuchMethodException; + } + + // this code is based on Guava's @see{com.google.common.base.Suppliers.MemoizingSupplier} + private static class MemoizingConstructorSupplier { + final ConstructorSupplier delegate; + transient volatile boolean initialized; + transient Constructor value; + + public MemoizingConstructorSupplier(ConstructorSupplier delegate) { + this.delegate = delegate; + } + + public Constructor get() throws NoSuchMethodException, ClassNotFoundException { + if (!initialized) { + synchronized (this) { + if (!initialized) { + value = delegate.get(); + initialized = true; + } + } + } + return value; + } + } + + public static class RecordsInfo { + public final long maxTimestamp; + public final long shallowOffsetOfMaxTimestamp; + + public RecordsInfo(long maxTimestamp, + long shallowOffsetOfMaxTimestamp) { + this.maxTimestamp = maxTimestamp; + this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp; + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/Record.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index 09cb80d..0c0fa3c 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -16,11 +16,15 @@ */ package org.apache.kafka.common.record; -import java.nio.ByteBuffer; - +import org.apache.kafka.common.KafkaException; import 
org.apache.kafka.common.utils.Crc32; import org.apache.kafka.common.utils.Utils; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.apache.kafka.common.utils.Utils.wrapNullable; /** * A record: a serialized key and value along with the associated CRC and other fields @@ -53,7 +57,12 @@ public final class Record { /** * The amount of overhead bytes in a record */ - public static final int RECORD_OVERHEAD = HEADER_SIZE + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH; + public static final int RECORD_OVERHEAD_V0 = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH; + + /** + * The amount of overhead bytes in a record + */ + public static final int RECORD_OVERHEAD_V1 = HEADER_SIZE + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH; /** * The "magic" values @@ -80,11 +89,6 @@ public final class Record { public static final int TIMESTAMP_TYPE_ATTRIBUTE_OFFSET = 3; /** - * Compression code for uncompressed records - */ - public static final int NO_COMPRESSION = 0; - - /** * Timestamp value for records without a timestamp */ public static final long NO_TIMESTAMP = -1L; @@ -94,155 +98,20 @@ public final class Record { private final TimestampType wrapperRecordTimestampType; public Record(ByteBuffer buffer) { - this.buffer = buffer; - this.wrapperRecordTimestamp = null; - this.wrapperRecordTimestampType = null; + this(buffer, null, null); } - // Package private constructor for inner iteration. - Record(ByteBuffer buffer, Long wrapperRecordTimestamp, TimestampType wrapperRecordTimestampType) { + public Record(ByteBuffer buffer, Long wrapperRecordTimestamp, TimestampType wrapperRecordTimestampType) { this.buffer = buffer; this.wrapperRecordTimestamp = wrapperRecordTimestamp; this.wrapperRecordTimestampType = wrapperRecordTimestampType; } /** - * A constructor to create a LogRecord. If the record's compression type is not none, then - * its value payload should be already compressed with the specified type; the constructor - * would always write the value payload as is and will not do the compression itself. - * - * @param timestamp The timestamp of the record - * @param key The key of the record (null, if none) - * @param value The record value - * @param type The compression type used on the contents of the record (if any) - * @param valueOffset The offset into the payload array used to extract payload - * @param valueSize The size of the payload to use - */ - public Record(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { - this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, - value == null ? 0 : valueSize >= 0 ? 
valueSize : value.length - valueOffset))); - write(this.buffer, timestamp, key, value, type, valueOffset, valueSize); - this.buffer.rewind(); - } - - public Record(long timestamp, byte[] key, byte[] value, CompressionType type) { - this(timestamp, key, value, type, 0, -1); - } - - public Record(long timestamp, byte[] value, CompressionType type) { - this(timestamp, null, value, type); - } - - public Record(long timestamp, byte[] key, byte[] value) { - this(timestamp, key, value, CompressionType.NONE); - } - - public Record(long timestamp, byte[] value) { - this(timestamp, null, value, CompressionType.NONE); - } - - // Write a record to the buffer, if the record's compression type is none, then - // its value payload should be already compressed with the specified type - public static void write(ByteBuffer buffer, long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { - // construct the compressor with compression type none since this function will not do any - //compression according to the input type, it will just write the record's payload as is - Compressor compressor = new Compressor(buffer, CompressionType.NONE); - try { - compressor.putRecord(timestamp, key, value, type, valueOffset, valueSize); - } finally { - compressor.close(); - } - } - - public static void write(Compressor compressor, long crc, byte attributes, long timestamp, byte[] key, byte[] value, int valueOffset, int valueSize) { - // write crc - compressor.putInt((int) (crc & 0xffffffffL)); - // write magic value - compressor.putByte(CURRENT_MAGIC_VALUE); - // write attributes - compressor.putByte(attributes); - // write timestamp - compressor.putLong(timestamp); - // write the key - if (key == null) { - compressor.putInt(-1); - } else { - compressor.putInt(key.length); - compressor.put(key, 0, key.length); - } - // write the value - if (value == null) { - compressor.putInt(-1); - } else { - int size = valueSize >= 0 ? valueSize : (value.length - valueOffset); - compressor.putInt(size); - compressor.put(value, valueOffset, size); - } - } - - public static int recordSize(byte[] key, byte[] value) { - return recordSize(key == null ? 0 : key.length, value == null ? 
0 : value.length); - } - - public static int recordSize(int keySize, int valueSize) { - return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize; - } - - public ByteBuffer buffer() { - return this.buffer; - } - - public static byte computeAttributes(CompressionType type) { - byte attributes = 0; - if (type.id > 0) - attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id)); - return attributes; - } - - /** - * Compute the checksum of the record from the record contents - */ - public static long computeChecksum(ByteBuffer buffer, int position, int size) { - Crc32 crc = new Crc32(); - crc.update(buffer.array(), buffer.arrayOffset() + position, size); - return crc.getValue(); - } - - /** - * Compute the checksum of the record from the attributes, key and value payloads - */ - public static long computeChecksum(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { - Crc32 crc = new Crc32(); - crc.update(CURRENT_MAGIC_VALUE); - byte attributes = 0; - if (type.id > 0) - attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id)); - crc.update(attributes); - crc.updateLong(timestamp); - // update for the key - if (key == null) { - crc.updateInt(-1); - } else { - crc.updateInt(key.length); - crc.update(key, 0, key.length); - } - // update for the value - if (value == null) { - crc.updateInt(-1); - } else { - int size = valueSize >= 0 ? valueSize : (value.length - valueOffset); - crc.updateInt(size); - crc.update(value, valueOffset, size); - } - return crc.getValue(); - } - - - /** * Compute the checksum of the record from the record contents */ public long computeChecksum() { - return computeChecksum(buffer, MAGIC_OFFSET, buffer.limit() - MAGIC_OFFSET); + return Utils.computeChecksum(buffer, MAGIC_OFFSET, buffer.limit() - MAGIC_OFFSET); } /** @@ -256,7 +125,15 @@ public final class Record { * Returns true if the crc stored with the record matches the crc computed off the record contents */ public boolean isValid() { - return size() >= CRC_LENGTH && checksum() == computeChecksum(); + return sizeInBytes() >= CRC_LENGTH && checksum() == computeChecksum(); + } + + public Long wrapperRecordTimestamp() { + return wrapperRecordTimestamp; + } + + public TimestampType wrapperRecordTimestampType() { + return wrapperRecordTimestampType; } /** @@ -264,9 +141,9 @@ public final class Record { */ public void ensureValid() { if (!isValid()) { - if (size() < CRC_LENGTH) + if (sizeInBytes() < CRC_LENGTH) throw new InvalidRecordException("Record is corrupt (crc could not be retrieved as the record is too " - + "small, size = " + size() + ")"); + + "small, size = " + sizeInBytes() + ")"); else throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum() + ", computed crc = " + computeChecksum() + ")"); @@ -274,14 +151,17 @@ public final class Record { } /** - * The complete serialized size of this record in bytes (including crc, header attributes, etc) + * The complete serialized size of this record in bytes (including crc, header attributes, etc), but + * excluding the log overhead (offset and record size). + * @return the size in bytes */ - public int size() { + public int sizeInBytes() { return buffer.limit(); } /** * The length of the key in bytes + * @return the size in bytes of the key (0 if the key is null) */ public int keySize() { if (magic() == MAGIC_VALUE_V0) @@ -292,6 +172,7 @@ public final class Record { /** * Does the record have a key? 
+ * @return true if so, false otherwise */ public boolean hasKey() { return keySize() >= 0; @@ -309,13 +190,23 @@ public final class Record { /** * The length of the value in bytes + * @return the size in bytes of the value (0 if the value is null) */ public int valueSize() { return buffer.getInt(valueSizeOffset()); } /** - * The magic version of this record + * Check whether the value field of this record is null. + * @return true if the value is null, false otherwise + */ + public boolean hasNullValue() { + return valueSize() < 0; + } + + /** + * The magic value (i.e. message format version) of this record + * @return the magic value */ public byte magic() { return buffer.get(MAGIC_OFFSET); @@ -323,6 +214,7 @@ public final class Record { /** * The attributes stored with this record + * @return the attributes */ public byte attributes() { return buffer.get(ATTRIBUTES_OFFSET); @@ -333,6 +225,8 @@ public final class Record { * 1. wrapperRecordTimestampType = null and wrapperRecordTimestamp is null - Uncompressed message, timestamp is in the message. * 2. wrapperRecordTimestampType = LOG_APPEND_TIME and WrapperRecordTimestamp is not null - Compressed message using LOG_APPEND_TIME * 3. wrapperRecordTimestampType = CREATE_TIME and wrapperRecordTimestamp is not null - Compressed message using CREATE_TIME + * + * @return the timestamp as determined above */ public long timestamp() { if (magic() == MAGIC_VALUE_V0) @@ -349,6 +243,8 @@ public final class Record { /** * The timestamp of the message. + * @return the timstamp type or {@link TimestampType#NO_TIMESTAMP_TYPE} if the magic is 0 or the message has + * been up-converted. */ public TimestampType timestampType() { if (magic() == 0) @@ -366,36 +262,30 @@ public final class Record { /** * A ByteBuffer containing the value of this record + * @return the value or null if the value for this record is null */ public ByteBuffer value() { - return sliceDelimited(valueSizeOffset()); + return Utils.sizeDelimited(buffer, valueSizeOffset()); } /** * A ByteBuffer containing the message key + * @return the buffer or null if the key for this record is null */ public ByteBuffer key() { if (magic() == MAGIC_VALUE_V0) - return sliceDelimited(KEY_SIZE_OFFSET_V0); + return Utils.sizeDelimited(buffer, KEY_SIZE_OFFSET_V0); else - return sliceDelimited(KEY_SIZE_OFFSET_V1); + return Utils.sizeDelimited(buffer, KEY_SIZE_OFFSET_V1); } /** - * Read a size-delimited byte buffer starting at the given offset + * Get the underlying buffer backing this record instance. + * + * @return the buffer */ - private ByteBuffer sliceDelimited(int start) { - int size = buffer.getInt(start); - if (size < 0) { - return null; - } else { - ByteBuffer b = buffer.duplicate(); - b.position(start + 4); - b = b.slice(); - b.limit(size); - b.rewind(); - return b; - } + public ByteBuffer buffer() { + return this.buffer; } public String toString() { @@ -434,4 +324,316 @@ public final class Record { return buffer.hashCode(); } + /** + * Get the size of this record if converted to the given format. + * + * @param toMagic The target magic version to convert to + * @return The size in bytes after conversion + */ + public int convertedSize(byte toMagic) { + return recordSize(toMagic, Math.max(0, keySize()), Math.max(0, valueSize())); + } + + /** + * Convert this record to another message format. + * + * @param toMagic The target magic version to convert to + * @return A new record instance with a freshly allocated ByteBuffer. 
+ */ + public Record convert(byte toMagic) { + if (toMagic == magic()) + return this; + + ByteBuffer buffer = ByteBuffer.allocate(convertedSize(toMagic)); + TimestampType timestampType = wrapperRecordTimestampType != null ? + wrapperRecordTimestampType : TimestampType.forAttributes(attributes()); + convertTo(buffer, toMagic, timestamp(), timestampType); + buffer.rewind(); + return new Record(buffer); + } + + private void convertTo(ByteBuffer buffer, byte toMagic, long timestamp, TimestampType timestampType) { + if (compressionType() != CompressionType.NONE) + throw new IllegalArgumentException("Cannot use convertTo for deep conversion"); + + write(buffer, toMagic, timestamp, key(), value(), CompressionType.NONE, timestampType); + } + + /** + * Convert this record to another message format and write the converted data to the provided outputs stream. + * + * @param out The output stream to write the converted data to + * @param toMagic The target magic version for conversion + * @param timestamp The timestamp to use in the converted record (for up-conversion) + * @param timestampType The timestamp type to use in the converted record (for up-conversion) + * @throws IOException for any IO errors writing the converted record. + */ + public void convertTo(DataOutputStream out, byte toMagic, long timestamp, TimestampType timestampType) throws IOException { + if (compressionType() != CompressionType.NONE) + throw new IllegalArgumentException("Cannot use convertTo for deep conversion"); + + write(out, toMagic, timestamp, key(), value(), CompressionType.NONE, timestampType); + } + + /** + * Create a new record instance. If the record's compression type is not none, then + * its value payload should be already compressed with the specified type; the constructor + * would always write the value payload as is and will not do the compression itself. + * + * @param magic The magic value to use + * @param timestamp The timestamp of the record + * @param key The key of the record (null, if none) + * @param value The record value + * @param compressionType The compression type used on the contents of the record (if any) + * @param timestampType The timestamp type to be used for this record + */ + public static Record create(byte magic, + long timestamp, + byte[] key, + byte[] value, + CompressionType compressionType, + TimestampType timestampType) { + int keySize = key == null ? 0 : key.length; + int valueSize = value == null ? 
0 : value.length; + ByteBuffer buffer = ByteBuffer.allocate(recordSize(magic, keySize, valueSize)); + write(buffer, magic, timestamp, wrapNullable(key), wrapNullable(value), compressionType, timestampType); + buffer.rewind(); + return new Record(buffer); + } + + public static Record create(long timestamp, byte[] key, byte[] value) { + return create(CURRENT_MAGIC_VALUE, timestamp, key, value, CompressionType.NONE, TimestampType.CREATE_TIME); + } + + public static Record create(byte magic, long timestamp, byte[] key, byte[] value) { + return create(magic, timestamp, key, value, CompressionType.NONE, TimestampType.CREATE_TIME); + } + + public static Record create(byte magic, TimestampType timestampType, long timestamp, byte[] key, byte[] value) { + return create(magic, timestamp, key, value, CompressionType.NONE, timestampType); + } + + public static Record create(byte magic, long timestamp, byte[] value) { + return create(magic, timestamp, null, value, CompressionType.NONE, TimestampType.CREATE_TIME); + } + + public static Record create(byte magic, byte[] key, byte[] value) { + return create(magic, NO_TIMESTAMP, key, value); + } + + public static Record create(byte[] key, byte[] value) { + return create(NO_TIMESTAMP, key, value); + } + + public static Record create(byte[] value) { + return create(CURRENT_MAGIC_VALUE, NO_TIMESTAMP, null, value, CompressionType.NONE, TimestampType.CREATE_TIME); + } + + /** + * Write the header for a compressed record set in-place (i.e. assuming the compressed record data has already + * been written at the value offset in a wrapped record). This lets you dynamically create a compressed message + * set, and then go back later and fill in its size and CRC, which saves the need for copying to another buffer. + * + * @param buffer The buffer containing the compressed record data positioned at the first offset of the + * @param magic The magic value of the record set + * @param recordSize The size of the record (including record overhead) + * @param timestamp The timestamp of the wrapper record + * @param compressionType The compression type used + * @param timestampType The timestamp type of the wrapper record + */ + public static void writeCompressedRecordHeader(ByteBuffer buffer, + byte magic, + int recordSize, + long timestamp, + CompressionType compressionType, + TimestampType timestampType) { + int recordPosition = buffer.position(); + int valueSize = recordSize - recordOverhead(magic); + + // write the record header with a null value (the key is always null for the wrapper) + write(buffer, magic, timestamp, null, null, compressionType, timestampType); + + // now fill in the value size + buffer.putInt(recordPosition + keyOffset(magic), valueSize); + + // compute and fill the crc from the beginning of the message + long crc = Utils.computeChecksum(buffer, recordPosition + MAGIC_OFFSET, recordSize - MAGIC_OFFSET); + Utils.writeUnsignedInt(buffer, recordPosition + CRC_OFFSET, crc); + } + + private static void write(ByteBuffer buffer, + byte magic, + long timestamp, + ByteBuffer key, + ByteBuffer value, + CompressionType compressionType, + TimestampType timestampType) { + try { + ByteBufferOutputStream out = new ByteBufferOutputStream(buffer); + write(out, magic, timestamp, key, value, compressionType, timestampType); + } catch (IOException e) { + throw new KafkaException(e); + } + } + + /** + * Write the record data with the given compression type and return the computed crc. 
+ * + * @param out The output stream to write to + * @param magic The magic value to be used + * @param timestamp The timestamp of the record + * @param key The record key + * @param value The record value + * @param compressionType The compression type + * @param timestampType The timestamp type + * @return the computed CRC for this record. + * @throws IOException for any IO errors writing to the output stream. + */ + public static long write(DataOutputStream out, + byte magic, + long timestamp, + byte[] key, + byte[] value, + CompressionType compressionType, + TimestampType timestampType) throws IOException { + return write(out, magic, timestamp, wrapNullable(key), wrapNullable(value), compressionType, timestampType); + } + + private static long write(DataOutputStream out, + byte magic, + long timestamp, + ByteBuffer key, + ByteBuffer value, + CompressionType compressionType, + TimestampType timestampType) throws IOException { + byte attributes = computeAttributes(magic, compressionType, timestampType); + long crc = computeChecksum(magic, attributes, timestamp, key, value); + write(out, magic, crc, attributes, timestamp, key, value); + return crc; + } + + + /** + * Write a record using raw fields (without validation). This should only be used in testing. + */ + public static void write(DataOutputStream out, + byte magic, + long crc, + byte attributes, + long timestamp, + byte[] key, + byte[] value) throws IOException { + write(out, magic, crc, attributes, timestamp, wrapNullable(key), wrapNullable(value)); + } + + // Write a record to the buffer, if the record's compression type is none, then + // its value payload should be already compressed with the specified type + private static void write(DataOutputStream out, + byte magic, + long crc, + byte attributes, + long timestamp, + ByteBuffer key, + ByteBuffer value) throws IOException { + if (magic != MAGIC_VALUE_V0 && magic != MAGIC_VALUE_V1) + throw new IllegalArgumentException("Invalid magic value " + magic); + if (timestamp < 0 && timestamp != NO_TIMESTAMP) + throw new IllegalArgumentException("Invalid message timestamp " + timestamp); + + // write crc + out.writeInt((int) (crc & 0xffffffffL)); + // write magic value + out.writeByte(magic); + // write attributes + out.writeByte(attributes); + + // maybe write timestamp + if (magic > 0) + out.writeLong(timestamp); + + // write the key + if (key == null) { + out.writeInt(-1); + } else { + int size = key.remaining(); + out.writeInt(size); + out.write(key.array(), key.arrayOffset(), size); + } + // write the value + if (value == null) { + out.writeInt(-1); + } else { + int size = value.remaining(); + out.writeInt(size); + out.write(value.array(), value.arrayOffset(), size); + } + } + + public static int recordSize(byte[] key, byte[] value) { + return recordSize(CURRENT_MAGIC_VALUE, key, value); + } + + public static int recordSize(byte magic, byte[] key, byte[] value) { + return recordSize(magic, key == null ? 0 : key.length, value == null ? 
+    }
+
+    private static int recordSize(byte magic, int keySize, int valueSize) {
+        return recordOverhead(magic) + keySize + valueSize;
+    }
+
+    // visible only for testing
+    public static byte computeAttributes(byte magic, CompressionType type, TimestampType timestampType) {
+        byte attributes = 0;
+        if (type.id > 0)
+            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
+        if (magic > 0)
+            return timestampType.updateAttributes(attributes);
+        return attributes;
+    }
+
+    // visible only for testing
+    public static long computeChecksum(byte magic, byte attributes, long timestamp, byte[] key, byte[] value) {
+        return computeChecksum(magic, attributes, timestamp, wrapNullable(key), wrapNullable(value));
+    }
+
+    /**
+     * Compute the checksum of the record from the attributes, key and value payloads
+     */
+    private static long computeChecksum(byte magic, byte attributes, long timestamp, ByteBuffer key, ByteBuffer value) {
+        Crc32 crc = new Crc32();
+        crc.update(magic);
+        crc.update(attributes);
+        if (magic > 0)
+            crc.updateLong(timestamp);
+        // update for the key
+        if (key == null) {
+            crc.updateInt(-1);
+        } else {
+            int size = key.remaining();
+            crc.updateInt(size);
+            crc.update(key.array(), key.arrayOffset(), size);
+        }
+        // update for the value
+        if (value == null) {
+            crc.updateInt(-1);
+        } else {
+            int size = value.remaining();
+            crc.updateInt(size);
+            crc.update(value.array(), value.arrayOffset(), size);
+        }
+        return crc.getValue();
+    }
+
+    public static int recordOverhead(byte magic) {
+        if (magic == 0)
+            return RECORD_OVERHEAD_V0;
+        return RECORD_OVERHEAD_V1;
+    }
+
+    private static int keyOffset(byte magic) {
+        if (magic == 0)
+            return KEY_OFFSET_V0;
+        return KEY_OFFSET_V1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------

diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index 3bc043f..823d2b7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -18,32 +18,74 @@ package org.apache.kafka.common.record;

 import java.io.IOException;
 import java.nio.channels.GatheringByteChannel;
+import java.util.Iterator;

 /**
- * A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
- * for the in-memory representation.
+ * Interface for accessing the records contained in a log. The log itself is represented as a sequence of log entries.
+ * Each log entry consists of an 8 byte offset, a 4 byte record size, and a "shallow" {@link Record record}.
+ * If the entry is not compressed, then each entry will have only the shallow record contained inside it. If it is
+ * compressed, the entry contains "deep" records, which are packed into the value field of the shallow record. To iterate
+ * over the shallow records, use {@link #shallowIterator()}; for the deep records, use {@link #deepIterator()}. Note
+ * that the deep iterator handles both compressed and non-compressed entries: if the entry is not compressed, the
+ * shallow record is returned; otherwise, the shallow record is decompressed and the deep entries are returned.
+ * See {@link MemoryRecords} for the in-memory representation and {@link FileRecords} for the on-disk representation.
  */
-public interface Records extends Iterable<LogEntry> {
+public interface Records {

-    int SIZE_LENGTH = 4;
+    int OFFSET_OFFSET = 0;
     int OFFSET_LENGTH = 8;
-    int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;
+    int SIZE_OFFSET = OFFSET_OFFSET + OFFSET_LENGTH;
+    int SIZE_LENGTH = 4;
+    int LOG_OVERHEAD = SIZE_OFFSET + SIZE_LENGTH;

     /**
-     * The size of these records in bytes
-     * @return The size in bytes
+     * The size of these records in bytes.
+     * @return The size in bytes of the records
      */
     int sizeInBytes();

     /**
-     * Write the messages in this set to the given channel starting at the given offset byte.
+     * Write the contents of this buffer to a channel.
      * @param channel The channel to write to
-     * @param position The position within this record set to begin writing from
+     * @param position The position in the buffer to write from
      * @param length The number of bytes to write
-     * @return The number of bytes written to the channel (which may be fewer than requested)
-     * @throws IOException For any IO errors copying the
+     * @return The number of bytes written
+     * @throws IOException For any IO errors
      */
     long writeTo(GatheringByteChannel channel, long position, int length) throws IOException;

+    /**
+     * Get the shallow log entries in this log buffer. Note that the signature allows subclasses
+     * to return a more specific log entry type. This enables optimizations such as in-place offset
+     * assignment (see {@link ByteBufferLogInputStream.ByteBufferLogEntry}), and partial reading of
+     * record data (see {@link FileLogInputStream.FileChannelLogEntry#magic()}).
+     * @return An iterator over the shallow entries of the log
+     */
+    Iterator<? extends LogEntry> shallowIterator();
+
+    /**
+     * Get the deep log entries (i.e. descend into compressed message sets). For the deep records,
+     * there are fewer options for optimization since the data must be decompressed before it can be
+     * returned. Hence there is little advantage in allowing subclasses to return a more specific type
+     * as we do for {@link #shallowIterator()}.
+     * @return An iterator over the deep entries of the log
+     */
+    Iterator<LogEntry> deepIterator();
+
+    /**
+     * Check whether all shallow entries in this buffer have a certain magic value.
+     * @param magic The magic value to check
+     * @return true if all shallow entries have a matching magic value, false otherwise
+     */
+    boolean hasMatchingShallowMagic(byte magic);
+
+    /**
+     * Convert all entries in this buffer to the format passed as a parameter. Note that this requires
+     * deep iteration since all of the deep records must also be converted to the desired format.
+     * @param toMagic The magic value to convert to
+     * @return A Records instance (which may or may not be the same instance)
+     */
+    Records toMessageFormat(byte toMagic);
 }
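[Editor's note] To illustrate how the reworked Records interface above is meant to be consumed, here is a minimal, self-contained sketch. It is not part of this commit: the class name RecordsIterationExample and the helper summarize are made up for illustration, and it assumes LogEntry exposes an offset() accessor (not shown in these diffs); everything else (shallowIterator, deepIterator, sizeInBytes, Record.create, Record.recordSize, MemoryRecords.readableRecords) appears in the patch or in the existing API.

import java.nio.ByteBuffer;
import java.util.Iterator;

import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;

public class RecordsIterationExample {

    // Count the shallow (wrapper) entries and the deep (unpacked) entries of a log buffer.
    // For uncompressed entries the two counts match; for compressed entries each shallow
    // wrapper may expand into several deep records.
    static void summarize(Records records) {
        int shallowCount = 0;
        Iterator<? extends LogEntry> shallow = records.shallowIterator();
        while (shallow.hasNext()) {
            LogEntry entry = shallow.next();
            System.out.println("shallow entry at offset " + entry.offset()); // assumes LogEntry.offset()
            shallowCount++;
        }

        int deepCount = 0;
        Iterator<LogEntry> deep = records.deepIterator();
        while (deep.hasNext()) {
            deep.next();
            deepCount++;
        }

        System.out.println(shallowCount + " shallow / " + deepCount + " deep entries in "
                + records.sizeInBytes() + " bytes");
    }

    public static void main(String[] args) {
        // Standalone records can still be built with the static factories retained on Record.
        Record record = Record.create("key".getBytes(), "value".getBytes());
        System.out.println("created " + record + " ("
                + Record.recordSize("key".getBytes(), "value".getBytes()) + " bytes)");

        // An empty read-only view; a real buffer would normally come from a fetch response or
        // from entries written elsewhere and wrapped with MemoryRecords.readableRecords(ByteBuffer).
        summarize(MemoryRecords.readableRecords(ByteBuffer.allocate(0)));
    }
}

Callers that only need offsets, sizes, or magic values should prefer the shallow iterator, since the deep iterator has to decompress each compressed wrapper entry before returning its records.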