C0urante commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1258640264


##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified");

Review Comment:
   Should we tell users that this is because stdin will be used and we don't do any offset tracking in that case?
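   For illustration, a rough standalone sketch of what the expanded message could look like (assumptions: `FILE_CONFIG` resolves to `"file"` in the real connector, and a plain `RuntimeException` stands in for `org.apache.kafka.connect.errors.ConnectException`):

```java
// Hypothetical sketch of a more descriptive error message; not the actual connector code.
public class StdinErrorMessageSketch {
    // Assumption: matches FileStreamSourceConnector.FILE_CONFIG
    static final String FILE_CONFIG = "file";

    // RuntimeException stands in for Connect's ConnectException here
    static RuntimeException alterOffsetsError() {
        return new RuntimeException("Offsets cannot be modified if the '" + FILE_CONFIG
                + "' configuration is unspecified, since the connector will read from stdin "
                + "in that case and no offsets are tracked for stdin");
    }

    public static void main(String[] args) {
        System.out.println(alterOffsetsError().getMessage());
    }
}
```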



##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified");
+        }
+
+        // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+        // However, there could also be source partitions from previous configurations of the connector.
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+            Map<String, ?> partition = partitionOffset.getKey();
+            if (partition == null) {
+                throw new ConnectException("Partition objects cannot be null");
+            }
+
+            if (!partition.containsKey(FILENAME_FIELD)) {
+                throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+            }
+
+            Map<String, ?> offset = partitionOffset.getValue();
+            // null offsets are allowed and represent a deletion of offsets for a partition
+            if (offset != null && !offset.containsKey(POSITION_FIELD)) {
+                throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
+            }
+        }
+
+        // Let the task validate the actual value for the offset position on startup

Review Comment:
   Any reason not to do this preemptively? We could at least validate that the value for the "position" key is non-null, is a numeric type, and is non-negative.
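   A minimal self-contained sketch of that preemptive check (assumptions: `POSITION_FIELD` resolves to `"position"`, and `IllegalArgumentException` stands in for `ConnectException`):

```java
import java.util.Map;

public class OffsetValidationSketch {
    // Assumption: matches FileStreamSourceTask.POSITION_FIELD
    static final String POSITION_FIELD = "position";

    // Rejects a "position" value that is null, non-numeric, or negative.
    // IllegalArgumentException stands in for Connect's ConnectException.
    static void validatePosition(Map<String, ?> offset) {
        Object position = offset.get(POSITION_FIELD);
        if (!(position instanceof Number)) {
            throw new IllegalArgumentException(
                    "The value for the '" + POSITION_FIELD + "' key must be a non-null numeric value");
        }
        if (((Number) position).longValue() < 0) {
            throw new IllegalArgumentException(
                    "The value for the '" + POSITION_FIELD + "' key must be non-negative");
        }
    }

    public static void main(String[] args) {
        validatePosition(java.util.Collections.singletonMap(POSITION_FIELD, 42L)); // accepted
        try {
            validatePosition(java.util.Collections.singletonMap(POSITION_FIELD, -1));
            throw new AssertionError("expected a negative position to be rejected");
        } catch (IllegalArgumentException expected) {
            System.out.println("negative position rejected");
        }
    }
}
```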



##########
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java:
##########
@@ -147,4 +151,59 @@ public void testInvalidBatchSize() {
         sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd");
         assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
     }
+
+    @Test
+    public void testAlterOffsetsStdin() {
+        sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                Collections.singletonMap(FILENAME_FIELD, FILENAME),
+                Collections.singletonMap(POSITION_FIELD, 0)
+        );
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsIncorrectPartitionKey() {
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+                Collections.singletonMap("invalid_partition_key", FILENAME),
+                Collections.singletonMap(POSITION_FIELD, 0)
+        )));
+
+        // null partitions are invalid
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+                null,
+                Collections.singletonMap(POSITION_FIELD, 0)
+        )));
+    }
+
+    @Test
+    public void testAlterOffsetsMultiplePartitions() {
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), Collections.singletonMap(POSITION_FIELD, 0));
+        offsets.put(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename"), null);
+        connector.alterOffsets(sourceProperties, offsets);

Review Comment:
   Can we wrap this and other successful calls to `connector::alterOffsets` in a call to `assertTrue`?
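   For illustration, a standalone sketch of the suggested pattern; the stub below is hypothetical and just mimics a successful `alterOffsets` call returning `true`:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class AssertTrueSketch {
    // Hypothetical stub standing in for FileStreamSourceConnector.alterOffsets,
    // which returns true when the requested offset alterations are accepted.
    static boolean alterOffsets(Map<String, String> config, Map<Map<String, ?>, Map<String, ?>> offsets) {
        return true;
    }

    public static void main(String[] args) {
        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
        offsets.put(Collections.singletonMap("filename", "/path/to/file"),
                Collections.singletonMap("position", 0));
        // The suggestion: assert on the return value instead of discarding it
        // (in the real test, assertTrue(connector.alterOffsets(...)))
        if (!alterOffsets(Collections.emptyMap(), offsets)) {
            throw new AssertionError("alterOffsets should have returned true");
        }
        System.out.println("ok");
    }
}
```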



##########
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java:
##########
@@ -147,4 +151,59 @@ public void testInvalidBatchSize() {
         sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd");
         assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
     }
+
+    @Test
+    public void testAlterOffsetsStdin() {
+        sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                Collections.singletonMap(FILENAME_FIELD, FILENAME),
+                Collections.singletonMap(POSITION_FIELD, 0)
+        );
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsIncorrectPartitionKey() {
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+                Collections.singletonMap("invalid_partition_key", FILENAME),

Review Comment:
   Nit: IMO this implies that the presence of, e.g., an `invalid_partition_key` field in a source partition is not permitted, but in reality, it looks like we do permit keys like that, as long as the `filename` field is also present in the source partition.
   
   Perhaps we could rename this to "other_partition_key" so that it's clearer that it's not really this key's fault that the request gets rejected?



##########
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java:
##########
@@ -147,4 +151,59 @@ public void testInvalidBatchSize() {
         sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd");
         assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
     }
+
+    @Test
+    public void testAlterOffsetsStdin() {
+        sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                Collections.singletonMap(FILENAME_FIELD, FILENAME),
+                Collections.singletonMap(POSITION_FIELD, 0)
+        );
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsIncorrectPartitionKey() {
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+                Collections.singletonMap("invalid_partition_key", FILENAME),
+                Collections.singletonMap(POSITION_FIELD, 0)
+        )));
+
+        // null partitions are invalid
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+                null,
+                Collections.singletonMap(POSITION_FIELD, 0)
+        )));
+    }
+
+    @Test
+    public void testAlterOffsetsMultiplePartitions() {
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), Collections.singletonMap(POSITION_FIELD, 0));
+        offsets.put(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename"), null);
+        connector.alterOffsets(sourceProperties, offsets);
+    }
+
+    @Test
+    public void testAlterOffsetsIncorrectOffsetKey() {
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                Collections.singletonMap(FILENAME_FIELD, FILENAME),
+                Collections.singletonMap("invalid_offset_key", 0

Review Comment:
   Same nit as with the `testAlterOffsetsIncorrectPartitionKey` case: maybe `other_offset_key` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org