Repository: kafka Updated Branches: refs/heads/trunk 88aec3eb6 -> 5d7c8cc81
KAFKA-5414; Revert KAFKA-5327 which changed ConsoleConsumer offset commit behavior This reverts commit d7d1196a0b542adb46d22eeb5b6d12af950b64c9. Author: Jason Gustafson <ja...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk> Closes #3277 from hachikuji/KAFKA-5414 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5d7c8cc8 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5d7c8cc8 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5d7c8cc8 Branch: refs/heads/trunk Commit: 5d7c8cc81af6cd2f209f8e5f3df784b9272830b9 Parents: 88aec3e Author: Jason Gustafson <ja...@confluent.io> Authored: Thu Jun 8 18:35:07 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Thu Jun 8 18:35:12 2017 -0700 ---------------------------------------------------------------------- .../scala/kafka/consumer/BaseConsumer.scala | 46 ++++++-------------- .../scala/kafka/tools/ConsoleConsumer.scala | 3 -- 2 files changed, 13 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5d7c8cc8/core/src/main/scala/kafka/consumer/BaseConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala index fd5aa41..cec74d0 100644 --- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala +++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala @@ -17,22 +17,18 @@ package kafka.consumer -import java.util import java.util.{Collections, Properties} import java.util.regex.Pattern import kafka.api.OffsetRequest import kafka.common.StreamEndException import kafka.message.Message -import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata} import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.header.Headers import org.apache.kafka.common.header.internals.RecordHeaders -import scala.collection.mutable.HashMap - /** * A base consumer used to abstract both old and new consumer * this class should be removed (along with BaseProducer) @@ -64,13 +60,8 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset: import org.apache.kafka.clients.consumer.KafkaConsumer val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps) - val offsets = new HashMap[TopicPartition, Long]() - consumerInit() - private var currentPartition: TopicPartition = null - private var polledRecords = consumer.poll(0) - private var partitionIter = polledRecords.partitions.iterator - private var recordIter: util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = null + var recordIter = consumer.poll(0).iterator def consumerInit() { (topic, partitionId, offset, whitelist) match { @@ -102,30 +93,21 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset: } override def receive(): BaseConsumerRecord = { - if (recordIter == null || !recordIter.hasNext) { - if (!partitionIter.hasNext) { - polledRecords = consumer.poll(timeoutMs) - partitionIter = polledRecords.partitions.iterator - - if (!partitionIter.hasNext) - throw new ConsumerTimeoutException - } - - currentPartition = partitionIter.next - recordIter = polledRecords.records(currentPartition).iterator + if (!recordIter.hasNext) { + recordIter = consumer.poll(timeoutMs).iterator + if (!recordIter.hasNext) + throw new ConsumerTimeoutException } val record = recordIter.next - offsets.put(currentPartition, record.offset + 1) - BaseConsumerRecord(record.topic, - record.partition, - record.offset, - record.timestamp, - record.timestampType, - record.key, - record.value, - record.headers) + record.partition, + record.offset, + record.timestamp, + record.timestampType, + record.key, + record.value, + record.headers) } override def stop() { @@ -137,9 +119,7 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset: } override def commit() { - import scala.collection.JavaConverters._ - consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset))}.asJava) - offsets.clear() + this.consumer.commitSync() } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5d7c8cc8/core/src/main/scala/kafka/tools/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index a1e2ffa..335c724 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -77,7 +77,6 @@ object ConsoleConsumer extends Logging { try { process(conf.maxMessages, conf.formatter, consumer, System.out, conf.skipMessageOnError) } finally { - consumer.commit() consumer.cleanup() conf.formatter.close() reportRecordCount() @@ -201,9 +200,7 @@ object ConsoleConsumer extends Logging { props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer) props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.isolationLevel) - props }