yashmayya commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r933937606


##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -16,40 +16,35 @@
  */
 package org.apache.kafka.connect.file;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports 
both source and
- * sink modes via its 'mode' setting.
+ * Very simple source connector that works with stdin or a file.
  */
 public class FileStreamSourceConnector extends SourceConnector {
-    public static final String TOPIC_CONFIG = "topic";
-    public static final String FILE_CONFIG = "file";
-    public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-    public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
+    static final String TOPIC_CONFIG = "topic";
+    static final String FILE_CONFIG = "file";
+    static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final int DEFAULT_TASK_BATCH_SIZE = 2000;
+
+    static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source 
filename. If not specified, the standard input will be used")
-        .define(TOPIC_CONFIG, Type.LIST, Importance.HIGH, "The topic to 
publish data to")
+        .define(TOPIC_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new 
ConfigDef.NonEmptyString(), Importance.HIGH, "The topic to publish data to")

Review Comment:
   Not sure why this was defined as a `LIST` config when config values with 
more than one topic were anyway being rejected. I've updated this to be a 
`STRING` config and this should be backward compatible for config values that 
were valid previously.



##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -77,12 +65,7 @@ public Class<? extends Task> taskClass() {
     public List<Map<String, String>> taskConfigs(int maxTasks) {
         ArrayList<Map<String, String>> configs = new ArrayList<>();
         // Only one input stream makes sense.
-        Map<String, String> config = new HashMap<>();
-        if (filename != null)
-            config.put(FILE_CONFIG, filename);
-        config.put(TOPIC_CONFIG, topic);
-        config.put(TASK_BATCH_SIZE_CONFIG, String.valueOf(batchSize));

Review Comment:
   Removed the redundant parsing from `String` to `int` (in `start()`) and then 
back to `String` here; it's cleaner to use the helper methods from 
`AbstractConfig` directly in the task class. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to