showuon commented on a change in pull request #11855:
URL: https://github.com/apache/kafka/pull/11855#discussion_r825377329



##########
File path: core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
##########
@@ -123,4 +163,37 @@ class ConsoleProducerTest {
     assertEquals("console-producer",
       producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG))
   }
+
+  @Test
+  def testBatchSizeOverride(): Unit = {
+    val config = new ConsoleProducer.ProducerConfig(batchSizeOverride)
+    val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config))
+    assertEquals("456",
+      producerConfig.getString(ProducerConfig.BATCH_SIZE_CONFIG))
+  }
+
+  @Test
+  def testBatchSizeSet(): Unit = {
+    val config = new ConsoleProducer.ProducerConfig(batchSizeSet)
+    val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config))
+    assertEquals("123",
+      producerConfig.getString(ProducerConfig.BATCH_SIZE_CONFIG))
+  }
+
+  @Test
+  def testDefaultBatchSize(): Unit = {
+    val config = new ConsoleProducer.ProducerConfig(batchSizeDefault)
+    val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config))
+    assertEquals(16*1024,
+      producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG))
+  }
+
+  @Test
+  def testBatchSizeNotSet(): Unit = {
+    val config = new ConsoleProducer.ProducerConfig(batchSizeNotSet)

Review comment:
       I think this test should be renamed to `testBatchSizeNotSetButMaxPartitionMemoryBytesSet` for clarity.
   The config variable can likewise be renamed to `batchSizeNotSetButMaxPartitionMemoryBytesSet`.

##########
File path: core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
##########
@@ -123,4 +163,37 @@ class ConsoleProducerTest {
     assertEquals("console-producer",
       producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG))
   }
+
+  @Test
+  def testBatchSizeOverride(): Unit = {
+    val config = new ConsoleProducer.ProducerConfig(batchSizeOverride)
+    val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config))
+    assertEquals("456",
+      producerConfig.getString(ProducerConfig.BATCH_SIZE_CONFIG))

Review comment:
       The test failed with:
   ```
   java.lang.ClassCastException: class java.lang.Integer cannot be cast to class java.lang.String (java.lang.Integer and java.lang.String are in module java.base of loader 'bootstrap')
   ```
   It should use `getInt`, and the expected value should be the int 456, not the string "456", like this:
   `assertEquals(456, producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG))`
   
   The same applies to the tests below.
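
To illustrate why the cast fails, here is a minimal sketch. `TypedConfig` is a hypothetical, simplified stand-in and not Kafka's actual config class; it only assumes that integer-typed keys such as `batch.size` are parsed to `Integer` when the config is built and that the getters cast on read.

```scala
// Hypothetical stand-in for a typed config (not Kafka's real class).
// Assumption: keys listed in `intKeys` are parsed to Integer up front,
// and each getter simply casts the stored value.
final class TypedConfig(raw: Map[String, String], intKeys: Set[String]) {
  private val parsed: Map[String, Any] = raw.map { case (k, v) =>
    val value: Any = if (intKeys(k)) Integer.valueOf(v) else v
    k -> value
  }

  // Throws ClassCastException when the stored value is not a String.
  def getString(key: String): String = parsed(key).asInstanceOf[String]

  // Throws ClassCastException when the stored value is not an Integer.
  def getInt(key: String): Int = parsed(key).asInstanceOf[Integer].intValue
}

object BatchSizeTypeDemo {
  def main(args: Array[String]): Unit = {
    val config = new TypedConfig(
      Map("batch.size" -> "456", "client.id" -> "console-producer"),
      Set("batch.size"))

    // The getter must match the declared type of the key.
    assert(config.getInt("batch.size") == 456)
    assert(config.getString("client.id") == "console-producer")

    // Reading an integer-typed key as a string fails at the cast.
    val castFailed =
      try { config.getString("batch.size"); false }
      catch { case _: ClassCastException => true }
    assert(castFailed)
  }
}
```

The same reasoning applies to any assertion that compares against a string literal for an integer-typed key.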

##########
File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala
##########
@@ -110,6 +110,10 @@ object ConsoleProducer {
       props, ProducerConfig.SEND_BUFFER_CONFIG, config.options, config.socketBufferSizeOpt)
     CommandLineUtils.maybeMergeOptions(
       props, ProducerConfig.BUFFER_MEMORY_CONFIG, config.options, config.maxMemoryBytesOpt)
+    CommandLineUtils.maybeMergeOptions(

Review comment:
       We can add a comment above this line to explain why we set batch.size from two different option values. E.g.:
   // We currently have 2 options to set the batch.size value. We'll deprecate/remove one of them in KIP-717.
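
For reference, the precedence the tests seem to expect can be sketched as below. This is only an illustration under assumptions: `Opt` and `maybeMerge` are hypothetical stand-ins (not the real `CommandLineUtils.maybeMergeOptions`), the merge rule "write the value if the option was passed explicitly or the key is still absent" is assumed, and so is the 16*1024 default for both options.

```scala
import java.util.Properties

object BatchSizeMergeSketch {
  // Hypothetical stand-in for a parsed command-line option:
  // `explicit` is what the user passed (if anything), `default` is the fallback.
  final case class Opt(explicit: Option[Int], default: Int) {
    def isSet: Boolean = explicit.isDefined
    def value: Int = explicit.getOrElse(default)
  }

  // Assumed merge rule: write the option's value when the user passed it
  // explicitly, or when the key has not been set yet.
  def maybeMerge(props: Properties, key: String, opt: Opt): Unit =
    if (opt.isSet || !props.containsKey(key))
      props.setProperty(key, opt.value.toString)

  // Merging twice into the same key means the second option wins when both are set.
  def batchSize(batchSizeOpt: Opt, maxPartitionMemoryBytesOpt: Opt): Int = {
    val props = new Properties()
    maybeMerge(props, "batch.size", batchSizeOpt)
    maybeMerge(props, "batch.size", maxPartitionMemoryBytesOpt)
    props.getProperty("batch.size").toInt
  }

  def main(args: Array[String]): Unit = {
    val default = 16 * 1024 // assumed default for both options
    // Both set: max-partition-memory-bytes overrides batch-size.
    assert(batchSize(Opt(Some(123), default), Opt(Some(456), default)) == 456)
    // Only batch-size set: its value is kept.
    assert(batchSize(Opt(Some(123), default), Opt(None, default)) == 123)
    // Neither set: the default applies.
    assert(batchSize(Opt(None, default), Opt(None, default)) == 16384)
    // Only max-partition-memory-bytes set: its value is used.
    assert(batchSize(Opt(None, default), Opt(Some(456), default)) == 456)
  }
}
```

The four assertions correspond to the four test cases in `ConsoleProducerTest`.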

##########
File path: core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
##########
@@ -123,4 +163,37 @@ class ConsoleProducerTest {
     assertEquals("console-producer",
       producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG))
   }
+
+  @Test
+  def testBatchSizeOverride(): Unit = {
+    val config = new ConsoleProducer.ProducerConfig(batchSizeOverride)

Review comment:
       Maybe rename to `testBatchSizeOverriddenByMaxPartitionMemoryBytesValue`.
   As for the config variable name... hmm, I think `batchSizeOverride` is OK. What do you think?

##########
File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala
##########
@@ -138,6 +142,12 @@ object ConsoleProducer {
                                     .withOptionalArg()
                                     .describedAs("compression-codec")
                                     .ofType(classOf[String])
+    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously. "+
+       "please note that this opt will be replaced if max-partition-memory-bytes is also set")

Review comment:
       nit: this opt -> this **option**

##########
File path: core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
##########
@@ -123,4 +163,37 @@ class ConsoleProducerTest {
     assertEquals("console-producer",
       producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG))
   }
+
+  @Test
+  def testBatchSizeOverride(): Unit = {
+    val config = new ConsoleProducer.ProducerConfig(batchSizeOverride)
+    val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config))
+    assertEquals("456",
+      producerConfig.getString(ProducerConfig.BATCH_SIZE_CONFIG))
+  }
+
+  @Test
+  def testBatchSizeSet(): Unit = {
+    val config = new ConsoleProducer.ProducerConfig(batchSizeSet)

Review comment:
       Rename to `testBatchSizeSetButMaxPartitionMemoryBytesNotSet`.
   Also rename the config variable: `batchSizeSet` -> `batchSizeSetButMaxPartitionMemoryBytesNotSet`.

##########
File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala
##########
@@ -260,6 +270,7 @@ object ConsoleProducer {
                                DefaultCompressionCodec.name
                              else compressionCodecOptionValue
                            else NoCompressionCodec.name
+    val batchSize = options.valueOf(batchSizeOpt)

Review comment:
       This is not used. It can be removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

