Repository: kafka Updated Branches: refs/heads/trunk 7802a90ed -> db8d6f02c
KAFKA-3179; Fix seek on compressed messages The fix itself is simple. Some explanation of the unit tests follows. Currently the vast majority of unit tests run with uncompressed messages. I was initially thinking about running all the tests using compressed messages. But it seems uncompressed messages are necessary in many test cases because we need the bytes sent and appended to the log to be predictable. In most of the other cases, it does not matter whether the message is compressed or not, and compression will slow down the unit tests. So I just added one method in the BaseConsumerTest to send compressed messages whenever we need it. Author: Jiangjie Qin <becket....@gmail.com> Reviewers: Aditya Auradkar <aaurad...@linkedin.com>, Ismael Juma <ism...@juma.me.uk>, Joel Koshy <jjkosh...@gmail.com> Closes #842 from becketqin/KAFKA-3179 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/db8d6f02 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/db8d6f02 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/db8d6f02 Branch: refs/heads/trunk Commit: db8d6f02c092c42f2402b7e2587c1b28d330bf83 Parents: 7802a90 Author: Jiangjie Qin <becket....@gmail.com> Authored: Thu Feb 4 16:08:21 2016 -0800 Committer: Joel Koshy <jjko...@gmail.com> Committed: Thu Feb 4 16:08:21 2016 -0800 ---------------------------------------------------------------------- .../clients/consumer/internals/Fetcher.java | 7 +++- .../clients/consumer/internals/FetcherTest.java | 10 ++++- .../kafka/api/BaseConsumerTest.scala | 31 ++++++++++------ .../kafka/api/PlaintextConsumerTest.scala | 39 ++++++++++++++++++-- 4 files changed, 69 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/db8d6f02/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff 
--git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index c06e899..e8f1f55 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -559,8 +559,11 @@ public class Fetcher<K, V> { MemoryRecords records = MemoryRecords.readableRecords(buffer); List<ConsumerRecord<K, V>> parsed = new ArrayList<>(); for (LogEntry logEntry : records) { - parsed.add(parseRecord(tp, logEntry)); - bytes += logEntry.size(); + // Skip the messages earlier than current position. + if (logEntry.offset() >= position) { + parsed.add(parseRecord(tp, logEntry)); + bytes += logEntry.size(); + } } if (!parsed.isEmpty()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/db8d6f02/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 79e47c0..5e750fd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -464,8 +464,16 @@ public class FetcherTest { // normal fetch for (int i = 1; i < 4; i++) { + // We need to make sure the message offset grows. Otherwise they will be considered as already consumed + // and filtered out by consumer. 
+ if (i > 1) { + this.records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE); + for (int v = 0; v < 3; v++) { + this.records.append((long) i * 3 + v, "key".getBytes(), String.format("value-%d", v).getBytes()); + } + this.records.close(); + } fetcher.initFetches(cluster); - client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i)); consumerClient.poll(0); records = fetcher.fetchedRecords().get(tp); http://git-wip-us.apache.org/repos/asf/kafka/blob/db8d6f02/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index 819e690..eb24706 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -15,7 +15,7 @@ package kafka.api import java.util import org.apache.kafka.clients.consumer._ -import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} +import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.{PartitionInfo, TopicPartition} @@ -73,7 +73,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { assertEquals(0, this.consumers(0).assignment.size) this.consumers(0).assign(List(tp).asJava) assertEquals(1, this.consumers(0).assignment.size) - + this.consumers(0).seek(tp, 0) consumeAndVerifyRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0) @@ -143,20 +143,20 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { this.consumers(0).poll(50) val pos1 = this.consumers(0).position(tp) val pos2 = this.consumers(0).position(tp2) - 
this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava) + this.consumers(0).commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava) assertEquals(3, this.consumers(0).committed(tp).offset) assertNull(this.consumers(0).committed(tp2)) // positions should not change assertEquals(pos1, this.consumers(0).position(tp)) assertEquals(pos2, this.consumers(0).position(tp2)) - this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava) + this.consumers(0).commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava) assertEquals(3, this.consumers(0).committed(tp).offset) assertEquals(5, this.consumers(0).committed(tp2).offset) // Using async should pick up the committed changes after commit completes val commitCallback = new CountConsumerCommitCallback() - this.consumers(0).commitAsync(Map[TopicPartition,OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback) + this.consumers(0).commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback) awaitCommitCallback(this.consumers(0), commitCallback) assertEquals(7, this.consumers(0).committed(tp2).offset) } @@ -259,10 +259,12 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { protected class TestConsumerReassignmentListener extends ConsumerRebalanceListener { var callsToAssigned = 0 var callsToRevoked = 0 + def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]) { info("onPartitionsAssigned called.") callsToAssigned += 1 } + def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]) { info("onPartitionsRevoked called.") callsToRevoked += 1 @@ -274,13 +276,20 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { } protected def sendRecords(numRecords: Int, tp: TopicPartition) { - (0 until numRecords).map { i => - 
this.producers(0).send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i".getBytes, s"value $i".getBytes)) - }.foreach(_.get) + sendRecords(this.producers(0), numRecords, tp) + } + + protected def sendRecords(producer: Producer[Array[Byte], Array[Byte]], + numRecords: Int, + tp: TopicPartition) { + (0 until numRecords).foreach { i => + producer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i".getBytes, s"value $i".getBytes)) + } + producer.flush() } protected def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int, - startingKeyAndValueIndex: Int = 0) { + startingKeyAndValueIndex: Int = 0, tp: TopicPartition = tp) { val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() val maxIters = numRecords * 300 var iters = 0 @@ -294,8 +303,8 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { for (i <- 0 until numRecords) { val record = records.get(i) val offset = startingOffset + i - assertEquals(topic, record.topic()) - assertEquals(part, record.partition()) + assertEquals(tp.topic(), record.topic()) + assertEquals(tp.partition(), record.partition()) assertEquals(offset.toLong, record.offset()) val keyAndValueIndex = startingKeyAndValueIndex + i assertEquals(s"key $keyAndValueIndex", new String(record.key())) http://git-wip-us.apache.org/repos/asf/kafka/blob/db8d6f02/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 47b5d8f..6711edf 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -12,13 +12,15 @@ */ package kafka.api +import java.util.Properties import java.util.regex.Pattern import 
kafka.server.KafkaConfig import kafka.utils.TestUtils import org.apache.kafka.clients.consumer._ -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.{KafkaException, TopicPartition} +import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.record.CompressionType import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.errors.{InvalidTopicException, RecordTooLargeException} import org.junit.Assert._ @@ -266,7 +268,10 @@ class PlaintextConsumerTest extends BaseConsumerTest { def testSeek() { val consumer = this.consumers(0) val totalRecords = 50L - sendRecords(totalRecords.toInt) + val mid = totalRecords / 2 + + // Test seek non-compressed message + sendRecords(totalRecords.toInt, tp) consumer.assign(List(tp).asJava) consumer.seekToEnd(tp) @@ -277,10 +282,36 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(0, consumer.position(tp), 0) consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0) - val mid = totalRecords / 2 consumer.seek(tp, mid) assertEquals(mid, consumer.position(tp)) consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt) + + // Test seek compressed message + sendCompressedMessages(totalRecords.toInt, tp2) + consumer.assign(List(tp2).asJava) + + consumer.seekToEnd(tp2) + assertEquals(totalRecords, consumer.position(tp2)) + assertFalse(consumer.poll(totalRecords).iterator().hasNext) + + consumer.seekToBeginning(tp2) + assertEquals(0, consumer.position(tp2), 0) + consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0, tp = tp2) + + consumer.seek(tp2, mid) + assertEquals(mid, consumer.position(tp2)) + consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt, + tp = tp2) + } + + private def 
sendCompressedMessages(numRecords: Int, tp: TopicPartition) { + val producerProps = new Properties() + producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name) + producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Long.MaxValue.toString) + val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile, + retries = 0, lingerMs = Long.MaxValue, props = Some(producerProps)) + sendRecords(producer, numRecords, tp) + producer.close() } def testPositionAndCommit() {