yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1259821609


##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
+                    "This is because stdin is used for input and offsets are not tracked.");
+        }
+
+        // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+        // However, there could also be source partitions from previous configurations of the connector.
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+            Map<String, ?> partition = partitionOffset.getKey();
+            if (partition == null) {
+                throw new ConnectException("Partition objects cannot be null");
+            }
+
+            if (!partition.containsKey(FILENAME_FIELD)) {
+                throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+            }
+
+            Map<String, ?> offset = partitionOffset.getValue();
+            // null offsets are allowed and represent a deletion of offsets for a partition
+            if (offset == null) {
+                return true;
+            }
+
+            if (!offset.containsKey(POSITION_FIELD)) {
+                throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
+            }
+
+            // The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value
+            try {
+                long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   Ah, good catch, thanks. I think it would be a bit friendlier to update the
   task class to do similar parsing instead, WDYT? I'm okay either way, since
   the most common use case would be copy-pasting the output from
   `GET /offsets` and modifying it, in which case users would end up using a
   number rather than a string anyway.
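
To illustrate the "similar parsing" suggested above, here is a minimal, dependency-free sketch of lenient position parsing that accepts either a number or a numeric string. The class `OffsetPositionParser`, the method `parsePosition`, and the use of `IllegalArgumentException` are hypothetical and chosen only for illustration; the key name `"position"` is assumed to be the value of `POSITION_FIELD`. This is not the code from the PR or from the task class.

```java
import java.util.Map;

// Hypothetical helper for illustration only; not part of the PR or of Kafka Connect.
final class OffsetPositionParser {
    private OffsetPositionParser() { }

    /**
     * Reads the "position" entry of an offset map as a non-negative long.
     * Accepts an integral number (Integer, Long) or a numeric string.
     */
    static long parsePosition(Map<String, ?> offset) {
        Object raw = offset.get("position"); // assumed value of POSITION_FIELD
        if (raw == null) {
            throw new IllegalArgumentException("Offset is missing the 'position' field");
        }
        long position;
        try {
            // String.valueOf renders 42, 42L and "42" all as "42", so one parse path covers them.
            position = Long.parseLong(String.valueOf(raw));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("'position' must be an integer value, got: " + raw, e);
        }
        if (position < 0) {
            throw new IllegalArgumentException("'position' must be non-negative, got: " + position);
        }
        return position;
    }
}
```

With a helper of this shape, an offset copied from `GET /offsets` (a number) and a hand-edited offset written as a string would be treated the same way.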



##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
+                    "This is because stdin is used for input and offsets are not tracked.");
+        }
+
+        // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+        // However, there could also be source partitions from previous configurations of the connector.
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+            Map<String, ?> partition = partitionOffset.getKey();
+            if (partition == null) {
+                throw new ConnectException("Partition objects cannot be null");
+            }
+
+            if (!partition.containsKey(FILENAME_FIELD)) {
+                throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+            }
+
+            Map<String, ?> offset = partitionOffset.getValue();
+            // null offsets are allowed and represent a deletion of offsets for a partition
+            if (offset == null) {
+                return true;

Review Comment:
   Whoops, thanks. I forgot that we're no longer doing the single
   partition-offset pair validation 🤦
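
For context, returning from inside the loop on the first null offset would skip validation of every remaining partition in the request. A minimal sketch of the alternative, where a null offset only skips the current entry, might look like the following. The class `OffsetValidationSketch`, the method `validate`, the use of `IllegalArgumentException` in place of `ConnectException`, and the field values `"filename"` and `"position"` are assumptions made to keep the example dependency-free; this is not the final code from the PR.

```java
import java.util.Map;

// Hypothetical, dependency-free sketch; the connector in the diff throws ConnectException instead.
final class OffsetValidationSketch {
    static final String FILENAME_FIELD = "filename"; // assumed value
    static final String POSITION_FIELD = "position"; // assumed value

    private OffsetValidationSketch() { }

    /** Validates every partition/offset pair and returns true only if all of them pass. */
    static boolean validate(Map<Map<String, ?>, Map<String, ?>> offsets) {
        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : offsets.entrySet()) {
            Map<String, ?> partition = entry.getKey();
            if (partition == null) {
                throw new IllegalArgumentException("Partition objects cannot be null");
            }
            if (!partition.containsKey(FILENAME_FIELD)) {
                throw new IllegalArgumentException(
                        "Partition objects should contain the key '" + FILENAME_FIELD + "'");
            }

            Map<String, ?> offset = entry.getValue();
            if (offset == null) {
                // A null offset requests deletion for this partition only,
                // so move on to the next entry instead of returning early.
                continue;
            }
            if (!offset.containsKey(POSITION_FIELD)) {
                throw new IllegalArgumentException(
                        "Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
            }
        }
        // Reached only after every entry has been checked.
        return true;
    }
}
```

With `continue`, a request that mixes a deletion for one file with a malformed offset for another is still rejected, whereas an early `return true` would accept it if the deletion happened to be iterated first.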



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
