chia7712 commented on code in PR #16646:
URL: https://github.com/apache/kafka/pull/16646#discussion_r1695591590


##########
tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java:
##########
@@ -494,4 +473,60 @@ public void onCompletion(RecordMetadata metadata, 
Exception exception) {
         }
     }
 
+    static final class ConfigPostProcessor {
+        final String topicName;
+        final long numRecords;
+        final Integer recordSize;
+        final double throughput;
+        final boolean payloadMonotonic;
+        final Properties producerProps;
+        final boolean shouldPrintMetrics;
+        final Long transactionDurationMs;
+        final boolean transactionsEnabled;
+        final List<byte[]> payloadByteList;
+
+        public ConfigPostProcessor(ArgumentParser parser, String[] args) 
throws IOException, ArgumentParserException {
+            Namespace namespace = parser.parseArgs(args);
+            this.topicName = namespace.getString("topic");
+            this.numRecords = namespace.getLong("numRecords");
+            this.recordSize = namespace.getInt("recordSize");
+            this.throughput = namespace.getDouble("throughput");
+            this.payloadMonotonic = namespace.getBoolean("payloadMonotonic");
+            this.shouldPrintMetrics = namespace.getBoolean("printMetrics");
+
+            List<String> producerConfigs = namespace.getList("producerConfig");
+            String producerConfigFile = 
namespace.getString("producerConfigFile");
+            String payloadFilePath = namespace.getString("payloadFile");
+            Long transactionDurationMsArg = 
namespace.getLong("transactionDurationMs");
+            String transactionIdArg = namespace.getString("transactionalId");
+            if (producerConfigs == null && producerConfigFile == null) {
+                throw new ArgumentParserException("Either --producer-props or 
--producer.config must be specified.", parser);
+            }
+            if (transactionDurationMsArg != null && transactionDurationMsArg 
<= 0) {
+                throw new ArgumentParserException("--transaction-duration-ms 
should > 0", parser);
+            }
+
+            // since default value gets printed with the help text, we are 
escaping \n there and replacing it with correct value here.
+            String payloadDelimiter = 
namespace.getString("payloadDelimiter").equals("\\n")
+                    ? "\n" : namespace.getString("payloadDelimiter");
+            this.payloadByteList = readPayloadFile(payloadFilePath, 
payloadDelimiter);
+            this.producerProps = readProps(producerConfigs, 
producerConfigFile);
+            // setup transaction related configs
+            this.transactionsEnabled = transactionDurationMsArg != null
+                    || transactionIdArg != null
+                    || 
producerProps.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+            if (transactionsEnabled) {
+                Optional<String> txIdInProps =
+                        
Optional.ofNullable(producerProps.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG))
+                                .map(Object::toString);
+                String transactionId = 
Optional.ofNullable(transactionIdArg).orElse(txIdInProps.orElse(DEFAULT_TRANSACTION_ID));

Review Comment:
   Should we use UUID instead in order to allow users run this perf tool 
concurrently? Otherwise, using the same default transaction ID will be a issue.



##########
tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java:
##########
@@ -47,79 +48,56 @@
 
 public class ProducerPerformance {
 
+    public static final String DEFAULT_TRANSACTION_ID = 
"performance-producer-default-transactional-id";
+    public static final Long DEFAULT_TRANSACTION_DURATION_MS = 3000L;

Review Comment:
   `long`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to