[ https://issues.apache.org/jira/browse/KAFKA-6743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499227#comment-16499227 ]

ASF GitHub Bot commented on KAFKA-6743:
---------------------------------------

hachikuji closed pull request #4818: KAFKA-6743 ConsumerPerformance fails to 
consume all messages 
URL: https://github.com/apache/kafka/pull/4818
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 7e0dbcbe064..64a3305d8e5 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -65,7 +65,7 @@ object ConsumerPerformance extends LazyLogging {
       val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
       consumer.subscribe(Collections.singletonList(config.topic))
       startMs = System.currentTimeMillis
-      consume(consumer, List(config.topic), config.numMessages, 1000, config, totalMessagesRead, totalBytesRead, joinGroupTimeInMs, startMs)
+      consume(consumer, List(config.topic), config.numMessages, config.recordFetchTimeoutMs, config, totalMessagesRead, totalBytesRead, joinGroupTimeInMs, startMs)
       endMs = System.currentTimeMillis
 
       if (config.printMetrics) {
@@ -188,6 +188,9 @@ object ConsumerPerformance extends LazyLogging {
       }
     }
 
+    if (messagesRead < count)
+      println(s"WARNING: Exiting before consuming the expected number of messages: timeout ($timeout ms) exceeded. " +
+        "You can use the --timeout option to increase the timeout.")
     totalMessagesRead.set(messagesRead)
     totalBytesRead.set(bytesRead)
   }
@@ -302,6 +305,11 @@ object ConsumerPerformance extends LazyLogging {
     val printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics. This only applies to new consumer.")
     val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " +
       "interval as configured by reporting-interval")
+    val recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed time in milliseconds between returned records.")
+      .withOptionalArg()
+      .describedAs("milliseconds")
+      .ofType(classOf[Long])
+      .defaultsTo(10000)
 
     val options = parser.parse(args: _*)
 
@@ -354,6 +362,7 @@ object ConsumerPerformance extends LazyLogging {
     val showDetailedStats = options.has(showDetailedStatsOpt)
     val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
     val hideHeader = options.has(hideHeaderOpt)
+    val recordFetchTimeoutMs = options.valueOf(recordFetchTimeoutOpt).longValue()
   }
 
   class ConsumerPerfThread(threadId: Int,
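
The gist of the patch above is a poll loop that exits either when the expected message count is reached or when no batch has arrived within the configurable timeout. The following is a simplified, hypothetical sketch of that loop: `poll` stands in for `KafkaConsumer.poll` (the real tool talks to brokers), and `consumeUntil` is a name invented here for illustration.

```scala
// Hypothetical sketch of the timeout-bounded consume loop from the patch.
// `poll` simulates KafkaConsumer.poll by returning a (possibly empty) batch.
object ConsumeLoopSketch {
  def consumeUntil(count: Long, timeoutMs: Long, poll: () => Seq[Array[Byte]]): Long = {
    var messagesRead = 0L
    var lastConsumedMs = System.currentTimeMillis
    var currentMs = lastConsumedMs
    // Stop when we have read `count` messages, or when no non-empty batch
    // has been returned for longer than `timeoutMs`.
    while (messagesRead < count && currentMs - lastConsumedMs <= timeoutMs) {
      val records = poll()
      currentMs = System.currentTimeMillis
      if (records.nonEmpty) lastConsumedMs = currentMs
      messagesRead += records.size
    }
    if (messagesRead < count)
      println(s"WARNING: Exiting before consuming the expected number of messages: " +
        s"timeout ($timeoutMs ms) exceeded. You can use the --timeout option to increase the timeout.")
    messagesRead
  }

  def main(args: Array[String]): Unit = {
    // Simulated source: three single-record batches, then silence.
    var remaining = 3
    val read = consumeUntil(count = 5, timeoutMs = 50,
      poll = () => if (remaining > 0) { remaining -= 1; Seq(Array[Byte](1)) } else Seq.empty)
    println(s"messagesRead=$read")
  }
}
```

With the hardcoded 1000 ms value this loop bails out as soon as one gap between batches exceeds a second, which is exactly the failure mode reported on topics with many partitions; making the bound `config.recordFetchTimeoutMs` lets the operator widen it.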


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ConsumerPerformance fails to consume all messages on topics with large number 
> of partitions
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6743
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6743
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, tools
>    Affects Versions: 0.11.0.2
>            Reporter: Alex Dunayevsky
>            Priority: Minor
>
> ConsumerPerformance fails to consume all messages on topics with a large 
> number of partitions due to a relatively short polling-loop timeout (1000 ms) 
> that is hardcoded and can be neither seen nor modified by the end user.
> Demo: create a topic with 10,000 partitions, send 50,000,000 100-byte records 
> using kafka-producer-perf-test, and consume them using kafka-consumer-perf-test 
> (ConsumerPerformance). You will likely notice that the number of records 
> returned by kafka-consumer-perf-test is far lower than the expected 
> 50,000,000. This is a consequence of the ConsumerPerformance implementation: 
> processing/iterating through the records polled in large batches can take 
> longer than the hardcoded polling-loop timeout, so the tool exits before 
> consuming everything, and this is probably not what we want from this utility.
> We have two options: 
> 1) Increase the polling-loop timeout in the ConsumerPerformance 
> implementation. It defaults to 1000 ms and is hardcoded, so it cannot be 
> changed today, but we could expose it as an OPTIONAL kafka-consumer-perf-test 
> parameter, configurable at the script level and available to the end user.
> 2) Decrease max.poll.records at the consumer config level. This is not a good 
> option, though, since we do not want to touch the default settings.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
