C0urante commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r941371749


##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -20,36 +20,35 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports both source and
- * sink modes via its 'mode' setting.
+ * Very simple source connector that works with stdin or a file.
  */
 public class FileStreamSourceConnector extends SourceConnector {
+
+    private static final Logger log = LoggerFactory.getLogger(FileStreamSourceConnector.class);
     public static final String TOPIC_CONFIG = "topic";
     public static final String FILE_CONFIG = "file";
     public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
     public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. If not specified, the standard input will be used")
-        .define(TOPIC_CONFIG, Type.LIST, Importance.HIGH, "The topic to publish data to")
+        .define(TOPIC_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyString(), Importance.HIGH, "The topic to publish data to")
         .define(TASK_BATCH_SIZE_CONFIG, Type.INT, DEFAULT_TASK_BATCH_SIZE, Importance.LOW,
                 "The maximum number of records the Source task can read from file one time");

Review Comment:
   ```suggestion
                   "The maximum number of records the source task can read from 
the file each time it is polled");
   ```
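   
   For context, here's roughly how the tightened `topic` definition behaves once this lands. A rough sketch, assuming it runs in the same `org.apache.kafka.connect.file` package (since `CONFIG_DEF` is package-private); the file path and output messages are illustrative:
   ```java
   package org.apache.kafka.connect.file;
   
   import org.apache.kafka.common.config.ConfigException;
   
   import java.util.HashMap;
   import java.util.Map;
   
   public class TopicValidationSketch {
       public static void main(String[] args) {
           Map<String, String> props = new HashMap<>();
           props.put("file", "/tmp/input.txt"); // hypothetical path
   
           // "topic" now has NO_DEFAULT_VALUE, so omitting it fails at parse time
           try {
               FileStreamSourceConnector.CONFIG_DEF.parse(props);
           } catch (ConfigException e) {
               System.out.println("Missing topic rejected: " + e.getMessage());
           }
   
           // The NonEmptyString validator rejects an explicitly empty topic as well
           props.put("topic", "");
           try {
               FileStreamSourceConnector.CONFIG_DEF.parse(props);
           } catch (ConfigException e) {
               System.out.println("Empty topic rejected: " + e.getMessage());
           }
       }
   }
   ```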



##########
docs/connect.html:
##########
@@ -423,9 +422,13 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
     <pre class="brush: java;">
 @Override
 public void start(Map&lt;String, String&gt; props) {
-    // The complete version includes error handling as well.
-    filename = props.get(FILE_CONFIG);
-    topic = props.get(TOPIC_CONFIG);
+    // All initialization logic and setting up of resources goes in this method. The FileStreamSourceConnector, however, doesn't need such logic here.

Review Comment:
   ```suggestion
    // Initialization logic and setting up resources can take place in this method. This connector doesn't need to do any of that, but we do log a helpful message to the user.
   ```
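   
   Applied, the docs snippet would read roughly like this (a sketch only; the stored `props` field and the exact log line are assumptions on my part, not necessarily what the PR ends up with):
   ```java
   @Override
   public void start(Map<String, String> props) {
       // Initialization logic and setting up resources can take place in this method.
       // This connector doesn't need to do any of that, but we do log a helpful message to the user.
       this.props = props; // assumes the connector keeps the raw props around for taskConfigs()
       String filename = props.get(FILE_CONFIG);
       log.info("Starting file source connector reading from {}", filename == null ? "stdin" : filename);
   }
   ```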



##########
docs/connect.html:
##########
@@ -440,19 +443,17 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
     <pre class="brush: java;">
 @Override
 public List&lt;Map&lt;String, String&gt;&gt; taskConfigs(int maxTasks) {
+    // This method is where connectors provide the task configs for the tasks that are to be created for this connector.
+    // The length of the list determines the number of tasks that need to be created. The FileStreamSourceConnector, however, is
+    // only capable of spinning up a single task (since there isn't work that can be distributed among multiple tasks).
+    // Note that the task configs could contain configs additional to or different from the connector configs if needed (for instance,
+    // if different tasks have different responsibilities, or if different tasks are meant to process different subsets of the source data stream).

Review Comment:
   This is really verbose. Can we simplify? I was hoping we'd be able to spell things out here in 1-2 lines.
   
   Keep in mind that the next paragraph provides a lot of useful info already:
   > Even with multiple tasks, this method implementation is usually pretty simple. It just has to determine the number of input tasks, which may require contacting the remote service it is pulling data from, and then divvy them up. Because some patterns for splitting work among tasks are so common, some utilities are provided in ConnectorUtils to simplify these cases.
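   
   Something like this would be closer to what I had in mind; a sketch only, assuming the connector stashes its `props` in `start()`:
   ```java
   @Override
   public List<Map<String, String>> taskConfigs(int maxTasks) {
       // Each map in the returned list becomes the config for one task; reading a single
       // file can't be split up, so we always create exactly one task and ignore maxTasks.
       List<Map<String, String>> configs = new ArrayList<>();
       configs.add(props); // assumption: the raw connector props were saved in start()
       return configs;
   }
   ```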



##########
docs/connect.html:
##########
@@ -609,9 +618,11 @@ <h4><a id="connect_configs" href="#connect_configs">Connect Configuration Valida
    <p>The following code in <code>FileStreamSourceConnector</code> defines the configuration and exposes it to the framework.</p>
 
     <pre class="brush: java;">
-private static final ConfigDef CONFIG_DEF = new ConfigDef()
-    .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
-    .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
+static final ConfigDef CONFIG_DEF = new ConfigDef()
+    .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. If not specified, the standard input will be used")
+    .define(TOPIC_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyString(), Importance.HIGH, "The topic to publish data to")
+    .define(TASK_BATCH_SIZE_CONFIG, Type.INT, DEFAULT_TASK_BATCH_SIZE, Importance.LOW,
+        "The maximum number of records the Source task can read from file one time");

Review Comment:
   Nit (I know this was copied from an existing line but it should still be fixed):
   ```suggestion
           "The maximum number of records the source task can read from the 
file each time it is polled");
   ```
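   
   To make "each time it is polled" concrete, here's a rough sketch of how the batch size caps a single `poll()` in the corresponding task (simplified; `readNextLine()`, `offsetKey()`, and `offsetValue()` are hypothetical helpers, and `batchSize` is the parsed value of `batch.size`):
   ```java
   @Override
   public List<SourceRecord> poll() throws InterruptedException {
       List<SourceRecord> records = new ArrayList<>();
       String line;
       // A single poll() returns at most batchSize records; any remaining
       // lines in the file are picked up on subsequent polls.
       while (records.size() < batchSize && (line = readNextLine()) != null) {
           records.add(new SourceRecord(offsetKey(), offsetValue(), topic, Schema.STRING_SCHEMA, line));
       }
       return records.isEmpty() ? null : records;
   }
   ```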


