showuon commented on a change in pull request #11855: URL: https://github.com/apache/kafka/pull/11855#discussion_r825377329
########## File path: core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala ########## @@ -123,4 +163,37 @@ class ConsoleProducerTest { assertEquals("console-producer", producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG)) } + + @Test + def testBatchSizeOverride(): Unit = { + val config = new ConsoleProducer.ProducerConfig(batchSizeOverride) + val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config)) + assertEquals("456", + producerConfig.getString(ProducerConfig.BATCH_SIZE_CONFIG)) + } + + @Test + def testBatchSizeSet(): Unit = { + val config = new ConsoleProducer.ProducerConfig(batchSizeSet) + val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config)) + assertEquals("123", + producerConfig.getString(ProducerConfig.BATCH_SIZE_CONFIG)) + } + + @Test + def testDefaultBatchSize(): Unit = { + val config = new ConsoleProducer.ProducerConfig(batchSizeDefault) + val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config)) + assertEquals(16*1024, + producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG)) + } + + @Test + def testBatchSizeNotSet(): Unit = { + val config = new ConsoleProducer.ProducerConfig(batchSizeNotSet) Review comment: I think this test can be renamed to `testBatchSizeNotSetButMaxPartitionMemoryBytesSet` to be clear. And also the config variable name can be `batchSizeNotSetButMaxPartitionMemoryBytesSet` ########## File path: core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala ########## @@ -123,4 +163,37 @@ class ConsoleProducerTest { assertEquals("console-producer", producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG)) } + + @Test + def testBatchSizeOverride(): Unit = { + val config = new ConsoleProducer.ProducerConfig(batchSizeOverride) + val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config)) + assertEquals("456", + producerConfig.getString(ProducerConfig.BATCH_SIZE_CONFIG)) Review comment: Test failed with ``` ava.lang.ClassCastException: class java.lang.Integer cannot be cast to class java.lang.String (java.lang.Integer and java.lang.String are in module java.base of loader 'bootstrap') ``` It should be using getInt, and expected value should also be int 456, not string "456", like this: `assertEquals(456, producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG))` Same comments to below tests. ########## File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala ########## @@ -110,6 +110,10 @@ object ConsoleProducer { props, ProducerConfig.SEND_BUFFER_CONFIG, config.options, config.socketBufferSizeOpt) CommandLineUtils.maybeMergeOptions( props, ProducerConfig.BUFFER_MEMORY_CONFIG, config.options, config.maxMemoryBytesOpt) + CommandLineUtils.maybeMergeOptions( Review comment: We can add a comment above this line, to mention why we set batch.size with two different option values. Ex: // We currently have 2 options to set the batch.size value. We'll deprecate/remove one of them in KIP-717. ########## File path: core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala ########## @@ -123,4 +163,37 @@ class ConsoleProducerTest { assertEquals("console-producer", producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG)) } + + @Test + def testBatchSizeOverride(): Unit = { + val config = new ConsoleProducer.ProducerConfig(batchSizeOverride) Review comment: Maybe rename to `testBatchSizeOverriddenByMaxPartitionMemoryBytesValue` And the config variable name.. hmm... I think `batchSizeOverride` is OK. What do you think? ########## File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala ########## @@ -138,6 +142,12 @@ object ConsoleProducer { .withOptionalArg() .describedAs("compression-codec") .ofType(classOf[String]) + val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously. "+ + "please note that this opt will be replaced if max-partition-memory-bytes is also set") Review comment: nit: this opt -> this **option** ########## File path: core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala ########## @@ -123,4 +163,37 @@ class ConsoleProducerTest { assertEquals("console-producer", producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG)) } + + @Test + def testBatchSizeOverride(): Unit = { + val config = new ConsoleProducer.ProducerConfig(batchSizeOverride) + val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config)) + assertEquals("456", + producerConfig.getString(ProducerConfig.BATCH_SIZE_CONFIG)) + } + + @Test + def testBatchSizeSet(): Unit = { + val config = new ConsoleProducer.ProducerConfig(batchSizeSet) Review comment: renamed to `testBatchSizeSetButMaxPartitionMemoryBytesNotSet` And also the config variable name `batchSizeSet` -> `batchSizeSetButMaxPartitionMemoryBytesNotSet` ########## File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala ########## @@ -260,6 +270,7 @@ object ConsoleProducer { DefaultCompressionCodec.name else compressionCodecOptionValue else NoCompressionCodec.name + val batchSize = options.valueOf(batchSizeOpt) Review comment: This is not used. It can be removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org