Repository: kafka
Updated Branches:
  refs/heads/trunk edb372dca -> a0b8e435c
MINOR: Support streaming decompression of fetched records for new format

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <apu...@confluent.io>, Guozhang Wang <wangg...@gmail.com>

Closes #2738 from hachikuji/streaming-compressed-iterator

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a0b8e435
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a0b8e435
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a0b8e435

Branch: refs/heads/trunk
Commit: a0b8e435c9419a9402d08408260bea0c1d95cff0
Parents: edb372d
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Mar 28 17:47:10 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Mar 28 17:47:10 2017 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 222 ++++++++++---------
 .../kafka/common/record/DefaultRecordBatch.java | 100 +++++----
 .../clients/consumer/internals/FetcherTest.java |   6 +-
 3 files changed, 175 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a0b8e435/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index c2456cc..ad63d25 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -66,6 +66,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
@@ -97,7 +98,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
 
-    private PartitionRecords<K, V> nextInLineRecords = null;
+    private PartitionRecords nextInLineRecords = null;
 
     public Fetcher(ConsumerNetworkClient client,
                    int minBytes,
@@ -464,11 +465,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
      *         the defaultResetPolicy is NONE
      */
     public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
-        Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
         int recordsRemaining = maxPollRecords;
 
         while (recordsRemaining > 0) {
-            if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
+            if (nextInLineRecords == null || nextInLineRecords.isFetched) {
                 CompletedFetch completedFetch = completedFetches.poll();
                 if (completedFetch == null)
                     break;
@@ -476,11 +477,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
                 nextInLineRecords = parseCompletedFetch(completedFetch);
             } else {
                 TopicPartition partition = nextInLineRecords.partition;
-                List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
+                List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
                 if (!records.isEmpty()) {
-                    List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
+                    List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
                     if (currentRecords == null) {
-                        drained.put(partition, records);
+                        fetched.put(partition, records);
                     } else {
                         // this case shouldn't usually happen because we only send one fetch at a time per partition,
                         // but it might conceivably happen in some rare cases (such as partition leader changes).
@@ -488,35 +489,35 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
                         List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                         newRecords.addAll(currentRecords);
                         newRecords.addAll(records);
-                        drained.put(partition, newRecords);
+                        fetched.put(partition, newRecords);
                     }
                     recordsRemaining -= records.size();
                 }
             }
         }
 
-        return drained;
+        return fetched;
     }
 
-    private List<ConsumerRecord<K, V>> drainRecords(PartitionRecords<K, V> partitionRecords, int maxRecords) {
+    private List<ConsumerRecord<K, V>> fetchRecords(PartitionRecords partitionRecords, int maxRecords) {
         if (!subscriptions.isAssigned(partitionRecords.partition)) {
             // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
-            log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition);
+            log.debug("Not returning fetched records for partition {} since it is no longer assigned",
+                    partitionRecords.partition);
         } else {
             // note that the consumed position should always be available as long as the partition is still assigned
             long position = subscriptions.position(partitionRecords.partition);
             if (!subscriptions.isFetchable(partitionRecords.partition)) {
                 // this can happen when a partition is paused before fetched records are returned to the consumer's poll call
-                log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
-            } else if (partitionRecords.fetchOffset == position) {
-                List<ConsumerRecord<K, V>> partRecords = partitionRecords.drainRecords(maxRecords);
-                if (!partRecords.isEmpty()) {
-                    long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
-                    log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
-                            "position to {}", position, partitionRecords.partition, nextOffset);
-
-                    subscriptions.position(partitionRecords.partition, nextOffset);
-                }
+                log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
+                        partitionRecords.partition);
+            } else if (partitionRecords.nextFetchOffset == position) {
+                List<ConsumerRecord<K, V>> partRecords = partitionRecords.fetchRecords(maxRecords);
+
+                long nextOffset = partitionRecords.nextFetchOffset;
+                log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
+                        "position to {}", position, partitionRecords.partition, nextOffset);
+                subscriptions.position(partitionRecords.partition, nextOffset);
 
                 Long partitionLag = subscriptions.partitionLag(partitionRecords.partition);
                 if (partitionLag != null)
@@ -527,7 +528,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
                 // these records aren't next in line based on the last consumed position, ignore them
                 // they must be from an obsolete request
                 log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
-                        partitionRecords.partition, partitionRecords.fetchOffset, position);
+                        partitionRecords.partition, partitionRecords.nextFetchOffset, position);
             }
         }
 
@@ -691,7 +692,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
     private List<TopicPartition> fetchablePartitions() {
         Set<TopicPartition> exclude = new HashSet<>();
         List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
-        if (nextInLineRecords != null && !nextInLineRecords.isDrained()) {
+        if (nextInLineRecords != null && !nextInLineRecords.isFetched) {
             exclude.add(nextInLineRecords.partition);
         }
         for (CompletedFetch completedFetch : completedFetches) {
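
The fetchedRecords() hunks above implement a bounded drain loop: keep pulling from the in-progress PartitionRecords until max.poll.records is satisfied, and switch to the next completed fetch only once the current one is exhausted. A minimal standalone sketch of that pattern follows; DrainLoopSketch, PartitionSource, and poll() are simplified stand-ins for illustration, not the actual Kafka classes:

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;

    // Hypothetical sketch of the bounded drain loop in Fetcher.fetchedRecords():
    // pull from the current source until max.poll.records is satisfied, moving
    // to the next completed fetch only when the current one is exhausted.
    class DrainLoopSketch<R> {
        interface PartitionSource<T> {
            boolean isFetched();            // true once the source is fully consumed
            List<T> fetchRecords(int max);  // returns at most max records
        }

        private final Queue<PartitionSource<R>> completedFetches = new ArrayDeque<>();
        private PartitionSource<R> nextInLine = null;

        List<R> poll(int maxPollRecords) {
            List<R> drained = new ArrayList<>();
            int remaining = maxPollRecords;
            while (remaining > 0) {
                if (nextInLine == null || nextInLine.isFetched()) {
                    nextInLine = completedFetches.poll();
                    if (nextInLine == null)
                        break;              // nothing buffered; return what we have
                } else {
                    List<R> records = nextInLine.fetchRecords(remaining);
                    drained.addAll(records);
                    remaining -= records.size();
                    if (records.isEmpty())
                        nextInLine = null;  // defensive: skip a source that yields nothing
                }
            }
            return drained;
        }
    }

The real method additionally guards against partitions that were unassigned or paused between the fetch and the poll, as the fetchRecords() hunk above shows.
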
@@ -743,13 +744,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
     /**
      * The callback for fetch completion
      */
-    private PartitionRecords<K, V> parseCompletedFetch(CompletedFetch completedFetch) {
+    private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
         TopicPartition tp = completedFetch.partition;
         FetchResponse.PartitionData partition = completedFetch.partitionData;
         long fetchOffset = completedFetch.fetchedOffset;
-        int bytes = 0;
-        int recordsCount = 0;
-        PartitionRecords<K, V> parsedRecords = null;
+        PartitionRecords parsedRecords = null;
         Errors error = partition.error;
 
         try {
@@ -767,36 +766,12 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
                     return null;
                 }
 
-                List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
-                boolean skippedRecords = false;
-                for (RecordBatch batch : partition.records.batches()) {
-                    if (this.checkCrcs && batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
-                        try {
-                            batch.ensureValid();
-                        } catch (InvalidRecordException e) {
-                            throw new KafkaException("Record batch for partition " + partition + " at offset " +
-                                    batch.baseOffset() + " is invalid, cause: " + e.getMessage());
-                        }
-                    }
-
-                    for (Record record : batch) {
-                        // control records should not be returned to the user. also skip anything out of range
-                        if (record.isControlRecord() || record.offset() < position) {
-                            skippedRecords = true;
-                        } else {
-                            parsed.add(parseRecord(tp, batch, record));
-                            bytes += record.sizeInBytes();
-                        }
-                    }
-                }
-
-                recordsCount = parsed.size();
-
-                log.trace("Adding {} fetched record(s) for partition {} with offset {} to buffered record list",
-                        parsed.size(), tp, position);
-                parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
+                log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
+                        partition.records.sizeInBytes(), tp, position);
+                Iterator<? extends RecordBatch> batches = partition.records.batches().iterator();
+                parsedRecords = new PartitionRecords(tp, completedFetch, batches);
 
-                if (parsed.isEmpty() && !skippedRecords && partition.records.sizeInBytes() > 0) {
+                if (!batches.hasNext() && partition.records.sizeInBytes() > 0) {
                     if (completedFetch.responseVersion < 3) {
                         // Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
                         Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
@@ -815,7 +790,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
                 }
 
                 if (partition.highWatermark >= 0) {
-                    log.trace("Received {} records in fetch response for partition {} with offset {}", parsed.size(), tp, position);
+                    log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark);
                     subscriptions.updateHighWatermark(tp, partition.highWatermark);
                 }
             } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
@@ -844,29 +819,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
                 throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
             }
         } finally {
-            completedFetch.metricAggregator.record(tp, bytes, recordsCount);
+            if (error != Errors.NONE) {
+                completedFetch.metricAggregator.record(tp, 0, 0);
+                // we move the partition to the end if there was an error. This way, it's more likely that partitions for
+                // the same topic can remain together (allowing for more efficient serialization).
+                subscriptions.movePartitionToEnd(tp);
+            }
         }
 
-        // we move the partition to the end if we received some bytes or if there was an error. This way, it's more
-        // likely that partitions for the same topic can remain together (allowing for more efficient serialization).
-        if (bytes > 0 || error != Errors.NONE)
-            subscriptions.movePartitionToEnd(tp);
-
         return parsedRecords;
     }
 
-    private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
-                                             RecordBatch batch,
-                                             Record record) {
-        if (this.checkCrcs) {
-            try {
-                record.ensureValid();
-            } catch (InvalidRecordException e) {
-                throw new KafkaException("Record for partition " + partition + " at offset " + record.offset()
-                        + " is invalid, cause: " + e.getMessage());
-            }
-        }
-
+    private ConsumerRecord<K, V> parseRecord(TopicPartition partition, RecordBatch batch, Record record) {
         try {
             long offset = record.offset();
             long timestamp = record.timestamp();
@@ -894,42 +858,102 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
         sensors.updatePartitionLagSensors(assignment);
     }
 
-    private static class PartitionRecords<K, V> {
-        private long fetchOffset;
-        private TopicPartition partition;
-        private List<ConsumerRecord<K, V>> records;
-        private int position = 0;
-
-        private PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
-            this.fetchOffset = fetchOffset;
+    private class PartitionRecords {
+        private final TopicPartition partition;
+        private final CompletedFetch completedFetch;
+        private final Iterator<? extends RecordBatch> batches;
+
+        private int recordsRead;
+        private int bytesRead;
+        private RecordBatch currentBatch;
+        private Iterator<Record> records;
+        private long nextFetchOffset;
+        private boolean isFetched = false;
+
+        private PartitionRecords(TopicPartition partition,
+                                 CompletedFetch completedFetch,
+                                 Iterator<? extends RecordBatch> batches) {
             this.partition = partition;
-            this.records = records;
+            this.completedFetch = completedFetch;
+            this.batches = batches;
+            this.nextFetchOffset = completedFetch.fetchedOffset;
        }
 
-        private boolean isDrained() {
-            return records == null;
+        private void drain() {
+            if (!isFetched) {
+                this.isFetched = true;
+                this.completedFetch.metricAggregator.record(partition, bytesRead, recordsRead);
+
+                // we move the partition to the end if we received some bytes. This way, it's more likely that partitions
+                // for the same topic can remain together (allowing for more efficient serialization).
+                if (bytesRead > 0)
+                    subscriptions.movePartitionToEnd(partition);
+            }
         }
 
-        private void drain() {
-            this.records = null;
+        private void maybeEnsureValid(RecordBatch batch) {
+            if (checkCrcs && currentBatch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
+                try {
+                    batch.ensureValid();
+                } catch (InvalidRecordException e) {
+                    throw new KafkaException("Record batch for partition " + partition + " at offset " +
+                            batch.baseOffset() + " is invalid, cause: " + e.getMessage());
+                }
+            }
        }
 
-        private List<ConsumerRecord<K, V>> drainRecords(int n) {
-            if (isDrained() || position >= records.size()) {
-                drain();
-                return Collections.emptyList();
+        private void maybeEnsureValid(Record record) {
+            if (checkCrcs) {
+                try {
+                    record.ensureValid();
+                } catch (InvalidRecordException e) {
+                    throw new KafkaException("Record for partition " + partition + " at offset " + record.offset() +
+                            " is invalid, cause: " + e.getMessage());
+                }
+            }
+        }
+
+        private Record nextFetchedRecord() {
+            while (true) {
+                if (records == null || !records.hasNext()) {
+                    if (!batches.hasNext()) {
+                        drain();
+                        return null;
+                    }
+                    currentBatch = batches.next();
+                    maybeEnsureValid(currentBatch);
+                    records = currentBatch.iterator();
+                }
+
+                Record record = records.next();
+                maybeEnsureValid(record);
+
+                // skip any records out of range
+                if (record.offset() >= nextFetchOffset) {
+                    nextFetchOffset = record.offset() + 1;
+
+                    // control records are not returned to the user
+                    if (!record.isControlRecord())
+                        return record;
+                }
             }
+        }
 
-            // using a sublist avoids a potentially expensive list copy (depending on the size of the records
-            // and the maximum we can return from poll). The cost is that we cannot mutate the returned sublist.
-            int limit = Math.min(records.size(), position + n);
-            List<ConsumerRecord<K, V>> res = Collections.unmodifiableList(records.subList(position, limit));
+        private List<ConsumerRecord<K, V>> fetchRecords(int n) {
+            if (isFetched)
+                return Collections.emptyList();
 
-            position = limit;
-            if (position < records.size())
-                fetchOffset = records.get(position).offset();
+            List<ConsumerRecord<K, V>> records = new ArrayList<>();
+            for (int i = 0; i < n; i++) {
+                Record record = nextFetchedRecord();
+                if (record == null)
+                    break;
 
-            return res;
+                recordsRead++;
+                bytesRead += record.sizeInBytes();
+                records.add(parseRecord(partition, currentBatch, record));
+            }
+
+            return records;
        }
    }
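
The net effect of the Fetcher.java changes above is that decompression and parsing now happen lazily, batch by batch, as the application consumes records, so memory use is bounded by max.poll.records rather than by the full fetch response. A rough standalone sketch of the flattening iteration in nextFetchedRecord(); Batch, Rec, and flatten are simplified stand-ins, not the actual Kafka types:

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // Hypothetical sketch of PartitionRecords.nextFetchedRecord(): flatten an
    // iterator of batches into an iterator of records, opening each batch lazily
    // (a compressed batch is only decompressed when reached) and skipping records
    // below the fetch offset as well as control records.
    class FlatteningSketch {
        interface Rec { long offset(); boolean isControl(); }
        interface Batch { Iterator<Rec> records(); }

        static Iterator<Rec> flatten(final Iterator<? extends Batch> batches, final long startOffset) {
            return new Iterator<Rec>() {
                private Iterator<Rec> records = null;
                private long nextOffset = startOffset;
                private Rec next = advance();

                private Rec advance() {
                    while (true) {
                        if (records == null || !records.hasNext()) {
                            if (!batches.hasNext())
                                return null;                    // fully drained
                            records = batches.next().records(); // lazy decompression happens here
                            continue;
                        }
                        Rec r = records.next();
                        if (r.offset() >= nextOffset) {
                            nextOffset = r.offset() + 1;        // position advances past control records too
                            if (!r.isControl())
                                return r;
                        }
                    }
                }

                @Override
                public boolean hasNext() { return next != null; }

                @Override
                public Rec next() {
                    if (next == null)
                        throw new NoSuchElementException();
                    Rec result = next;
                    next = advance();
                    return result;
                }

                @Override
                public void remove() { throw new UnsupportedOperationException(); }
            };
        }
    }

The real implementation additionally accumulates bytesRead/recordsRead for the fetch metrics and, when check.crcs is enabled, validates each batch and record as it is reached.
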
http://git-wip-us.apache.org/repos/asf/kafka/blob/a0b8e435/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 1de568e..8f50a2b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -20,14 +20,11 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.Crc32;
-import org.apache.kafka.common.utils.Utils;
 
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 
 import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
 
@@ -205,62 +202,29 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
     private Iterator<Record> compressedIterator() {
         ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
-        DataInputStream stream = new DataInputStream(compressionType().wrapForInput(
+        final DataInputStream stream = new DataInputStream(compressionType().wrapForInput(
                 new ByteBufferInputStream(buffer), magic()));
 
-        // TODO: An improvement for the consumer would be to only decompress the records
-        // we need to fill max.poll.records and leave the rest compressed.
-        int numRecords = count();
-        if (numRecords < 0)
-            throw new InvalidRecordException("Found invalid record count " + numRecords + " in magic v" +
-                    magic() + " batch");
-
-        List<Record> records = new ArrayList<>(numRecords);
-        try {
-            Long logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
-            long baseOffset = baseOffset();
-            long baseTimestamp = baseTimestamp();
-            int baseSequence = baseSequence();
-
-            for (int i = 0; i < numRecords; i++)
-                records.add(DefaultRecord.readFrom(stream, baseOffset, baseTimestamp, baseSequence, logAppendTime));
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        } finally {
-            Utils.closeQuietly(stream, "records iterator stream");
-        }
-
-        return records.iterator();
+        return new RecordIterator() {
+            @Override
+            protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
+                try {
+                    return DefaultRecord.readFrom(stream, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+                } catch (IOException e) {
+                    throw new KafkaException("Failed to decompress record stream", e);
+                }
+            }
+        };
     }
 
     private Iterator<Record> uncompressedIterator() {
         final ByteBuffer buffer = this.buffer.duplicate();
-        final Long logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
-        final long baseOffset = baseOffset();
-        final long baseTimestamp = baseTimestamp();
-        final int baseSequence = baseSequence();
-
         buffer.position(RECORDS_OFFSET);
-        final int totalRecords = count();
-
-        return new Iterator<Record>() {
-            int readRecords = 0;
-
-            @Override
-            public boolean hasNext() {
-                return readRecords < totalRecords;
-            }
-
+        return new RecordIterator() {
             @Override
-            public Record next() {
-                readRecords++;
+            protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
                 return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
             }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException();
-            }
         };
     }
 
@@ -432,4 +396,42 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
     }
 
+    private abstract class RecordIterator implements Iterator<Record> {
+        private final Long logAppendTime;
+        private final long baseOffset;
+        private final long baseTimestamp;
+        private final int baseSequence;
+        private final int numRecords;
+        private int readRecords = 0;
+
+        public RecordIterator() {
+            this.logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
+            this.baseOffset = baseOffset();
+            this.baseTimestamp = baseTimestamp();
+            this.baseSequence = baseSequence();
+            int numRecords = count();
+            if (numRecords < 0)
+                throw new InvalidRecordException("Found invalid record count " + numRecords + " in magic v" +
+                        magic() + " batch");
+            this.numRecords = numRecords;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return readRecords < numRecords;
+        }
+
+        @Override
+        public Record next() {
+            readRecords++;
+            return readNext(baseOffset, baseTimestamp, baseSequence, logAppendTime);
+        }
+
+        protected abstract Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime);
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
 }


http://git-wip-us.apache.org/repos/asf/kafka/blob/a0b8e435/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 224f83c..b03b461 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -191,11 +191,7 @@ public class FetcherTest {
 
         List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp);
         assertEquals(2, records.size());
-
-        // TODO: currently the offset does not advance beyond the control record until a record
-        // with a larger offset is fetched. In the worst case, we may fetch the control record
-        // again after a rebalance, but that should be fine since we just discard it anyway
-        assertEquals(3L, subscriptions.position(tp).longValue());
+        assertEquals(4L, subscriptions.position(tp).longValue());
         for (ConsumerRecord<byte[], byte[]> record : records)
             assertArrayEquals("key".getBytes(), record.key());
     }
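
One design note on the DefaultRecordBatch change: the compressed and uncompressed paths now share a single abstract RecordIterator that owns the record count and Iterator bookkeeping, while each concrete source supplies only readNext(). This is the classic template-method shape; a self-contained sketch is below (CountingIterator is a hypothetical name for illustration, not from the patch):

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // Hypothetical sketch of the template-method shape used by
    // DefaultRecordBatch.RecordIterator: the base class owns the count and
    // iteration bookkeeping; concrete sources only implement readNext().
    abstract class CountingIterator<T> implements Iterator<T> {
        private final int numRecords;
        private int readRecords = 0;

        CountingIterator(int numRecords) {
            if (numRecords < 0)
                throw new IllegalArgumentException("Invalid record count: " + numRecords);
            this.numRecords = numRecords;
        }

        @Override
        public boolean hasNext() {
            return readRecords < numRecords;
        }

        @Override
        public T next() {
            if (!hasNext())
                throw new NoSuchElementException();
            readRecords++;
            return readNext();
        }

        // Each source (e.g. a decompressing stream or a plain buffer) decodes one element.
        protected abstract T readNext();

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

A stream-backed subclass decodes one record per readNext() call, which is what lets the consumer stop decompressing as soon as max.poll.records is satisfied. Note also the FetcherTest change above: with the streaming iterator, the consumer's position now advances past a trailing control record (to offset 4 rather than 3), so a control record at the log end is not refetched after a rebalance.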