yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1257110479


##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -101,4 +105,40 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified");
+        }
+
+        // An empty offsets map could indicate that the offsets were reset previously or that no offsets have been committed yet (for a reset operation)
+        // - we don't need any additional validation for this case.
+        if (offsets.size() == 0) {
+            return true;
+        }
+
+        // This connector makes use of a single source partition which represents the file that it is configured to read from
+        if (offsets.size() > 1) {
+            throw new ConnectException("The " + FileStreamSourceConnector.class.getSimpleName() + " supports only a single source partition / file");

Review Comment:
   Oh wow, that's a really good catch, thanks! One point to note is that in a 
`DELETE /offsets` request for a source connector, users don't specify the 
partitions / offsets (we write `null` offsets for every known partition).
   
   Thinking about this a bit more, we shouldn't fail the request even if there 
are multiple partitions with non-null values. Taking the same example you used, 
if a user calls `GET /offsets` and copies the response body as the request body 
for a `PATCH /offsets` request while only modifying the offset value for file 
`b`, I don't think it makes sense to fail that request. Requests that don't 
even include the current source partition are also fairly harmless. I've 
relaxed the validation criteria in the latest patch.
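
   To make the relaxed criteria concrete, here is a rough, dependency-free 
sketch of what per-entry validation could look like. It is not the code from 
the patch: the class name `RelaxedOffsetValidation`, the `validate` method, the 
`filename` / `position` key names, and the use of `IllegalArgumentException` in 
place of `ConnectException` are all assumptions made for illustration.

```java
import java.util.Map;

// Hypothetical sketch only; the real connector would throw ConnectException.
class RelaxedOffsetValidation {
    // Assumed key names for the source partition and source offset maps.
    static final String FILENAME_FIELD = "filename";
    static final String POSITION_FIELD = "position";

    // Validates each partition/offset pair on its own, with no limit on the
    // number of partitions and no requirement that the current file is present.
    static boolean validate(Map<Map<String, ?>, Map<String, ?>> offsets) {
        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : offsets.entrySet()) {
            Map<String, ?> partition = entry.getKey();
            Map<String, ?> offset = entry.getValue();

            // A null offset is what a reset writes for each known partition,
            // so it is always accepted.
            if (offset == null) {
                continue;
            }

            if (partition == null || !(partition.get(FILENAME_FIELD) instanceof String)) {
                throw new IllegalArgumentException(
                        "Partition must contain a string value for '" + FILENAME_FIELD + "'");
            }

            Object position = offset.get(POSITION_FIELD);
            if (!(position instanceof Long) || ((Long) position) < 0) {
                throw new IllegalArgumentException(
                        "Offset must contain a non-negative long value for '" + POSITION_FIELD + "'");
            }
        }
        return true;
    }
}
```

   With this shape, a `PATCH /offsets` body copied from `GET /offsets` that 
lists files `a` and `b` passes as long as each entry is well formed, and an 
empty map passes trivially because the loop body never runs.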



