yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1259462328


##########
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java:
##########
@@ -147,4 +151,59 @@ public void testInvalidBatchSize() {
         sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd");
         assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
     }
+
+    @Test
+    public void testAlterOffsetsStdin() {
+        sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                Collections.singletonMap(FILENAME_FIELD, FILENAME),
+                Collections.singletonMap(POSITION_FIELD, 0)
+        );
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsIncorrectPartitionKey() {
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+                Collections.singletonMap("invalid_partition_key", FILENAME),
+                Collections.singletonMap(POSITION_FIELD, 0)
+        )));
+
+        // null partitions are invalid
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+                null,
+                Collections.singletonMap(POSITION_FIELD, 0)
+        )));
+    }
+
+    @Test
+    public void testAlterOffsetsMultiplePartitions() {
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), Collections.singletonMap(POSITION_FIELD, 0));
+        offsets.put(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename"), null);
+        connector.alterOffsets(sourceProperties, offsets);

Review Comment:
   Ah, good point. Done.



##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified");

Review Comment:
   Makes sense, done.



##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified");
+        }
+
+        // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+        // However, there could also be source partitions from previous configurations of the connector.
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+            Map<String, ?> partition = partitionOffset.getKey();
+            if (partition == null) {
+                throw new ConnectException("Partition objects cannot be null");
+            }
+
+            if (!partition.containsKey(FILENAME_FIELD)) {
+                throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+            }
+
+            Map<String, ?> offset = partitionOffset.getValue();
+            // null offsets are allowed and represent a deletion of offsets for a partition
+            if (offset != null && !offset.containsKey(POSITION_FIELD)) {
+                throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
+            }
+        }
+
+        // Let the task validate the actual value for the offset position on startup

Review Comment:
   My initial reasoning was that we probably wouldn't want to open an input 
stream here to read from the file and verify whether the offset position is 
actually valid. But yeah, no reason we can't do basic validations like checking 
whether it's a non-negative numeric value. I've made this change.
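
   For illustration, a basic check of this kind might look roughly like the 
sketch below. This is not the code from the PR: the class name 
`OffsetPositionValidator` and the method `validatePosition` are hypothetical, 
the value "position" for POSITION_FIELD is an assumption, and 
IllegalArgumentException stands in for ConnectException so that the example 
compiles without Kafka on the classpath. It only checks that the value parses 
as a non-negative long and never opens the file.

```java
import java.util.Collections;
import java.util.Map;

public class OffsetPositionValidator {
    // Assumption: stands in for the POSITION_FIELD constant referenced in the diff.
    static final String POSITION_FIELD = "position";

    /**
     * Performs a cheap sanity check on a single offset map without touching the file.
     * A null offset is accepted because it represents deletion of the partition's offsets.
     */
    public static void validatePosition(Map<String, ?> offset) {
        if (offset == null) {
            return;
        }
        if (!offset.containsKey(POSITION_FIELD)) {
            throw new IllegalArgumentException(
                    "Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
        }
        Object value = offset.get(POSITION_FIELD);
        long position;
        try {
            // Going through String.valueOf accepts Integer, Long, and numeric strings alike;
            // a null value becomes the string "null" and fails to parse.
            position = Long.parseLong(String.valueOf(value));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "The value for '" + POSITION_FIELD + "' must be an integer, got: " + value);
        }
        if (position < 0) {
            throw new IllegalArgumentException(
                    "The value for '" + POSITION_FIELD + "' must be non-negative, got: " + position);
        }
    }

    public static void main(String[] args) {
        // Accepted: a non-negative position.
        validatePosition(Collections.singletonMap(POSITION_FIELD, 0));
        try {
            // Rejected: a negative position.
            validatePosition(Collections.singletonMap(POSITION_FIELD, -1));
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

   Whether the position actually falls within the file would still be left to 
the task at startup, as the code comment above says.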



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
