C0urante commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1258640264
########## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ########## @@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) { : ExactlyOnceSupport.UNSUPPORTED; } + @Override + public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) { + AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); + String filename = config.getString(FILE_CONFIG); + if (filename == null || filename.isEmpty()) { + // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked + throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified"); Review Comment: Should we tell users that this is because stdin will be used and we don't do any offset tracking in that case? ########## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ########## @@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) { : ExactlyOnceSupport.UNSUPPORTED; } + @Override + public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) { + AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); + String filename = config.getString(FILE_CONFIG); + if (filename == null || filename.isEmpty()) { + // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked + throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified"); + } + + // This connector makes use of a single source partition at a time which represents the file that it is configured to read from. + // However, there could also be source partitions from previous configurations of the connector. 
+ for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) { + Map<String, ?> partition = partitionOffset.getKey(); + if (partition == null) { + throw new ConnectException("Partition objects cannot be null"); + } + + if (!partition.containsKey(FILENAME_FIELD)) { + throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'"); + } + + Map<String, ?> offset = partitionOffset.getValue(); + // null offsets are allowed and represent a deletion of offsets for a partition + if (offset != null && !offset.containsKey(POSITION_FIELD)) { + throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'"); + } + } + + // Let the task validate the actual value for the offset position on startup Review Comment: Any reason not to do this preemptively? We could at least validate that the value for the "position" key is non-null, is a numeric type, and is non-negative. ########## connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java: ########## @@ -147,4 +151,59 @@ public void testInvalidBatchSize() { sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd"); assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties)); } + + @Test + public void testAlterOffsetsStdin() { + sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG); + Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap( + Collections.singletonMap(FILENAME_FIELD, FILENAME), + Collections.singletonMap(POSITION_FIELD, 0) + ); + assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets)); + } + + @Test + public void testAlterOffsetsIncorrectPartitionKey() { + assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap( + Collections.singletonMap("invalid_partition_key", FILENAME), + 
Collections.singletonMap(POSITION_FIELD, 0) + ))); + + // null partitions are invalid + assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap( + null, + Collections.singletonMap(POSITION_FIELD, 0) + ))); + } + + @Test + public void testAlterOffsetsMultiplePartitions() { + Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>(); + offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), Collections.singletonMap(POSITION_FIELD, 0)); + offsets.put(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename"), null); + connector.alterOffsets(sourceProperties, offsets); Review Comment: Can we wrap this and other successful calls to `connector::alterOffsets` in a call to `assertTrue`? ########## connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java: ########## @@ -147,4 +151,59 @@ public void testInvalidBatchSize() { sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd"); assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties)); } + + @Test + public void testAlterOffsetsStdin() { + sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG); + Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap( + Collections.singletonMap(FILENAME_FIELD, FILENAME), + Collections.singletonMap(POSITION_FIELD, 0) + ); + assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets)); + } + + @Test + public void testAlterOffsetsIncorrectPartitionKey() { + assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap( + Collections.singletonMap("invalid_partition_key", FILENAME), Review Comment: Nit: IMO this implies that the presence of, e.g., an `invalid_partition_key` field in a source partition is not permitted, but in reality, it looks like we do permit keys like that, as long as the `filename` field 
is also present in the source partition. Perhaps we could rename this to "other_partition_key" so that it's clearer that it's not really this key's fault that the request gets rejected? ########## connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java: ########## @@ -147,4 +151,59 @@ public void testInvalidBatchSize() { sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd"); assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties)); } + + @Test + public void testAlterOffsetsStdin() { + sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG); + Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap( + Collections.singletonMap(FILENAME_FIELD, FILENAME), + Collections.singletonMap(POSITION_FIELD, 0) + ); + assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets)); + } + + @Test + public void testAlterOffsetsIncorrectPartitionKey() { + assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap( + Collections.singletonMap("invalid_partition_key", FILENAME), + Collections.singletonMap(POSITION_FIELD, 0) + ))); + + // null partitions are invalid + assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap( + null, + Collections.singletonMap(POSITION_FIELD, 0) + ))); + } + + @Test + public void testAlterOffsetsMultiplePartitions() { + Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>(); + offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), Collections.singletonMap(POSITION_FIELD, 0)); + offsets.put(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename"), null); + connector.alterOffsets(sourceProperties, offsets); + } + + @Test + public void testAlterOffsetsIncorrectOffsetKey() { + Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap( + 
Collections.singletonMap(FILENAME_FIELD, FILENAME), + Collections.singletonMap("invalid_offset_key", 0) Review Comment: Same nit as with the `testAlterOffsetsIncorrectPartitionKey` case: maybe `other_offset_key` instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org