KAFKA-4390; Replace MessageSet usage with client-side alternatives

Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <ism...@juma.me.uk>, Guozhang Wang <wangg...@gmail.com>, Jun Rao <jun...@gmail.com>

Closes #2140 from hachikuji/KAFKA4390


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/67f1e5b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/67f1e5b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/67f1e5b9

Branch: refs/heads/trunk
Commit: 67f1e5b91bf073151ff57d5d656693e385726697
Parents: 6626b05
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Dec 13 10:26:25 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Dec 13 10:26:25 2016 -0800

----------------------------------------------------------------------
 checkstyle/import-control.xml                      |   1 +
 .../clients/consumer/internals/Fetcher.java        |   7 +-
 .../clients/producer/internals/BufferPool.java     |   2 +-
 .../producer/internals/RecordAccumulator.java      |  30 +-
 .../clients/producer/internals/RecordBatch.java    |  58 +-
 .../clients/producer/internals/Sender.java         |  10 +-
 .../kafka/common/record/AbstractRecords.java       |  92 +++
 .../common/record/ByteBufferInputStream.java       |  37 +-
 .../common/record/ByteBufferLogInputStream.java    | 119 ++++
 .../common/record/ByteBufferOutputStream.java      |  54 +-
 .../kafka/common/record/CompressionType.java       |   2 +-
 .../apache/kafka/common/record/Compressor.java     | 332 ----------
 .../kafka/common/record/FileLogInputStream.java    | 166 +++++
 .../apache/kafka/common/record/FileRecords.java    | 465 ++++++++++++--
 .../common/record/InvalidRecordException.java      |   4 +-
 .../apache/kafka/common/record/LogEntry.java       | 151 ++++-
 .../kafka/common/record/LogInputStream.java        |  12 +-
 .../kafka/common/record/MemoryRecords.java         | 443 +++++++++-----
 .../common/record/MemoryRecordsBuilder.java        | 461 ++++++++++++++
 .../org/apache/kafka/common/record/Record.java     | 538 +++++++++++-----
 .../org/apache/kafka/common/record/Records.java    |  64 +-
 .../kafka/common/record/RecordsIterator.java       | 179 ++++--
 .../kafka/common/record/TimestampType.java         |   1 +
 .../org/apache/kafka/common/utils/Utils.java       |  51 ++
 .../clients/consumer/KafkaConsumerTest.java        |   7 +-
 .../clients/consumer/internals/FetcherTest.java    | 100 +--
 .../internals/RecordAccumulatorTest.java           |  51 +-
 .../record/ByteBufferLogInputStreamTest.java       | 110 ++++
 .../kafka/common/record/FileRecordsTest.java       | 410 +++++++++++++
 .../common/record/MemoryRecordsBuilderTest.java    | 253 ++++++++
 .../kafka/common/record/MemoryRecordsTest.java     | 194 +++++-
 .../apache/kafka/common/record/RecordTest.java     |  46 +-
 .../kafka/common/record/SimpleRecordTest.java      |  52 +-
 .../kafka/common/record/TimestampTypeTest.java     |  37 ++
 .../java/org/apache/kafka/test/TestUtils.java      |  32 +-
 core/src/main/scala/kafka/api/ApiVersion.scala     |  20 +-
 .../main/scala/kafka/cluster/Partition.scala       |  10 +-
 .../kafka/consumer/ConsumerFetcherThread.scala     |  10 +-
 .../kafka/coordinator/GroupCoordinator.scala       |   6 +-
 .../coordinator/GroupMetadataManager.scala         | 170 +++--
 .../main/scala/kafka/log/FileMessageSet.scala      | 445 --------------
 core/src/main/scala/kafka/log/Log.scala            | 128 ++--
 core/src/main/scala/kafka/log/LogCleaner.scala     |  55 +-
 core/src/main/scala/kafka/log/LogManager.scala     |   2 +-
 core/src/main/scala/kafka/log/LogSegment.scala     |  98 +--
 .../src/main/scala/kafka/log/LogValidator.scala    | 239 ++++++
 core/src/main/scala/kafka/log/TimeIndex.scala      |   6 +-
 .../kafka/message/ByteBufferMessageSet.scala       | 613 +------------------
 core/src/main/scala/kafka/message/Message.scala    |  64 +-
 .../scala/kafka/message/MessageAndOffset.scala     |  14 +-
 .../main/scala/kafka/message/MessageSet.scala      |   8 +-
 .../kafka/server/AbstractFetcherThread.scala       |  10 +-
 .../main/scala/kafka/server/DelayedFetch.scala     |   5 +-
 .../main/scala/kafka/server/FetchDataInfo.scala    |   7 +-
 .../src/main/scala/kafka/server/KafkaApis.scala    |  32 +-
 .../kafka/server/ReplicaFetcherThread.scala        |  22 +-
 .../scala/kafka/server/ReplicaManager.scala        |  85 +--
 .../scala/kafka/tools/DumpLogSegments.scala        | 150 ++---
 .../kafka/tools/ReplicaVerificationTool.scala      |  52 +-
 .../api/GroupCoordinatorIntegrationTest.scala      |   6 +-
 .../scala/kafka/tools/TestLogCleaning.scala        |  19 +-
 .../test/scala/other/kafka/StressTestLog.scala     |  22 +-
 .../other/kafka/TestLinearWriteSpeed.scala         |  19 +-
 .../unit/kafka/admin/DeleteTopicTest.scala         |   2 +-
 .../GroupCoordinatorResponseTest.scala             |  30 +-
 .../coordinator/GroupMetadataManagerTest.scala     | 100 ++-
 .../unit/kafka/log/BrokerCompressionTest.scala     |  15 +-
 .../unit/kafka/log/FileMessageSetTest.scala        | 354 -----------
 .../kafka/log/LogCleanerIntegrationTest.scala      |  66 +-
 .../log/LogCleanerLagIntegrationTest.scala         |  24 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala     |  29 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala      | 275 ++++-----
 .../scala/unit/kafka/log/LogManagerTest.scala      |  22 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala      | 136 ++--
 .../src/test/scala/unit/kafka/log/LogTest.scala    | 204 +++---
 .../scala/unit/kafka/log/LogValidatorTest.scala    | 395 ++++++++++++
 .../kafka/message/BaseMessageSetTestCases.scala    |  20 +-
 .../message/ByteBufferMessageSetTest.scala         | 348 -----------
 .../kafka/message/MessageCompressionTest.scala     |   2 +-
 .../scala/unit/kafka/message/MessageTest.scala     |  38 +-
 .../server/AbstractFetcherThreadTest.scala         |  32 +-
 .../unit/kafka/server/FetchRequestTest.scala       |  53 +-
 .../unit/kafka/server/ISRExpirationTest.scala      |  27 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala    |  14 +-
 .../unit/kafka/server/ProduceRequestTest.scala     |   8 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  33 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  28 +-
 .../unit/kafka/server/SimpleFetchTest.scala        |  21 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  19 +-
 89 files changed, 5269 insertions(+), 3914 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 8eebdb5..62cd77a 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -89,6 +89,7 @@
     <allow pkg="net.jpountz" />
     <allow pkg="org.apache.kafka.common.record" />
     <allow pkg="org.apache.kafka.common.network" />
+    <allow pkg="org.apache.kafka.common.errors" />
   </subpackage>
 
   <subpackage name="requests">

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 6fb4229..3b9d49c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -61,6 +61,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; @@ -686,11 +687,13 @@ public class Fetcher<K, V> { } List<ConsumerRecord<K, V>> parsed = new ArrayList<>(); - for (LogEntry logEntry : partition.records) { + Iterator<LogEntry> deepIterator = partition.records.deepIterator(); + while (deepIterator.hasNext()) { + LogEntry logEntry = deepIterator.next(); // Skip the messages earlier than current position. if (logEntry.offset() >= position) { parsed.add(parseRecord(tp, logEntry)); - bytes += logEntry.size(); + bytes += logEntry.sizeInBytes(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index b42b0ec..077215c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -198,7 +198,7 @@ public final class BufferPool { * memory as free. * * @param buffer The buffer to return - * @param size The size of the buffer to mark as deallocated, note that this maybe smaller than buffer.capacity + * @param size The size of the buffer to mark as deallocated, note that this may be smaller than buffer.capacity * since the buffer may re-allocate itself during in-place compression */ public void deallocate(ByteBuffer buffer, int size) { http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index fa1e513..06d39ec 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -27,8 +27,10 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.CopyOnWriteMap; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -49,7 +51,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; /** - * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords} + * This class acts as a queue that accumulates records into {@link MemoryRecords} * instances to be sent to the server. 
* <p> * The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless @@ -77,7 +79,7 @@ public final class RecordAccumulator { /** * Create a new record accumulator * - * @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances + * @param batchSize The size to use when allocating {@link MemoryRecords} instances * @param totalSize The maximum memory the record accumulator can use. * @param compression The compression codec for the records * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for @@ -190,13 +192,13 @@ public final class RecordAccumulator { free.deallocate(buffer); return appendResult; } - MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); - RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); + MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, compression, TimestampType.CREATE_TIME, this.batchSize); + RecordBatch batch = new RecordBatch(tp, recordsBuilder, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds())); dq.addLast(batch); incomplete.add(batch); - return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); + return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true); } } finally { appendsInProgress.decrementAndGet(); @@ -212,9 +214,9 @@ public final class RecordAccumulator { if (last != null) { FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds()); if (future == null) - last.records.close(); + last.close(); else - return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false); + return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false); } return null; } @@ -240,7 +242,7 @@ public final class RecordAccumulator { Iterator<RecordBatch> batchIterator = dq.iterator(); while (batchIterator.hasNext()) { RecordBatch batch = batchIterator.next(); - boolean isFull = batch != lastBatch || batch.records.isFull(); + boolean isFull = batch != lastBatch || batch.isFull(); // check if the batch is expired if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) { expiredBatches.add(batch); @@ -319,7 +321,7 @@ public final class RecordAccumulator { long waitedTimeMs = nowMs - batch.lastAttemptMs; long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); - boolean full = deque.size() > 1 || batch.records.isFull(); + boolean full = deque.size() > 1 || batch.isFull(); boolean expired = waitedTimeMs >= timeToWaitMs; boolean sendable = full || expired || exhausted || closed || flushInProgress(); if (sendable && !backingOff) { @@ -389,15 +391,15 @@ public final class RecordAccumulator { boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now; // Only drain the batch if it is not during backoff period. 
if (!backoff) { - if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) { + if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) { // there is a rare case that a single batch size is larger than the request size due // to compression; in this case we will still eventually send this batch in a single // request break; } else { RecordBatch batch = deque.pollFirst(); - batch.records.close(); - size += batch.records.sizeInBytes(); + batch.close(); + size += batch.sizeInBytes(); ready.add(batch); batch.drainedMs = now; } @@ -437,7 +439,7 @@ public final class RecordAccumulator { */ public void deallocate(RecordBatch batch) { incomplete.remove(batch); - free.deallocate(batch.records.buffer(), batch.records.initialCapacity()); + free.deallocate(batch.buffer(), batch.initialCapacity()); } /** @@ -507,7 +509,7 @@ public final class RecordAccumulator { Deque<RecordBatch> dq = getDeque(batch.topicPartition); // Close the batch before aborting synchronized (dq) { - batch.records.close(); + batch.close(); dq.remove(batch); } batch.done(-1L, Record.NO_TIMESTAMP, new IllegalStateException("Producer is closed forcefully.")); http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 6706bfd..e9ef441 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -12,18 +12,20 @@ */ package org.apache.kafka.clients.producer.internals; -import java.util.ArrayList; -import java.util.List; - import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + /** * A batch of records that is or will be sent. * @@ -39,21 +41,21 @@ public final class RecordBatch { public final long createdMs; public long drainedMs; public long lastAttemptMs; - public final MemoryRecords records; public final TopicPartition topicPartition; public final ProduceRequestResult produceFuture; public long lastAppendTime; private final List<Thunk> thunks; private long offsetCounter = 0L; private boolean retry; + private final MemoryRecordsBuilder recordsBuilder; - public RecordBatch(TopicPartition tp, MemoryRecords records, long now) { + public RecordBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) { this.createdMs = now; this.lastAttemptMs = now; - this.records = records; + this.recordsBuilder = recordsBuilder; this.topicPartition = tp; this.produceFuture = new ProduceRequestResult(); - this.thunks = new ArrayList<Thunk>(); + this.thunks = new ArrayList<>(); this.lastAppendTime = createdMs; this.retry = false; } @@ -64,10 +66,10 @@ public final class RecordBatch { * @return The RecordSend corresponding to this record or null if there isn't sufficient room. 
*/ public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) { - if (!this.records.hasRoomFor(key, value)) { + if (!recordsBuilder.hasRoomFor(key, value)) { return null; } else { - long checksum = this.records.append(offsetCounter++, timestamp, key, value); + long checksum = this.recordsBuilder.append(offsetCounter++, timestamp, key, value); this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value)); this.lastAppendTime = now; FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, @@ -94,9 +96,8 @@ public final class RecordBatch { baseOffset, exception); // execute callbacks - for (int i = 0; i < this.thunks.size(); i++) { + for (Thunk thunk : thunks) { try { - Thunk thunk = this.thunks.get(i); if (exception == null) { // If the timestamp returned by server is NoTimestamp, that means CreateTime is used. Otherwise LogAppendTime is used. RecordMetadata metadata = new RecordMetadata(this.topicPartition, baseOffset, thunk.future.relativeOffset(), @@ -156,7 +157,7 @@ public final class RecordBatch { } if (expire) { - this.records.close(); + close(); this.done(-1L, Record.NO_TIMESTAMP, new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + " due to " + errorMessage)); } @@ -177,4 +178,37 @@ public final class RecordBatch { public void setRetry() { this.retry = true; } + + public MemoryRecords records() { + return recordsBuilder.build(); + } + + public int sizeInBytes() { + return recordsBuilder.sizeInBytes(); + } + + public double compressionRate() { + return recordsBuilder.compressionRate(); + } + + public boolean isFull() { + return recordsBuilder.isFull(); + } + + public void close() { + recordsBuilder.close(); + } + + public ByteBuffer buffer() { + return recordsBuilder.buffer(); + } + + public int initialCapacity() { + return recordsBuilder.initialCapacity(); + } + + public boolean isWritable() { + return !recordsBuilder.isClosed(); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 7555b71..1f54c0b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -345,7 +345,7 @@ public class Sender implements Runnable { final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size()); for (RecordBatch batch : batches) { TopicPartition tp = batch.topicPartition; - produceRecordsByPartition.put(tp, batch.records); + produceRecordsByPartition.put(tp, batch.records()); recordsByPartition.put(tp, batch); } @@ -505,17 +505,17 @@ public class Sender implements Runnable { // per-topic bytes send rate String topicByteRateName = "topic." + topic + ".bytes"; Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName)); - topicByteRate.record(batch.records.sizeInBytes()); + topicByteRate.record(batch.sizeInBytes()); // per-topic compression rate String topicCompressionRateName = "topic." 
+ topic + ".compression-rate"; Sensor topicCompressionRate = Utils.notNull(this.metrics.getSensor(topicCompressionRateName)); - topicCompressionRate.record(batch.records.compressionRate()); + topicCompressionRate.record(batch.compressionRate()); // global metrics - this.batchSizeSensor.record(batch.records.sizeInBytes(), now); + this.batchSizeSensor.record(batch.sizeInBytes(), now); this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, now); - this.compressionRateSensor.record(batch.records.compressionRate()); + this.compressionRateSensor.record(batch.compressionRate()); this.maxRecordSizeSensor.record(batch.maxRecordSize, now); records += batch.recordCount; } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java new file mode 100644 index 0000000..3794dc6 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.common.record; + +import org.apache.kafka.common.utils.AbstractIterator; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public abstract class AbstractRecords implements Records { + + @Override + public boolean hasMatchingShallowMagic(byte magic) { + Iterator<? extends LogEntry> iterator = shallowIterator(); + while (iterator.hasNext()) + if (iterator.next().magic() != magic) + return false; + return true; + } + + /** + * Convert this message set to use the specified message format. + */ + @Override + public Records toMessageFormat(byte toMagic) { + List<LogEntry> converted = new ArrayList<>(); + Iterator<LogEntry> deepIterator = deepIterator(); + while (deepIterator.hasNext()) { + LogEntry entry = deepIterator.next(); + converted.add(LogEntry.create(entry.offset(), entry.record().convert(toMagic))); + } + + if (converted.isEmpty()) { + // This indicates that the message is too large, which indicates that the buffer is not large + // enough to hold a full log entry. We just return all the bytes in the file message set. + // Even though the message set does not have the right format version, we expect old clients + // to raise an error to the user after reading the message size and seeing that there + // are not enough available bytes in the response to read the full message. + return this; + } else { + // We use the first message to determine the compression type for the resulting message set. 
+ // This could result in message sets which are either larger or smaller than the original size. + // For example, it could end up larger if most messages were previously compressed, but + // it just so happens that the first one is not. There is also some risk that this can + // cause some timestamp information to be lost (e.g. if the timestamp type was changed) since + // we are essentially merging multiple message sets. However, currently this method is only + // used for down-conversion, so we've ignored the problem. + CompressionType compressionType = shallowIterator().next().record().compressionType(); + return MemoryRecords.withLogEntries(compressionType, converted); + } + } + + public static int estimatedSize(CompressionType compressionType, Iterable<LogEntry> entries) { + int size = 0; + for (LogEntry entry : entries) + size += entry.sizeInBytes(); + // NOTE: 1024 is the minimum block size for snappy encoding + return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16); + } + + /** + * Get an iterator over the deep records. + * @return An iterator over the records + */ + public Iterator<Record> records() { + return new AbstractIterator<Record>() { + private final Iterator<? extends LogEntry> deepEntries = deepIterator(); + @Override + protected Record makeNext() { + if (deepEntries.hasNext()) + return deepEntries.next().record(); + return allDone(); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java index 84668a5..b25f949 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java @@ -16,34 +16,41 @@ */ package org.apache.kafka.common.record; +import java.io.DataInputStream; import java.io.InputStream; import java.nio.ByteBuffer; /** * A byte buffer backed input inputStream */ -public class ByteBufferInputStream extends InputStream { - - private ByteBuffer buffer; +public class ByteBufferInputStream extends DataInputStream { public ByteBufferInputStream(ByteBuffer buffer) { - this.buffer = buffer; + super(new UnderlyingInputStream(buffer)); } - public int read() { - if (!buffer.hasRemaining()) { - return -1; + private static class UnderlyingInputStream extends InputStream { + private ByteBuffer buffer; + + public UnderlyingInputStream(ByteBuffer buffer) { + this.buffer = buffer; } - return buffer.get() & 0xFF; - } - public int read(byte[] bytes, int off, int len) { - if (!buffer.hasRemaining()) { - return -1; + public int read() { + if (!buffer.hasRemaining()) { + return -1; + } + return buffer.get() & 0xFF; } - len = Math.min(len, buffer.remaining()); - buffer.get(bytes, off, len); - return len; + public int read(byte[] bytes, int off, int len) { + if (!buffer.hasRemaining()) { + return -1; + } + + len = Math.min(len, buffer.remaining()); + buffer.get(bytes, off, len); + return len; + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java ---------------------------------------------------------------------- diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java new file mode 100644 index 0000000..ae0c91b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.common.record; + +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.apache.kafka.common.record.Records.LOG_OVERHEAD; +import static org.apache.kafka.common.record.Records.OFFSET_OFFSET; + +/** + * A byte buffer backed log input stream. This class avoids the need to copy records by returning + * slices from the underlying byte buffer. + */ +class ByteBufferLogInputStream implements LogInputStream<ByteBufferLogInputStream.ByteBufferLogEntry> { + private final ByteBuffer buffer; + private final int maxMessageSize; + + ByteBufferLogInputStream(ByteBuffer buffer, int maxMessageSize) { + this.buffer = buffer; + this.maxMessageSize = maxMessageSize; + } + + public ByteBufferLogEntry nextEntry() throws IOException { + int remaining = buffer.remaining(); + if (remaining < LOG_OVERHEAD) + return null; + + int recordSize = buffer.getInt(buffer.position() + Records.SIZE_OFFSET); + if (recordSize < Record.RECORD_OVERHEAD_V0) + throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", Record.RECORD_OVERHEAD_V0)); + if (recordSize > maxMessageSize) + throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize)); + + int entrySize = recordSize + LOG_OVERHEAD; + if (remaining < entrySize) + return null; + + ByteBuffer entrySlice = buffer.slice(); + entrySlice.limit(entrySize); + buffer.position(buffer.position() + entrySize); + return new ByteBufferLogEntry(entrySlice); + } + + public static class ByteBufferLogEntry extends LogEntry { + private final ByteBuffer buffer; + private final Record record; + + private ByteBufferLogEntry(ByteBuffer buffer) { + this.buffer = buffer; + buffer.position(LOG_OVERHEAD); + this.record = new Record(buffer.slice()); + buffer.position(OFFSET_OFFSET); + } + + @Override + public long offset() { + return buffer.getLong(OFFSET_OFFSET); + } + + @Override + public Record record() { + return record; + } + + public void setOffset(long offset) { + buffer.putLong(OFFSET_OFFSET, offset); + } + + public void setCreateTime(long timestamp) { + if (record.magic() == Record.MAGIC_VALUE_V0) + throw new IllegalArgumentException("Cannot set timestamp for a record with magic = 0"); 
+ + long currentTimestamp = record.timestamp(); + // We don't need to recompute crc if the timestamp is not updated. + if (record.timestampType() == TimestampType.CREATE_TIME && currentTimestamp == timestamp) + return; + + byte attributes = record.attributes(); + buffer.put(LOG_OVERHEAD + Record.ATTRIBUTES_OFFSET, TimestampType.CREATE_TIME.updateAttributes(attributes)); + buffer.putLong(LOG_OVERHEAD + Record.TIMESTAMP_OFFSET, timestamp); + long crc = record.computeChecksum(); + Utils.writeUnsignedInt(buffer, LOG_OVERHEAD + Record.CRC_OFFSET, crc); + } + + public void setLogAppendTime(long timestamp) { + if (record.magic() == Record.MAGIC_VALUE_V0) + throw new IllegalArgumentException("Cannot set timestamp for a record with magic = 0"); + + byte attributes = record.attributes(); + buffer.put(LOG_OVERHEAD + Record.ATTRIBUTES_OFFSET, TimestampType.LOG_APPEND_TIME.updateAttributes(attributes)); + buffer.putLong(LOG_OVERHEAD + Record.TIMESTAMP_OFFSET, timestamp); + long crc = record.computeChecksum(); + Utils.writeUnsignedInt(buffer, LOG_OVERHEAD + Record.CRC_OFFSET, crc); + } + + public ByteBuffer buffer() { + return buffer; + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java index 1c9fbaa..3fb7f49 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java @@ -16,42 +16,54 @@ */ package org.apache.kafka.common.record; +import java.io.DataOutputStream; import java.io.OutputStream; import java.nio.ByteBuffer; /** * A byte buffer backed output outputStream */ -public class ByteBufferOutputStream extends OutputStream { +public class ByteBufferOutputStream extends DataOutputStream { private static final float REALLOCATION_FACTOR = 1.1f; - private ByteBuffer buffer; - public ByteBufferOutputStream(ByteBuffer buffer) { - this.buffer = buffer; + super(new UnderlyingOutputStream(buffer)); } - public void write(int b) { - if (buffer.remaining() < 1) - expandBuffer(buffer.capacity() + 1); - buffer.put((byte) b); + public ByteBuffer buffer() { + return ((UnderlyingOutputStream) out).buffer; } - public void write(byte[] bytes, int off, int len) { - if (buffer.remaining() < len) - expandBuffer(buffer.capacity() + len); - buffer.put(bytes, off, len); - } + public static class UnderlyingOutputStream extends OutputStream { + private ByteBuffer buffer; - public ByteBuffer buffer() { - return buffer; - } + public UnderlyingOutputStream(ByteBuffer buffer) { + this.buffer = buffer; + } - private void expandBuffer(int size) { - int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size); - ByteBuffer temp = ByteBuffer.allocate(expandSize); - temp.put(buffer.array(), buffer.arrayOffset(), buffer.position()); - buffer = temp; + public void write(int b) { + if (buffer.remaining() < 1) + expandBuffer(buffer.capacity() + 1); + buffer.put((byte) b); + } + + public void write(byte[] bytes, int off, int len) { + if (buffer.remaining() < len) + expandBuffer(buffer.capacity() + len); + buffer.put(bytes, off, len); + } + + public ByteBuffer buffer() { + return buffer; + } + + private void expandBuffer(int size) { + int expandSize = 
Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size); + ByteBuffer temp = ByteBuffer.allocate(expandSize); + temp.put(buffer.array(), buffer.arrayOffset(), buffer.position()); + buffer = temp; + } } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index 65a7e43..e1d4754 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -26,7 +26,7 @@ public enum CompressionType { public final String name; public final float rate; - private CompressionType(int id, String name, float rate) { + CompressionType(int id, String name, float rate) { this.id = id; this.name = name; this.rate = rate; http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/Compressor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java deleted file mode 100644 index a806975..0000000 --- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java +++ /dev/null @@ -1,332 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.common.record; - -import java.lang.reflect.Constructor; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.utils.Utils; - -import java.io.InputStream; -import java.io.OutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - -public class Compressor { - - static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f; - static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f; - static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024; - - private static final float[] TYPE_TO_RATE; - - static { - int maxTypeId = -1; - for (CompressionType type : CompressionType.values()) - maxTypeId = Math.max(maxTypeId, type.id); - TYPE_TO_RATE = new float[maxTypeId + 1]; - for (CompressionType type : CompressionType.values()) { - TYPE_TO_RATE[type.id] = type.rate; - } - } - - // dynamically load the snappy and lz4 classes to avoid runtime dependency if we are not using compression - // caching constructors to avoid invoking of Class.forName method for each batch - private static MemoizingConstructorSupplier snappyOutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() { - @Override - public Constructor get() throws ClassNotFoundException, NoSuchMethodException { - return Class.forName("org.xerial.snappy.SnappyOutputStream") - .getConstructor(OutputStream.class, Integer.TYPE); - } - }); - - private static MemoizingConstructorSupplier lz4OutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() { - @Override - public Constructor get() throws ClassNotFoundException, NoSuchMethodException { - return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream") - .getConstructor(OutputStream.class); - } - }); - - private static MemoizingConstructorSupplier snappyInputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() { - @Override - public Constructor get() throws ClassNotFoundException, NoSuchMethodException { - return Class.forName("org.xerial.snappy.SnappyInputStream") - .getConstructor(InputStream.class); - } - }); - - private static MemoizingConstructorSupplier lz4InputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() { - @Override - public Constructor get() throws ClassNotFoundException, NoSuchMethodException { - return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream") - .getConstructor(InputStream.class, Boolean.TYPE); - } - }); - - private final CompressionType type; - private final DataOutputStream appendStream; - private final ByteBufferOutputStream bufferStream; - private final int initPos; - - public long writtenUncompressed; - public long numRecords; - public float compressionRate; - public long maxTimestamp; - - public Compressor(ByteBuffer buffer, CompressionType type) { - this.type = type; - this.initPos = buffer.position(); - - this.numRecords = 0; - this.writtenUncompressed = 0; - this.compressionRate = 1; - this.maxTimestamp = Record.NO_TIMESTAMP; - - if (type != CompressionType.NONE) { - // for compressed records, leave space for the header and the shallow message metadata - // and move the starting position to the value payload offset - buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD); - } - - // create the stream - bufferStream = new ByteBufferOutputStream(buffer); - appendStream = 
wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE); - } - - public ByteBuffer buffer() { - return bufferStream.buffer(); - } - - public double compressionRate() { - return compressionRate; - } - - public void close() { - try { - appendStream.close(); - } catch (IOException e) { - throw new KafkaException(e); - } - - if (type != CompressionType.NONE) { - ByteBuffer buffer = bufferStream.buffer(); - int pos = buffer.position(); - // write the header, for the end offset write as number of records - 1 - buffer.position(initPos); - buffer.putLong(numRecords - 1); - buffer.putInt(pos - initPos - Records.LOG_OVERHEAD); - // write the shallow message (the crc and value size are not correct yet) - Record.write(buffer, maxTimestamp, null, null, type, 0, -1); - // compute the fill the value size - int valueSize = pos - initPos - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD; - buffer.putInt(initPos + Records.LOG_OVERHEAD + Record.KEY_OFFSET_V1, valueSize); - // compute and fill the crc at the beginning of the message - long crc = Record.computeChecksum(buffer, - initPos + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET, - pos - initPos - Records.LOG_OVERHEAD - Record.MAGIC_OFFSET); - Utils.writeUnsignedInt(buffer, initPos + Records.LOG_OVERHEAD + Record.CRC_OFFSET, crc); - // reset the position - buffer.position(pos); - - // update the compression ratio - this.compressionRate = (float) buffer.position() / this.writtenUncompressed; - TYPE_TO_RATE[type.id] = TYPE_TO_RATE[type.id] * COMPRESSION_RATE_DAMPING_FACTOR + - compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR); - } - } - - // Note that for all the write operations below, IO exceptions should - // never be thrown since the underlying ByteBufferOutputStream does not throw IOException; - // therefore upon encountering this issue we just close the append stream. 
- - public void putLong(final long value) { - try { - appendStream.writeLong(value); - } catch (IOException e) { - throw new KafkaException("I/O exception when writing to the append stream, closing", e); - } - } - - public void putInt(final int value) { - try { - appendStream.writeInt(value); - } catch (IOException e) { - throw new KafkaException("I/O exception when writing to the append stream, closing", e); - } - } - - public void put(final ByteBuffer buffer) { - try { - appendStream.write(buffer.array(), buffer.arrayOffset(), buffer.limit()); - } catch (IOException e) { - throw new KafkaException("I/O exception when writing to the append stream, closing", e); - } - } - - public void putByte(final byte value) { - try { - appendStream.write(value); - } catch (IOException e) { - throw new KafkaException("I/O exception when writing to the append stream, closing", e); - } - } - - public void put(final byte[] bytes, final int offset, final int len) { - try { - appendStream.write(bytes, offset, len); - } catch (IOException e) { - throw new KafkaException("I/O exception when writing to the append stream, closing", e); - } - } - - /** - * @return CRC of the record - */ - public long putRecord(long timestamp, byte[] key, byte[] value, CompressionType type, - int valueOffset, int valueSize) { - // put a record as un-compressed into the underlying stream - long crc = Record.computeChecksum(timestamp, key, value, type, valueOffset, valueSize); - byte attributes = Record.computeAttributes(type); - putRecord(crc, attributes, timestamp, key, value, valueOffset, valueSize); - return crc; - } - - /** - * Put a record as uncompressed into the underlying stream - * @return CRC of the record - */ - public long putRecord(long timestamp, byte[] key, byte[] value) { - return putRecord(timestamp, key, value, CompressionType.NONE, 0, -1); - } - - private void putRecord(final long crc, final byte attributes, final long timestamp, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) { - maxTimestamp = Math.max(maxTimestamp, timestamp); - Record.write(this, crc, attributes, timestamp, key, value, valueOffset, valueSize); - } - - public void recordWritten(int size) { - numRecords += 1; - writtenUncompressed += size; - } - - public long numRecordsWritten() { - return numRecords; - } - - public long estimatedBytesWritten() { - if (type == CompressionType.NONE) { - return bufferStream.buffer().position(); - } else { - // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes - return (long) (writtenUncompressed * TYPE_TO_RATE[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR); - } - } - - // the following two functions also need to be public since they are used in MemoryRecords.iteration - - public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) { - try { - switch (type) { - case NONE: - return new DataOutputStream(buffer); - case GZIP: - return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize)); - case SNAPPY: - try { - OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize); - return new DataOutputStream(stream); - } catch (Exception e) { - throw new KafkaException(e); - } - case LZ4: - try { - OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer); - return new DataOutputStream(stream); - } catch (Exception e) { - throw new KafkaException(e); - } - default: - throw new IllegalArgumentException("Unknown 
compression type: " + type); - } - } catch (IOException e) { - throw new KafkaException(e); - } - } - - public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) { - try { - switch (type) { - case NONE: - return new DataInputStream(buffer); - case GZIP: - return new DataInputStream(new GZIPInputStream(buffer)); - case SNAPPY: - try { - InputStream stream = (InputStream) snappyInputStreamSupplier.get().newInstance(buffer); - return new DataInputStream(stream); - } catch (Exception e) { - throw new KafkaException(e); - } - case LZ4: - try { - InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer, - messageVersion == Record.MAGIC_VALUE_V0); - return new DataInputStream(stream); - } catch (Exception e) { - throw new KafkaException(e); - } - default: - throw new IllegalArgumentException("Unknown compression type: " + type); - } - } catch (IOException e) { - throw new KafkaException(e); - } - } - - private interface ConstructorSupplier { - Constructor get() throws ClassNotFoundException, NoSuchMethodException; - } - - // this code is based on Guava's @see{com.google.common.base.Suppliers.MemoizingSupplier} - private static class MemoizingConstructorSupplier { - final ConstructorSupplier delegate; - transient volatile boolean initialized; - transient Constructor value; - - public MemoizingConstructorSupplier(ConstructorSupplier delegate) { - this.delegate = delegate; - } - - public Constructor get() throws NoSuchMethodException, ClassNotFoundException { - if (!initialized) { - synchronized (this) { - if (!initialized) { - Constructor constructor = delegate.get(); - value = constructor; - initialized = true; - return constructor; - } - } - } - return value; - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java new file mode 100644 index 0000000..ae393b0 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.common.record; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.CorruptRecordException; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +/** + * A log input stream which is backed by a {@link FileChannel}. 
+ */ +public class FileLogInputStream implements LogInputStream<FileLogInputStream.FileChannelLogEntry> { + private int position; + private final int end; + private final FileChannel channel; + private final int maxRecordSize; + private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(Records.LOG_OVERHEAD); + + /** + * Create a new log input stream over the FileChannel + * @param channel Underlying FileChannel + * @param maxRecordSize Maximum size of records + * @param start Position in the file channel to start from + * @param end Position in the file channel not to read past + */ + public FileLogInputStream(FileChannel channel, + int maxRecordSize, + int start, + int end) { + this.channel = channel; + this.maxRecordSize = maxRecordSize; + this.position = start; + this.end = end; + } + + @Override + public FileChannelLogEntry nextEntry() throws IOException { + if (position + Records.LOG_OVERHEAD >= end) + return null; + + logHeaderBuffer.rewind(); + channel.read(logHeaderBuffer, position); + if (logHeaderBuffer.hasRemaining()) + return null; + + logHeaderBuffer.rewind(); + long offset = logHeaderBuffer.getLong(); + int size = logHeaderBuffer.getInt(); + + if (size < Record.RECORD_OVERHEAD_V0) + throw new CorruptRecordException(String.format("Record size is smaller than minimum record overhead (%d).", Record.RECORD_OVERHEAD_V0)); + + if (size > maxRecordSize) + throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxRecordSize)); + + if (position + Records.LOG_OVERHEAD + size > end) + return null; + + FileChannelLogEntry logEntry = new FileChannelLogEntry(offset, channel, position, size); + position += logEntry.sizeInBytes(); + return logEntry; + } + + /** + * Log entry backed by an underlying FileChannel. This allows iteration over the shallow log + * entries without needing to read the record data into memory until it is needed. The downside + * is that entries will generally no longer be readable when the underlying channel is closed. + */ + public static class FileChannelLogEntry extends LogEntry { + private final long offset; + private final FileChannel channel; + private final int position; + private final int recordSize; + private Record record = null; + + private FileChannelLogEntry(long offset, + FileChannel channel, + int position, + int recordSize) { + this.offset = offset; + this.channel = channel; + this.position = position; + this.recordSize = recordSize; + } + + @Override + public long offset() { + return offset; + } + + public int position() { + return position; + } + + @Override + public byte magic() { + if (record != null) + return record.magic(); + + try { + byte[] magic = new byte[1]; + ByteBuffer buf = ByteBuffer.wrap(magic); + channel.read(buf, position + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET); + if (buf.hasRemaining()) + throw new KafkaException("Failed to read magic byte from FileChannel " + channel); + return magic[0]; + } catch (IOException e) { + throw new KafkaException(e); + } + } + + /** + * Force load the record and its data (key and value) into memory. 
+ * @return The resulting record + * @throws IOException for any IO errors reading from the underlying file + */ + private Record loadRecord() throws IOException { + if (record != null) + return record; + + ByteBuffer recordBuffer = ByteBuffer.allocate(recordSize); + channel.read(recordBuffer, position + Records.LOG_OVERHEAD); + if (recordBuffer.hasRemaining()) + throw new IOException("Failed to read full record from channel " + channel); + + recordBuffer.rewind(); + record = new Record(recordBuffer); + return record; + } + + @Override + public Record record() { + if (record != null) + return record; + + try { + return loadRecord(); + } catch (IOException e) { + throw new KafkaException(e); + } + } + + @Override + public int sizeInBytes() { + return Records.LOG_OVERHEAD + recordSize; + } + + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index bdae08d..faf61e9 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -18,22 +18,31 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.network.TransportLayer; +import org.apache.kafka.common.record.FileLogInputStream.FileChannelLogEntry; +import org.apache.kafka.common.utils.Utils; +import java.io.Closeable; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.GatheringByteChannel; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicInteger; /** - * File-backed record set. + * A {@link Records} implementation backed by a file. An optional start and end position can be applied to this + * instance to enable slicing a range of the log records. */ -public class FileRecords implements Records { - private final File file; +public class FileRecords extends AbstractRecords implements Closeable { + private final boolean isSlice; private final FileChannel channel; - private final long start; - private final long end; - private final long size; + private final int start; + private final int end; + private volatile File file; + private final AtomicInteger size; public FileRecords(File file, FileChannel channel, @@ -44,83 +53,435 @@ public class FileRecords implements Records { this.channel = channel; this.start = start; this.end = end; + this.isSlice = isSlice; + this.size = new AtomicInteger(); - if (isSlice) - this.size = end - start; - else - this.size = Math.min(channel.size(), end) - start; + // set the initial size of the buffer + resize(); + } + + public void resize() throws IOException { + if (isSlice) { + size.set(end - start); + } else { + int limit = Math.min((int) channel.size(), end); + size.set(limit - start); + + // if this is not a slice, update the file pointer to the end of the file + // set the file position to the last byte in the file + channel.position(limit); + } } @Override public int sizeInBytes() { - return (int) size; + return size.get(); + } + + /** + * Get the underlying file. 
+ * @return The file + */ + public File file() { + return file; + } + + /** + * Get the underlying file channel. + * @return The file channel + */ + public FileChannel channel() { + return channel; + } + + /** + * Read log entries into a given buffer. + * @param buffer The buffer to write the entries to + * @param position Position in the buffer to read from + * @return The same buffer + * @throws IOException + */ + public ByteBuffer readInto(ByteBuffer buffer, int position) throws IOException { + channel.read(buffer, position + this.start); + buffer.flip(); + return buffer; + } + + /** + * Return a slice of records from this instance, which is a view into this set starting from the given position + * and with the given size limit. + * + * If the size is beyond the end of the file, the end will be based on the size of the file at the time of the read. + * + * If this message set is already sliced, the position will be taken relative to that slicing. + * + * @param position The start position to begin the read from + * @param size The number of bytes after the start position to include + * @return A sliced wrapper on this message set limited based on the given position and size + */ + public FileRecords read(int position, int size) throws IOException { + if (position < 0) + throw new IllegalArgumentException("Invalid position: " + position); + if (size < 0) + throw new IllegalArgumentException("Invalid size: " + size); + + final int end; + if (this.start + position + size < 0) + end = sizeInBytes(); + else + end = Math.min(this.start + position + size, sizeInBytes()); + return new FileRecords(file, channel, this.start + position, end, true); + } + + /** + * Append log entries to the buffer + * @param records The records to append + * @return the number of bytes written to the underlying file + */ + public int append(MemoryRecords records) throws IOException { + int written = records.writeFullyTo(channel); + size.getAndAdd(written); + return written; + } + + /** + * Commit all written data to the physical disk + */ + public void flush() throws IOException { + channel.force(true); + } + + /** + * Close this record set + */ + public void close() throws IOException { + flush(); + trim(); + channel.close(); + } + + /** + * Delete this message set from the filesystem + * @return True iff this message set was deleted. + */ + public boolean delete() { + Utils.closeQuietly(channel, "FileChannel"); + return file.delete(); + } + + /** + * Trim file when close or roll to next file + */ + public void trim() throws IOException { + truncateTo(sizeInBytes()); + } + + /** + * Update the file reference (to be used with caution since this does not reopen the file channel) + * @param file The new file to use + */ + public void setFile(File file) { + this.file = file; + } + + /** + * Rename the file that backs this message set + * @throws IOException if rename fails. + */ + public void renameTo(File f) throws IOException { + try { + Utils.atomicMoveWithFallback(file.toPath(), f.toPath()); + } finally { + this.file = f; + } + } + + /** + * Truncate this file message set to the given size in bytes. Note that this API does no checking that the + * given size falls on a valid message boundary. + * In some versions of the JDK truncating to the same size as the file message set will cause an + * update of the files mtime, so truncate is only performed if the targetSize is smaller than the + * size of the underlying FileChannel. 
+     * It is expected that no other threads will do writes to the log when this function is called.
+     * @param targetSize The size to truncate to. Must be between 0 and sizeInBytes.
+     * @return The number of bytes truncated off
+     */
+    public int truncateTo(int targetSize) throws IOException {
+        int originalSize = sizeInBytes();
+        if (targetSize > originalSize || targetSize < 0)
+            throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
+                    " size of this log segment is " + originalSize + " bytes.");
+        if (targetSize < (int) channel.size()) {
+            channel.truncate(targetSize);
+            channel.position(targetSize);
+            size.set(targetSize);
+        }
+        return originalSize - targetSize;
     }
 
     @Override
     public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException {
         long newSize = Math.min(channel.size(), end) - start;
-        if (newSize < size)
+        if (newSize < size.get())
             throw new KafkaException(String.format("Size of FileRecords %s has been truncated during write: old size %d, new size %d",
                     file.getAbsolutePath(), size, newSize));
 
-        if (offset > size)
-            throw new KafkaException(String.format("The requested offset %d is out of range. The size of this FileRecords is %d.", offset, size));
-
         long position = start + offset;
-        long count = Math.min(length, this.size - offset);
+        long count = Math.min(length, size.get());
+        final long bytesTransferred;
         if (destChannel instanceof TransportLayer) {
             TransportLayer tl = (TransportLayer) destChannel;
-            return tl.transferFrom(this.channel, position, count);
+            bytesTransferred = tl.transferFrom(channel, position, count);
         } else {
-            return this.channel.transferTo(position, count, destChannel);
+            bytesTransferred = channel.transferTo(position, count, destChannel);
+        }
+        return bytesTransferred;
+    }
+
+    /**
+     * Search forward for the file position of the first entry whose last offset is greater than or equal to the
+     * target offset, and return that offset, its physical position, and the size of the entry (including log
+     * overhead). If no such entry is found, return null.
+     *
+     * @param targetOffset The offset to search for.
+     * @param startingPosition The starting position in the file to begin searching from.
+     */
+    public LogEntryPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
+        Iterator<FileChannelLogEntry> iterator = shallowIteratorFrom(Integer.MAX_VALUE, startingPosition);
+        while (iterator.hasNext()) {
+            FileChannelLogEntry entry = iterator.next();
+            long offset = entry.offset();
+            if (offset >= targetOffset)
+                return new LogEntryPosition(offset, entry.position(), entry.sizeInBytes());
+        }
+        return null;
+    }
+
+    /**
+     * Search forward for the message whose timestamp is greater than or equal to the target timestamp.
+     *
+     * @param targetTimestamp The timestamp to search for.
+     * @param startingPosition The starting position to search.
+     * @return The timestamp and offset of the message found, or null if no message is found.
+     */
+    public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingPosition) {
+        Iterator<FileChannelLogEntry> shallowIterator = shallowIteratorFrom(startingPosition);
+        while (shallowIterator.hasNext()) {
+            LogEntry shallowEntry = shallowIterator.next();
+            Record shallowRecord = shallowEntry.record();
+            if (shallowRecord.timestamp() >= targetTimestamp) {
+                // We found a message
+                for (LogEntry deepLogEntry : shallowEntry) {
+                    long timestamp = deepLogEntry.record().timestamp();
+                    if (timestamp >= targetTimestamp)
+                        return new TimestampAndOffset(timestamp, deepLogEntry.offset());
+                }
+                throw new IllegalStateException(String.format("The message set (max timestamp = %s, max offset = %s" +
+                        " should contain target timestamp %s, but does not.", shallowRecord.timestamp(),
+                        shallowEntry.offset(), targetTimestamp));
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Return the largest timestamp of the messages after a given position in this file message set.
+     * @param startingPosition The starting position.
+     * @return The largest timestamp of the messages after the given position.
+     */
+    public TimestampAndOffset largestTimestampAfter(int startingPosition) {
+        long maxTimestamp = Record.NO_TIMESTAMP;
+        long offsetOfMaxTimestamp = -1L;
+
+        Iterator<FileChannelLogEntry> shallowIterator = shallowIteratorFrom(startingPosition);
+        while (shallowIterator.hasNext()) {
+            LogEntry shallowEntry = shallowIterator.next();
+            long timestamp = shallowEntry.record().timestamp();
+            if (timestamp > maxTimestamp) {
+                maxTimestamp = timestamp;
+                offsetOfMaxTimestamp = shallowEntry.offset();
+            }
+        }
+        return new TimestampAndOffset(maxTimestamp, offsetOfMaxTimestamp);
+    }
+
+    /**
+     * Get an iterator over the shallow entries in the file. Note that the entries are
+     * backed by the open file channel. When the channel is closed (i.e. when this instance
+     * is closed), the entries will generally no longer be readable.
+     * @return An iterator over the shallow entries
+     */
+    @Override
+    public Iterator<FileChannelLogEntry> shallowIterator() {
+        return shallowIteratorFrom(start);
+    }
+
+    /**
+     * Get an iterator over the shallow entries, enforcing a maximum record size
+     * @param maxRecordSize The maximum allowable size of individual records (including compressed record sets)
+     * @return An iterator over the shallow entries
+     */
+    public Iterator<FileChannelLogEntry> shallowIterator(int maxRecordSize) {
+        return shallowIteratorFrom(maxRecordSize, start);
+    }
+
+    private Iterator<FileChannelLogEntry> shallowIteratorFrom(int start) {
+        return shallowIteratorFrom(Integer.MAX_VALUE, start);
+    }
+
+    private Iterator<FileChannelLogEntry> shallowIteratorFrom(int maxRecordSize, int start) {
+        final int end;
+        if (isSlice)
+            end = this.end;
+        else
+            end = this.sizeInBytes();
+        FileLogInputStream inputStream = new FileLogInputStream(channel, maxRecordSize, start, end);
+        return RecordsIterator.shallowIterator(inputStream);
     }
 
     @Override
-    public RecordsIterator iterator() {
-        return new RecordsIterator(new FileLogInputStream(channel, start, end), false);
+    public Iterator<LogEntry> deepIterator() {
+        final int end;
+        if (isSlice)
+            end = this.end;
+        else
+            end = this.sizeInBytes();
+        FileLogInputStream inputStream = new FileLogInputStream(channel, Integer.MAX_VALUE, start, end);
+        return new RecordsIterator(inputStream, false, false, Integer.MAX_VALUE);
+    }
+
+    public static FileRecords open(File file,
+                                   boolean mutable,
+                                   boolean fileAlreadyExists,
+                                   int initFileSize,
+                                   boolean preallocate) throws IOException {
+        FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
+        int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
+        return new FileRecords(file, channel, 0, end, false);
+    }
+
+    public static FileRecords open(File file,
+                                   boolean fileAlreadyExists,
+                                   int initFileSize,
+                                   boolean preallocate) throws IOException {
+        return open(file, true, fileAlreadyExists, initFileSize, preallocate);
+    }
+
+    public static FileRecords open(File file, boolean mutable) throws IOException {
+        return open(file, mutable, false, 0, false);
+    }
+
+    public static FileRecords open(File file) throws IOException {
+        return open(file, true);
+    }
+
+    /**
+     * Open a channel for the given file.
+     * For Windows NTFS and some old Linux file systems, setting preallocate to true and initFileSize
+     * to a positive value (for example 512 * 1024 * 1024) can improve Kafka produce performance.
+     * @param file File path
+     * @param mutable Whether the file should be opened for writing
+     * @param fileAlreadyExists Whether the file already exists
+     * @param initFileSize The size used to preallocate the file, for example 512 * 1024 * 1024
+     * @param preallocate Whether to preallocate the file, taken from the configuration.
+     */
+    private static FileChannel openChannel(File file,
+                                           boolean mutable,
+                                           boolean fileAlreadyExists,
+                                           int initFileSize,
+                                           boolean preallocate) throws IOException {
+        if (mutable) {
+            if (fileAlreadyExists) {
+                return new RandomAccessFile(file, "rw").getChannel();
+            } else {
+                if (preallocate) {
+                    RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+                    randomAccessFile.setLength(initFileSize);
+                    return randomAccessFile.getChannel();
+                } else {
+                    return new RandomAccessFile(file, "rw").getChannel();
+                }
+            }
+        } else {
+            return new FileInputStream(file).getChannel();
+        }
     }
 
-    private static class FileLogInputStream implements LogInputStream {
-        private long position;
-        protected final long end;
-        protected final FileChannel channel;
-        private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(Records.LOG_OVERHEAD);
+    public static class LogEntryPosition {
+        public final long offset;
+        public final int position;
+        public final int size;
 
-        public FileLogInputStream(FileChannel channel, long start, long end) {
-            this.channel = channel;
-            this.position = start;
-            this.end = end;
+        public LogEntryPosition(long offset, int position, int size) {
+            this.offset = offset;
+            this.position = position;
+            this.size = size;
         }
 
         @Override
-        public LogEntry nextEntry() throws IOException {
-            if (position + Records.LOG_OVERHEAD >= end)
-                return null;
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
 
-            logHeaderBuffer.rewind();
-            channel.read(logHeaderBuffer, position);
-            if (logHeaderBuffer.hasRemaining())
-                return null;
+            LogEntryPosition that = (LogEntryPosition) o;
 
-            logHeaderBuffer.rewind();
-            long offset = logHeaderBuffer.getLong();
-            int size = logHeaderBuffer.getInt();
-            if (size < 0)
-                throw new IllegalStateException("Record with size " + size);
+            if (offset != that.offset) return false;
+            if (position != that.position) return false;
+            return size == that.size;
 
-            if (position + Records.LOG_OVERHEAD + size > end)
-                return null;
+        }
 
-            ByteBuffer recordBuffer = ByteBuffer.allocate(size);
-            channel.read(recordBuffer, position + Records.LOG_OVERHEAD);
-            if (recordBuffer.hasRemaining())
-                return null;
-            recordBuffer.rewind();
+        @Override
+        public int hashCode() {
+            int result = (int) (offset ^ (offset >>> 32));
+            result = 31 * result + position;
+            result = 31 * result + size;
+            return result;
+        }
 
-            Record record = new Record(recordBuffer);
-            LogEntry logEntry = new LogEntry(offset, record);
-            position += logEntry.size();
-            return logEntry;
+        @Override
+        public String toString() {
+            return "LogEntryPosition(" +
+                    "offset=" + offset +
+                    ", position=" + position +
+                    ", size=" + size +
+                    ')';
         }
     }
+
+    public static class TimestampAndOffset {
+        public final long timestamp;
+        public final long offset;
+
+        public TimestampAndOffset(long timestamp, long offset) {
+            this.timestamp = timestamp;
+            this.offset = offset;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            TimestampAndOffset that = (TimestampAndOffset) o;
+
+            if (timestamp != that.timestamp) return false;
+            return offset == that.offset;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = (int) (timestamp ^ (timestamp >>> 32));
+            result = 31 * result + (int) (offset ^ (offset >>> 32));
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return "TimestampAndOffset(" +
+                    "timestamp=" + timestamp +
+                    ", offset=" + offset +
+                    ')';
+        }
+    }
+
 }
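Editorial note: to make the reworked FileRecords API above easier to follow, here is a minimal usage sketch. It is not part of the commit; the class name, method name, and variables (FileRecordsUsageSketch, appendAndSearch, segmentFile, batch, targetOffset) are illustrative only, and the MemoryRecords batch is assumed to be built elsewhere (for example with the new MemoryRecordsBuilder introduced by this patch).

    import java.io.File;
    import java.io.IOException;

    import org.apache.kafka.common.record.FileRecords;
    import org.apache.kafka.common.record.MemoryRecords;

    public class FileRecordsUsageSketch {

        // Append a batch of records to a segment file, then slice a bounded view and
        // locate the physical position of a target offset using the new search API.
        public static void appendAndSearch(File segmentFile, MemoryRecords batch, long targetOffset) throws IOException {
            // mutable = true, fileAlreadyExists = false, initFileSize = 0, preallocate = false
            FileRecords log = FileRecords.open(segmentFile, true, false, 0, false);
            try {
                log.append(batch);   // writes the records and grows the tracked size
                log.flush();         // force the appended data to disk

                // A slice is a view over [position, position + size) within this instance
                FileRecords slice = log.read(0, log.sizeInBytes());
                System.out.println("slice contains " + slice.sizeInBytes() + " bytes");

                // Find the first entry whose last offset is >= targetOffset, starting at file position 0
                FileRecords.LogEntryPosition entry = log.searchForOffsetWithSize(targetOffset, 0);
                if (entry != null)
                    System.out.println("offset " + entry.offset + " at position " + entry.position +
                            " (" + entry.size + " bytes including log overhead)");
            } finally {
                log.close();   // flushes, trims to the tracked size, and closes the channel
            }
        }
    }

The sketch only exercises methods defined in the diff above (open, append, flush, read, sizeInBytes, searchForOffsetWithSize, close).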
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
index a1009ca..ee60713 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
@@ -16,9 +16,9 @@
  */
 package org.apache.kafka.common.record;
 
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.CorruptRecordException;
 
-public class InvalidRecordException extends KafkaException {
+public class InvalidRecordException extends CorruptRecordException {
 
     private static final long serialVersionUID = 1;

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java b/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
index 2e54b56..d2db356 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
@@ -16,33 +16,156 @@
  */
 package org.apache.kafka.common.record;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+
+import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
+
 /**
  * An offset and record pair
  */
-public final class LogEntry {
+public abstract class LogEntry implements Iterable<LogEntry> {
 
-    private final long offset;
-    private final Record record;
+    /**
+     * Get the offset of this entry. Note that if this entry contains a compressed
+     * message set, then this offset will be the last offset of the nested entries.
+     * @return the last offset contained in this entry
+     */
+    public abstract long offset();
 
-    public LogEntry(long offset, Record record) {
-        this.offset = offset;
-        this.record = record;
+    /**
+     * Get the shallow record for this log entry.
+     * @return the shallow record
+     */
+    public abstract Record record();
+
+    /**
+     * Get the first offset of the records contained in this entry. Note that this
+     * generally requires deep iteration, which requires decompression, so this should
+     * be used with caution.
+     * @return The first offset contained in this entry
+     */
+    public long firstOffset() {
+        return iterator().next().offset();
     }
 
-    public long offset() {
-        return this.offset;
+    /**
+     * Get the offset following this entry (i.e. the last offset contained in this entry plus one).
+     * @return the next consecutive offset following this entry
+     */
+    public long nextOffset() {
+        return offset() + 1;
     }
 
-    public Record record() {
-        return this.record;
+    /**
+     * Get the message format version of this entry (i.e. its magic value).
+     * @return the magic byte
+     */
+    public byte magic() {
+        return record().magic();
     }
 
     @Override
     public String toString() {
-        return "LogEntry(" + offset + ", " + record + ")";
+        return "LogEntry(" + offset() + ", " + record() + ")";
+    }
+
+    /**
+     * Get the size in bytes of this entry, including the size of the record and the log overhead.
+     * @return The size in bytes of this entry
+     */
+    public int sizeInBytes() {
+        return record().sizeInBytes() + LOG_OVERHEAD;
+    }
+
+    /**
+     * Check whether this entry contains a compressed message set.
+     * @return true if so, false otherwise
+     */
+    public boolean isCompressed() {
+        return record().compressionType() != CompressionType.NONE;
+    }
+
+    /**
+     * Write this entry into a buffer.
+     * @param buffer The buffer to write the entry to
+     */
+    public void writeTo(ByteBuffer buffer) {
+        writeHeader(buffer, offset(), record().sizeInBytes());
+        buffer.put(record().buffer().duplicate());
+    }
+
+    /**
+     * Get an iterator for the nested entries contained within this log entry. Note that
+     * if the entry is not compressed, then this method will return an iterator over the
+     * shallow entry only (i.e. this object).
+     * @return An iterator over the entries contained within this log entry
+     */
+    @Override
+    public Iterator<LogEntry> iterator() {
+        if (isCompressed())
+            return new RecordsIterator.DeepRecordsIterator(this, false, Integer.MAX_VALUE);
+        return Collections.singletonList(this).iterator();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || !(o instanceof LogEntry)) return false;
+
+        LogEntry that = (LogEntry) o;
+
+        if (offset() != that.offset()) return false;
+        Record thisRecord = record();
+        Record thatRecord = that.record();
+        return thisRecord != null ? thisRecord.equals(thatRecord) : thatRecord == null;
+    }
+
+    @Override
+    public int hashCode() {
+        long offset = offset();
+        Record record = record();
+        int result = (int) (offset ^ (offset >>> 32));
+        result = 31 * result + (record != null ? record.hashCode() : 0);
+        return result;
+    }
+
+    public static void writeHeader(ByteBuffer buffer, long offset, int size) {
+        buffer.putLong(offset);
+        buffer.putInt(size);
+    }
+
+    public static void writeHeader(DataOutputStream out, long offset, int size) throws IOException {
+        out.writeLong(offset);
+        out.writeInt(size);
     }
-
-    public int size() {
-        return record.size() + Records.LOG_OVERHEAD;
+
+    private static class SimpleLogEntry extends LogEntry {
+        private final long offset;
+        private final Record record;
+
+        public SimpleLogEntry(long offset, Record record) {
+            this.offset = offset;
+            this.record = record;
+        }
+
+        @Override
+        public long offset() {
+            return offset;
+        }
+
+        @Override
+        public Record record() {
+            return record;
+        }
+
+    }
+
+    public static LogEntry create(long offset, Record record) {
+        return new SimpleLogEntry(offset, record);
     }
+
 }
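Editorial note: a small sketch (not part of the commit) of the new LogEntry API above. The Record instance is assumed to come from elsewhere, for example from iterating a MemoryRecords or FileRecords instance; the class and method names here (LogEntrySketch, writeEntry) are illustrative only.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.Record;

    public class LogEntrySketch {

        // Wrap an existing record with its offset and serialize the shallow entry
        // (offset + size header followed by the record bytes) into a buffer.
        public static ByteBuffer writeEntry(long offset, Record record) {
            LogEntry entry = LogEntry.create(offset, record);

            ByteBuffer buffer = ByteBuffer.allocate(entry.sizeInBytes());
            entry.writeTo(buffer);   // writes the log overhead header, then the record bytes
            buffer.flip();

            // For an uncompressed record the iterator yields only the shallow entry itself;
            // for a compressed wrapper it decompresses and yields the nested entries.
            for (LogEntry deepEntry : entry)
                System.out.println("offset " + deepEntry.offset() + ", next offset " + deepEntry.nextOffset());

            return buffer;
        }
    }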