yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1257110479

##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -101,4 +105,40 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
+
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified");
+        }
+
+        // An empty offsets map could indicate that the offsets were reset previously or that no offsets have been committed yet (for a reset operation)
+        // - we don't need any additional validation for this case.
+        if (offsets.size() == 0) {
+            return true;
+        }
+
+        // This connector makes use of a single source partition which represents the file that it is configured to read from
+        if (offsets.size() > 1) {
+            throw new ConnectException("The " + FileStreamSourceConnector.class.getSimpleName() + " supports only a single source partition / file");

Review Comment:
   Oh wow, that's a really good catch, thanks! One point to note is that in a `DELETE /offsets` request for a source connector, users don't specify the partitions / offsets (we write `null` offsets for every known partition). Thinking about this a bit more, we shouldn't fail the request even if there are multiple partitions with non-null values. Taking the same example you used, if a user calls `GET /offsets` and copies the response body as the request body for a `PATCH /offsets` request while only modifying the offset value for file `b`, I don't think it makes sense to fail that request.
   Requests that don't even include the current source partition are also fairly harmless. I've relaxed the validation criteria in the latest patch.
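
For illustration, the relaxed criteria described in this comment could look roughly like the sketch below. It is a dependency-free approximation, not the code from the patch: the class name `RelaxedOffsetValidation`, the `filename` / `position` key names, the per-entry position check, and the use of `IllegalArgumentException` in place of `ConnectException` are all assumptions made so the example runs on its own.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only; names and checks are assumptions, not the PR's code.
final class RelaxedOffsetValidation {
    // Assumed key names, chosen for illustration.
    static final String FILENAME_FIELD = "filename";
    static final String POSITION_FIELD = "position";

    // Accepts any number of partitions, including ones the connector is not
    // currently reading from, and validates each entry independently.
    static boolean validate(Map<Map<String, ?>, Map<String, ?>> offsets) {
        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : offsets.entrySet()) {
            Map<String, ?> partition = entry.getKey();
            Map<String, ?> offset = entry.getValue();

            if (offset == null) {
                // A null offset means "reset this partition" (what a DELETE
                // writes for every known partition), so it is always allowed.
                continue;
            }
            if (partition == null || !partition.containsKey(FILENAME_FIELD)) {
                throw new IllegalArgumentException(
                        "Partition must contain the key '" + FILENAME_FIELD + "'");
            }
            // Assumed sanity check: a non-null offset carries a non-negative Long.
            Object position = offset.get(POSITION_FIELD);
            if (!(position instanceof Long) || ((Long) position) < 0) {
                throw new IllegalArgumentException(
                        "Offset must contain a non-negative Long '" + POSITION_FIELD + "'");
            }
        }
        // An empty map falls through the loop and is accepted as well.
        return true;
    }

    public static void main(String[] args) {
        // Mimics a GET /offsets body re-sent via PATCH with several partitions.
        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
        offsets.put(Map.of(FILENAME_FIELD, "a"), Map.of(POSITION_FIELD, 10L));
        offsets.put(Map.of(FILENAME_FIELD, "b"), Map.of(POSITION_FIELD, 42L));
        offsets.put(Map.of(FILENAME_FIELD, "c"), null); // reset for file c
        System.out.println(validate(offsets)); // prints "true"
    }
}
```

The point of the sketch is that the number of partitions is never checked: a request touching files `a`, `b`, and `c` at once passes as long as each individual entry is well formed.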