C0urante commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1259782359
########## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ########## @@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) { : ExactlyOnceSupport.UNSUPPORTED; } + @Override + public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) { + AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); + String filename = config.getString(FILE_CONFIG); + if (filename == null || filename.isEmpty()) { + // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked + throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " + + "This is because stdin is used for input and offsets are not tracked."); + } + + // This connector makes use of a single source partition at a time which represents the file that it is configured to read from. + // However, there could also be source partitions from previous configurations of the connector. + for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) { + Map<String, ?> partition = partitionOffset.getKey(); + if (partition == null) { + throw new ConnectException("Partition objects cannot be null"); + } + + if (!partition.containsKey(FILENAME_FIELD)) { + throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'"); + } + + Map<String, ?> offset = partitionOffset.getValue(); + // null offsets are allowed and represent a deletion of offsets for a partition + if (offset == null) { + return true; Review Comment: Shouldn't this be `continue`? ########## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ########## @@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) { : ExactlyOnceSupport.UNSUPPORTED; } + @Override + public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) { + AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); + String filename = config.getString(FILE_CONFIG); + if (filename == null || filename.isEmpty()) { + // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked Review Comment: We can remove this comment now, right? ########## connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java: ########## @@ -147,4 +152,102 @@ public void testInvalidBatchSize() { sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd"); assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties)); } + + @Test + public void testAlterOffsetsStdin() { + sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG); + Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap( + Collections.singletonMap(FILENAME_FIELD, FILENAME), + Collections.singletonMap(POSITION_FIELD, 0) + ); + assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets)); + } + + @Test + public void testAlterOffsetsIncorrectPartitionKey() { + assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap( + Collections.singletonMap("other_partition_key", FILENAME), + Collections.singletonMap(POSITION_FIELD, 0) + ))); + + // null partitions are invalid + assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap( + null, + Collections.singletonMap(POSITION_FIELD, 0) + ))); + } + + @Test + public void testAlterOffsetsMultiplePartitions() { + Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>(); + offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), Collections.singletonMap(POSITION_FIELD, 0)); + offsets.put(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename"), null); + assertTrue(connector.alterOffsets(sourceProperties, offsets)); + } + + @Test + public void testAlterOffsetsIncorrectOffsetKey() { + Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap( + Collections.singletonMap(FILENAME_FIELD, FILENAME), + Collections.singletonMap("other_offset_key", 0) + ); + assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets)); + } + + @Test + public void testAlterOffsetsOffsetPositionValues() { Review Comment: Maybe getting a little too fancy here, but it looks like we repeat the same four-line stanza several times in this test case and could reduce duplication. I toyed with the idea of creating a function-local function since a separate `private boolean alterOffsets(Object offset)` method seemed a bit too cluttered: ```java @Test public void testAlterOffsetsOffsetPositionValues() { Function<Object, Boolean> alterOffsets = offset -> connector.alterOffsets(sourceProperties, Collections.singletonMap( Collections.singletonMap(FILENAME_FIELD, FILENAME), Collections.singletonMap(POSITION_FIELD, offset) )); assertThrows(ConnectException.class, () -> alterOffsets.apply("nan")); assertThrows(ConnectException.class, () -> alterOffsets.apply(null)); // ... assertTrue(alterOffsets.apply(10)); } ``` Thoughts? ########## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ########## @@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) { : ExactlyOnceSupport.UNSUPPORTED; } + @Override + public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) { + AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); + String filename = config.getString(FILE_CONFIG); + if (filename == null || filename.isEmpty()) { + // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked + throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " + + "This is because stdin is used for input and offsets are not tracked."); + } + + // This connector makes use of a single source partition at a time which represents the file that it is configured to read from. + // However, there could also be source partitions from previous configurations of the connector. + for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) { + Map<String, ?> partition = partitionOffset.getKey(); + if (partition == null) { + throw new ConnectException("Partition objects cannot be null"); + } + + if (!partition.containsKey(FILENAME_FIELD)) { + throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'"); + } + + Map<String, ?> offset = partitionOffset.getValue(); + // null offsets are allowed and represent a deletion of offsets for a partition + if (offset == null) { + return true; + } + + if (!offset.containsKey(POSITION_FIELD)) { + throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'"); + } + + // The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value + try { + long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD))); Review Comment: Isn't this too permissive? Based on the `testAlterOffsetsOffsetPositionValues` test case, this would allow values of both `"10"` (string) and `10` (number) for the offset. But it looks like the source task class would [only work with numbers](https://github.com/apache/kafka/blob/b8f3776f2458d0270276bd3d59db66f4c7f3748c/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java#L95-L96). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org