gharris1727 commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1256448988
########## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ########## @@ -101,4 +105,40 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) { : ExactlyOnceSupport.UNSUPPORTED; } + @Override + public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) { + AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); + String filename = config.getString(FILE_CONFIG); + if (filename == null || filename.isEmpty()) { + // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked + throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified"); + } + + // An empty offsets map could indicate that the offsets were reset previously or that no offsets have been committed yet (for a reset operation) + // - we don't need any additional validation for this case. + if (offsets.size() == 0) { + return true; + } + + // This connector makes use of a single source partition which represents the file that it is configured to read from + if (offsets.size() > 1) { + throw new ConnectException("The " + FileStreamSourceConnector.class.getSimpleName() + " supports only a single source partition / file"); Review Comment: Suppose that: 1. The connector is configured with one file `a` 2. The task commits offsets for `a` 3. The connector is reconfigured for a different file `b` 4. The task commits offsets for `b` 5. A user calls `GET /offsets` and sees both `a` and `b` 6. A user calls `DELETE /offsets` with `a`, or `a` and `b` in the same request. I think in this situation, it is reasonable for the delete to succeed; The user clearly intends to delete the old offset, and we shouldn't force them to revert the configuration to contain `a` to delete that offset. Also, a `PATCH /offsets` which clears `a` and updates `b` in the same request should succeed. I think it is reasonable to fail a request which updates `a`, since that is not an offset which the current configuration could produce. So in other words: If the value is non-null, then we should validate the key. If the value is null, then any filename should be allowed, since it might have been from a previous configuration. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org