[ https://issues.apache.org/jira/browse/KAFKA-5327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377427#comment-16377427 ]

ASF GitHub Bot commented on KAFKA-5327:
---------------------------------------

hachikuji closed pull request #4546: KAFKA-5327: Console Consumer should only poll for up to max messages
URL: https://github.com/apache/kafka/pull/4546

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 04ac2d9f1a5..9e49fe4dc8b 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -23,6 +23,7 @@ import java.util.regex.Pattern
 import kafka.api.OffsetRequest
 import kafka.common.StreamEndException
 import kafka.message.Message
+import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.header.Headers
@@ -55,10 +56,8 @@ case class BaseConsumerRecord(topic: String,
 
 @deprecated("This class has been deprecated and will be removed in a future 
release. " +
             "Please use org.apache.kafka.clients.consumer.KafkaConsumer 
instead.", "0.11.0.0")
-class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], 
offset: Option[Long], whitelist: Option[String], consumerProps: Properties, val 
timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
-  import org.apache.kafka.clients.consumer.KafkaConsumer
-
-  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
+class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], 
offset: Option[Long], whitelist: Option[String],
+                       consumer: Consumer[Array[Byte], Array[Byte]], val 
timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
   consumerInit()
   var recordIter = consumer.poll(0).iterator
 
@@ -91,6 +90,17 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
     }
   }
 
+  def resetUnconsumedOffsets() {
+    val smallestUnconsumedOffsets = collection.mutable.Map[TopicPartition, Long]()
+    while (recordIter.hasNext) {
+      val record = recordIter.next()
+      val tp = new TopicPartition(record.topic, record.partition)
+      // avoid auto-committing offsets which haven't been consumed
+      smallestUnconsumedOffsets.getOrElseUpdate(tp, record.offset)
+    }
+    smallestUnconsumedOffsets.foreach { case (tp, offset) => consumer.seek(tp, offset) }
+  }
+
   override def receive(): BaseConsumerRecord = {
     if (!recordIter.hasNext) {
       recordIter = consumer.poll(timeoutMs).iterator
@@ -114,6 +124,7 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
   }
 
   override def cleanup() {
+    resetUnconsumedOffsets()
     this.consumer.close()
   }
 
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 7d2d3710157..24fa583f1bb 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -31,10 +31,10 @@ import kafka.message._
 import kafka.metrics.KafkaMetricsReporter
 import kafka.utils._
 import kafka.utils.Implicits._
-import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.common.errors.{AuthenticationException, WakeupException}
 import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.serialization.Deserializer
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
@@ -72,10 +72,11 @@ object ConsoleConsumer extends Logging {
         new OldConsumer(conf.filterSpec, props)
       } else {
         val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue
+        val consumer = new KafkaConsumer(getNewConsumerProps(conf), new ByteArrayDeserializer, new ByteArrayDeserializer)
         if (conf.partitionArg.isDefined)
-          new NewShinyConsumer(Option(conf.topicArg), conf.partitionArg, Option(conf.offsetArg), None, getNewConsumerProps(conf), timeoutMs)
+          new NewShinyConsumer(Option(conf.topicArg), conf.partitionArg, Option(conf.offsetArg), None, consumer, timeoutMs)
         else
-          new NewShinyConsumer(Option(conf.topicArg), None, None, Option(conf.whitelistArg), getNewConsumerProps(conf), timeoutMs)
+          new NewShinyConsumer(Option(conf.topicArg), None, None, Option(conf.whitelistArg), consumer, timeoutMs)
       }
 
     addShutdownHook(consumer, conf)
@@ -202,15 +203,12 @@ object ConsoleConsumer extends Logging {
     }
   }
 
-  def getNewConsumerProps(config: ConsumerConfig): Properties = {
+  private[tools] def getNewConsumerProps(config: ConsumerConfig): Properties = {
     val props = new Properties
-
     props ++= config.consumerProps
     props ++= config.extraConsumerProps
     setAutoOffsetResetValue(config, props)
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.isolationLevel)
     props
   }
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 364f6b3b93e..9ae8b966ac0 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -17,18 +17,65 @@
 
 package kafka.tools
 
-import java.io.{PrintStream, FileOutputStream}
+import java.io.{FileOutputStream, PrintStream}
 
 import kafka.common.MessageFormatter
-import kafka.consumer.{BaseConsumer, BaseConsumerRecord}
+import kafka.consumer.{BaseConsumer, BaseConsumerRecord, NewShinyConsumer}
 import kafka.utils.{Exit, TestUtils}
+import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.easymock.EasyMock
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConverters._
 
 class ConsoleConsumerTest {
 
+  @Before
+  def setup(): Unit = {
+    ConsoleConsumer.messageCount = 0
+  }
+
+  @Test
+  def shouldResetUnConsumedOffsetsBeforeExitForNewConsumer() {
+    val topic = "test"
+    val maxMessages: Int = 123
+    val totalMessages: Int = 700
+    val startOffset: java.lang.Long = 0L
+
+    val mockConsumer = new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.EARLIEST)
+    val tp1 = new TopicPartition(topic, 0)
+    val tp2 = new TopicPartition(topic, 1)
+
+    val consumer = new NewShinyConsumer(Some(topic), None, None, None, mockConsumer)
+
+    mockConsumer.rebalance(List(tp1, tp2).asJava)
+    mockConsumer.updateBeginningOffsets(Map(tp1 -> startOffset, tp2 -> startOffset).asJava)
+
+    0 until totalMessages foreach { i =>
+      // add all records, each partition should have half of `totalMessages`
+      mockConsumer.addRecord(new ConsumerRecord[Array[Byte], Array[Byte]](topic, i % 2, i / 2, "key".getBytes, "value".getBytes))
+    }
+
+    // Mocks
+    val formatter = EasyMock.createNiceMock(classOf[MessageFormatter])
+
+    // Expectations
+    EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject())).times(maxMessages)
+    EasyMock.replay(formatter)
+
+    // Test
+    ConsoleConsumer.process(maxMessages, formatter, consumer, System.out, skipMessageOnError = false)
+    assertEquals(totalMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2))
+
+    consumer.resetUnconsumedOffsets()
+    assertEquals(maxMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2))
+
+    EasyMock.verify(formatter)
+  }
+
   @Test
   def shouldLimitReadsToMaxMessageLimit() {
     //Mocks


 


> Console Consumer should only poll for up to max messages
> --------------------------------------------------------
>
>                 Key: KAFKA-5327
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5327
>             Project: Kafka
>          Issue Type: Improvement
>          Components: tools
>            Reporter: Dustin Cote
>            Assignee: huxihx
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> The ConsoleConsumer has a --max-messages flag that can be used to limit the
> number of messages consumed. However, the number of records actually fetched
> is governed by max.poll.records. This means you may see one message on the
> console while your offset has moved forward by the default of 500, which is
> counterintuitive. It would be good to only commit offsets for messages we
> have actually printed to the console.
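For illustration, here is a minimal standalone sketch of the behavior and of the
approach the patch takes, using only the public consumer API: print at most a
fixed number of records, then seek each partition back to the first record that
was fetched but not printed, so that the auto-committed offsets only cover what
was printed. This is not the console consumer itself; the bootstrap server,
group id, and topic name below are placeholders.

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object MaxMessagesSketch {
  def main(args: Array[String]): Unit = {
    val maxMessages = 1
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "console-sketch-group")    // placeholder group id
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props, new ByteArrayDeserializer, new ByteArrayDeserializer)
    consumer.subscribe(List("test").asJava)                              // placeholder topic

    // A single poll may return up to max.poll.records (default 500) records,
    // even though only `maxMessages` of them will be printed.
    val recordIter = consumer.poll(10000L).iterator
    var printed = 0
    while (recordIter.hasNext && printed < maxMessages) {
      val record = recordIter.next()
      println(s"partition=${record.partition} offset=${record.offset}")
      printed += 1
    }

    // Seek back to the first record that was fetched but not printed, per
    // partition, so the position (and hence any auto-commit) reflects only
    // what was printed, mirroring resetUnconsumedOffsets() in the patch above.
    val firstUnprinted = collection.mutable.Map[TopicPartition, Long]()
    while (recordIter.hasNext) {
      val record = recordIter.next()
      firstUnprinted.getOrElseUpdate(new TopicPartition(record.topic, record.partition), record.offset)
    }
    firstUnprinted.foreach { case (tp, offset) => consumer.seek(tp, offset) }

    consumer.close() // with auto-commit enabled, close() commits the adjusted positions
  }
}

Because seek() rewinds the in-memory position, the commit performed on close()
(or by the auto-commit timer) covers only the printed records, which is what the
patch's cleanup() change achieves for the console consumer.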


