[GitHub] [kafka] C0urante commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

2022-08-09 Thread GitBox


C0urante commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r941371749


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -20,36 +20,35 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports 
both source and
- * sink modes via its 'mode' setting.
+ * Very simple source connector that works with stdin or a file.
  */
 public class FileStreamSourceConnector extends SourceConnector {
+
+private static final Logger log = 
LoggerFactory.getLogger(FileStreamSourceConnector.class);
 public static final String TOPIC_CONFIG = "topic";
 public static final String FILE_CONFIG = "file";
 public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
 public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
 
-private static final ConfigDef CONFIG_DEF = new ConfigDef()
+static final ConfigDef CONFIG_DEF = new ConfigDef()
 .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source 
filename. If not specified, the standard input will be used")
-.define(TOPIC_CONFIG, Type.LIST, Importance.HIGH, "The topic to 
publish data to")
+.define(TOPIC_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new 
ConfigDef.NonEmptyString(), Importance.HIGH, "The topic to publish data to")
 .define(TASK_BATCH_SIZE_CONFIG, Type.INT, DEFAULT_TASK_BATCH_SIZE, 
Importance.LOW,
 "The maximum number of records the Source task can read from 
file one time");

Review Comment:
   ```suggestion
   "The maximum number of records the source task can read from 
the file each time it is polled");
   ```



##
docs/connect.html:
##
@@ -423,9 +422,13 @@ Connector
 
 @Override
 public void start(MapString, String props) {
-// The complete version includes error handling as well.
-filename = props.get(FILE_CONFIG);
-topic = props.get(TOPIC_CONFIG);
+// All initialization logic and setting up of resources goes in this 
method. The FileStreamSourceConnector, however, doesn't need such logic here.

Review Comment:
   ```suggestion
   // Initialization logic and setting up resources can take place in this 
method. This connector doesn't need to do any of that, but we do log a helpful 
message to the user.
   ```



##
docs/connect.html:
##
@@ -440,19 +443,17 @@ Connector
 
 @Override
 public ListMapString, String taskConfigs(int maxTasks) {
+// This method is where connectors provide the task configs for the tasks 
that are to be created for this connector.
+// The length of the list determines the number of tasks that need to be 
created. The FileStreamSourceConnector, however, is
+// only capable of spinning up a single task (since there isn't work that 
can be distributed among multiple tasks).
+// Note that the task configs could contain configs additional to or 
different from the connector configs if needed (for instance,
+// if different tasks have different responsibilities, or if different 
tasks are meant to process different subsets of the source data stream).

Review Comment:
   This is really verbose. Can we simplify? I was hoping we'd be able to spell 
things out here in 1-2 lines.
   
   Keep in mind that the next paragraph provides a lot of useful info already:
   > Even with multiple tasks, this method implementation is usually pretty 
simple. It just has to determine the number of input tasks, which may require 
contacting the remote service it is pulling data from, and then divvy them up. 
Because some patterns for splitting work among tasks are so common, some 
utilities are provided in ConnectorUtils to simplify these cases.



##
docs/connect.html:
##
@@ -609,9 +618,11 @@ Connect Configuration Valida
 The following code in FileStreamSourceConnector defines 
the configuration and exposes it to the framework.
 
 
-private static final ConfigDef CONFIG_DEF = new ConfigDef()
-.define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
-.define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish 
data to");
+static final ConfigDef CONFIG_DEF = new ConfigDef()
+.define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. 
If not specified, the standard input will be used")
+.define(TOPIC_CONFIG, Type.STRING, 

[GitHub] [kafka] C0urante commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

2022-08-08 Thread GitBox


C0urante commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r940603023


##
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java:
##
@@ -74,16 +73,25 @@ public void testSinkTasks() {
 
 @Test
 public void testSinkTasksStdout() {
-sinkProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+sinkProperties.remove(FileStreamSinkConnector.FILE_CONFIG);
 connector.start(sinkProperties);
 List> taskConfigs = connector.taskConfigs(1);
 assertEquals(1, taskConfigs.size());
-
assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
+
assertNull(taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
 }
 
 @Test
 public void testTaskClass() {
 connector.start(sinkProperties);
 assertEquals(FileStreamSinkTask.class, connector.taskClass());
 }
+
+@Test
+public void testConnectorConfigsPropagateToTaskConfigs() {

Review Comment:
   Might be worth adding a comment here (and in the `start` implementations for 
each connector) on why we do this and/or containing a reference to the Jira 
ticket for KAFKA-13809?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

2022-08-08 Thread GitBox


C0urante commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r940601003


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java:
##
@@ -25,21 +24,19 @@
 import org.apache.kafka.connect.sink.SinkConnector;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports 
both source and
- * sink modes via its 'mode' setting.
+ * Very simple sink connector that works with stdout or a file.
  */
 public class FileStreamSinkConnector extends SinkConnector {
 
-public static final String FILE_CONFIG = "file";
-private static final ConfigDef CONFIG_DEF = new ConfigDef()
+static final String FILE_CONFIG = "file";
+static final ConfigDef CONFIG_DEF = new ConfigDef()

Review Comment:
   (These should be reverted too)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

2022-08-08 Thread GitBox


C0urante commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r940599072


##
docs/connect.html:
##
@@ -443,10 +440,7 @@ Connector
 ArrayListMapString, String configs = new 
ArrayList();
 // Only one input stream makes sense.
 MapString, String config = new HashMap();
-if (filename != null)
-config.put(FILE_CONFIG, filename);
-config.put(TOPIC_CONFIG, topic);
-configs.add(config);
+configs.add(props);
 return configs;
 }
 

Review Comment:
    thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

2022-08-08 Thread GitBox


C0urante commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r940598535


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -16,40 +16,35 @@
  */
 package org.apache.kafka.connect.file;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports 
both source and
- * sink modes via its 'mode' setting.
+ * Very simple source connector that works with stdin or a file.
  */
 public class FileStreamSourceConnector extends SourceConnector {
-public static final String TOPIC_CONFIG = "topic";
-public static final String FILE_CONFIG = "file";
-public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
+static final String TOPIC_CONFIG = "topic";
+static final String FILE_CONFIG = "file";
+static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-private static final ConfigDef CONFIG_DEF = new ConfigDef()
+static final int DEFAULT_TASK_BATCH_SIZE = 2000;

Review Comment:
   Yeah, we should revert. They may be useful in integration tests and while 
they're not strictly public API, I don't see a strong argument for forcing 
people to use reflection to access now-private fields that haven't caused any 
trouble while they've been public.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

2022-08-08 Thread GitBox


C0urante commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r940596673


##
docs/connect.html:
##
@@ -423,9 +422,7 @@ Connector
 
 @Override
 public void start(MapString, String props) {
-// The complete version includes error handling as well.
-filename = props.get(FILE_CONFIG);
-topic = props.get(TOPIC_CONFIG);
+this.props = props;

Review Comment:
   Yeah, I like the sound of that 
   
   One tiny change: I think "Starting file source connector reading from 
filename" would  be better than the wording I suggested earlier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

2022-08-04 Thread GitBox


C0urante commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r936225081


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -16,40 +16,35 @@
  */
 package org.apache.kafka.connect.file;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports 
both source and
- * sink modes via its 'mode' setting.
+ * Very simple source connector that works with stdin or a file.
  */
 public class FileStreamSourceConnector extends SourceConnector {
-public static final String TOPIC_CONFIG = "topic";
-public static final String FILE_CONFIG = "file";
-public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
+static final String TOPIC_CONFIG = "topic";
+static final String FILE_CONFIG = "file";
+static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-private static final ConfigDef CONFIG_DEF = new ConfigDef()
+static final int DEFAULT_TASK_BATCH_SIZE = 2000;

Review Comment:
   Why reduce the visibility of these?



##
docs/connect.html:
##
@@ -443,10 +440,7 @@ Connector
 ArrayListMapString, String configs = new 
ArrayList();
 // Only one input stream makes sense.
 MapString, String config = new HashMap();

Review Comment:
   Can we remove this line now?



##
docs/connect.html:
##
@@ -466,23 +460,26 @@ Task Example - Sourc
 
 
 public class FileStreamSourceTask extends SourceTask {
-String filename;
-InputStream stream;
-String topic;
+private String filename;
+private InputStream stream;
+private String topic;
+private int batchSize;
 
 @Override
 public void start(MapString, String props) {
-filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
+AbstractConfig config = new 
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, props);

Review Comment:
   I think it'd be acceptable to leave this section as-was since the addition 
of the `AbstractConfig` class and `CONFIG_DEF` field introduces a bit too much 
new info into the mix that's not directly related to teaching beginners about 
the `start`/`stop`/`poll` methods.
   
   We also call out that "We'll use pseudo-code to describe most of the 
implementation, but you can refer to the source code for the full example" for 
the source task section, so it's less important that we keep the code for the 
connector and the code in these examples strictly in-sync here.



##
docs/connect.html:
##
@@ -423,9 +422,7 @@ Connector
 
 @Override
 public void start(MapString, String props) {
-// The complete version includes error handling as well.
-filename = props.get(FILE_CONFIG);
-topic = props.get(TOPIC_CONFIG);
+this.props = props;

Review Comment:
   This feels a bit too much like magic boilerplate now. I imagine that if I 
were reading these docs for the first time to get up to speed with the 
connector API, it wouldn't really help me understand what the purposes of the 
`start` and `taskConfigs` methods are.
   
   I've noticed recently that we call out in the source task section that the 
code snippets may differ from the actual source code for the task. I'd be 
tempted to leave this as-is and add a similar disclaimer for this section, 
except it'd run the risk that users might use the same style, which would leave 
their connectors susceptible to KAFKA-9228.
   
   Can you think of a way to keep the source code changes here (i.e., storing 
`props` instead of just `filename` and `topic`), but also give some idea to 
users about what they can/should be doing in `start`?
   
   Some rejected alternatives I've thought of include:
   - Validating that the file exists (bad practice, since in most cases this 
should either be done in a [custom config 
validator](https://kafka.apache.org/32/javadoc/org/apache/kafka/common/config/ConfigDef.Validator.html)
 or on task startup)
   - Waiting for the file to exist, then [requesting a task 
reconfiguration](https://kafka.apache.org/32/javadoc/org/apache/kafka/connect/connector/ConnectorContext.html#requestTaskReconfiguration())
 (too complicated and relies on concepts that should be introduced later on)
   - Logging the config props (bad practice, can reveal secrets in plaintext, 
should never be done)
   
   The best I can