gharris1727 commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1256448988


##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -101,4 +105,40 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified");
+        }
+
+        // An empty offsets map could indicate that the offsets were reset previously or that no offsets have been committed yet (for a reset operation)
+        // - we don't need any additional validation for this case.
+        if (offsets.size() == 0) {
+            return true;
+        }
+
+        // This connector makes use of a single source partition which represents the file that it is configured to read from
+        if (offsets.size() > 1) {
+            throw new ConnectException("The " + FileStreamSourceConnector.class.getSimpleName() + " supports only a single source partition / file");

Review Comment:
   Suppose that:
   1. The connector is configured with one file `a`
   2. The task commits offsets for `a`
   3. The connector is reconfigured for a different file `b`
   4. The task commits offsets for `b`
   5. A user calls `GET /offsets` and sees both `a` and `b`
   6. A user calls `DELETE /offsets` with `a`, or with both `a` and `b` in the 
same request.
   
   I think it is reasonable for the delete to succeed in this situation: the 
user clearly intends to delete the old offset, and we shouldn't force them to 
revert the configuration to `a` just to delete it. Likewise, a `PATCH 
/offsets` that clears `a` and updates `b` in the same request should succeed.
   
   I think it is reasonable to fail a request which updates `a`, since that is 
not an offset which the current configuration could produce.
   
   So in other words: if the offset value is non-null, we should validate the 
partition key. If the value is null, any filename should be allowed, since it 
might have come from a previous configuration.
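
   A rough sketch of that rule, not a drop-in patch: it is standalone Java with 
no Kafka dependency, so it throws `IllegalArgumentException` where the 
connector would throw `ConnectException`. The class name, the `validate` 
helper, and the `"filename"` / `"position"` field names are assumptions made 
for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone sketch; names here are illustrative, not the PR's code.
class OffsetValidationSketch {
    // Assumed name of the partition key field that holds the file name.
    static final String FILENAME_FIELD = "filename";

    /**
     * Accepts the request if every partition with a non-null offset refers to
     * the currently configured file. Partitions with a null offset (deletions)
     * are accepted for any file name, since they may come from an older config.
     */
    static boolean validate(String configuredFile, Map<Map<String, ?>, Map<String, ?>> offsets) {
        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : offsets.entrySet()) {
            Map<String, ?> offset = entry.getValue();
            if (offset == null) {
                // Null value means "clear this offset": skip key validation.
                continue;
            }
            Map<String, ?> partition = entry.getKey();
            Object file = partition == null ? null : partition.get(FILENAME_FIELD);
            if (!configuredFile.equals(file)) {
                throw new IllegalArgumentException(
                    "Offset for '" + file + "' cannot be set; the connector is configured for '"
                        + configuredFile + "'");
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Connector is now configured for "b"; request clears "a" and updates "b".
        Map<Map<String, ?>, Map<String, ?>> request = new HashMap<>();
        request.put(Collections.singletonMap(FILENAME_FIELD, "a"), null);
        request.put(Collections.singletonMap(FILENAME_FIELD, "b"),
                    Collections.singletonMap("position", 42L));
        System.out.println(validate("b", request));
    }
}
```

   With this shape, the `offsets.size() > 1` check would no longer reject a 
request that mixes deletions of old files with an update for the current one; 
only a non-null offset for a file other than the configured one fails.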


