yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1260946148


##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
+                    "This is because stdin is used for input and offsets are not tracked.");
+        }
+
+        // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+        // However, there could also be source partitions from previous configurations of the connector.
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+            Map<String, ?> partition = partitionOffset.getKey();
+            if (partition == null) {
+                throw new ConnectException("Partition objects cannot be null");
+            }
+
+            if (!partition.containsKey(FILENAME_FIELD)) {
+                throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+            }
+
+            Map<String, ?> offset = partitionOffset.getValue();
+            // null offsets are allowed and represent a deletion of offsets for a partition
+            if (offset == null) {
+                continue;
+            }
+
+            if (!offset.containsKey(POSITION_FIELD)) {
+                throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
+            }
+
+            // The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value
+            try {
+                long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   I found something interesting when experimenting with this. When passing a request body to the `PATCH /offsets` endpoint like:
   
   ```
   {
     "offsets": [
       {
         "partition": {
           "filename": "/path/to/filename"
         },
         "offset": {
           "position": 20
         }
       }
     ]
   }
   ```
   
   the position `20` is deserialized to an `Integer` by Jackson (the JSON library we use in the REST layer for Connect), which seems fine because JSON doesn't have separate types for 32-bit and 64-bit numbers. So, the offsets map that is passed to `FileStreamSourceConnector::alterOffsets` by the runtime also contains `20` as an `Integer` value. I initially thought that this would cause the `FileStreamSourceTask` to fail at startup because it uses an `instanceof Long` check [here](https://github.com/apache/kafka/blob/aafbe3444354cfb0e4a8fdbd3443a63ba867c732/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java#L95C58-L95C58) (and an `Integer` value is obviously not an instance of `Long`). Interestingly, however, the task did not fail, and some debugging revealed why: after the offsets are serialized and deserialized by the `JsonConverter` in the `OffsetStorageWriter` (in `Worker::modifySourceConnectorOffsets`) and the `OffsetStorageReader` respectively, the offsets map that the task retrieves on startup through its context contains the position `20` as a `Long` value.
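   
   For reference, here's a minimal standalone snippet (plain Jackson, nothing Connect-specific; the class name is made up for illustration) reproducing the deserialization behavior described above:
   
   ```
   import java.util.Map;
   
   import com.fasterxml.jackson.core.type.TypeReference;
   import com.fasterxml.jackson.databind.ObjectMapper;
   
   public class JacksonNumberTypes {
       public static void main(String[] args) throws Exception {
           ObjectMapper mapper = new ObjectMapper();
           TypeReference<Map<String, Object>> type = new TypeReference<Map<String, Object>>() { };
   
           // Jackson picks the smallest fitting standard type for untyped maps:
           // Integer here, Long only once the value no longer fits in 32 bits
           Object small = mapper.readValue("{\"position\": 20}", type).get("position");
           System.out.println(small.getClass().getSimpleName()); // Integer
           System.out.println(small instanceof Long);            // false
   
           Object large = mapper.readValue("{\"position\": 3000000000}", type).get("position");
           System.out.println(large.getClass().getSimpleName()); // Long
       }
   }
   ```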
   
   While this particular case is easily handled by simply accepting `Integer` values as valid in the `FileStreamSourceConnector::alterOffsets` method, I'm thinking we probably need to make some changes so that the offsets map passed to source connectors in their `alterOffsets` method is the same as the offsets map that connectors / tasks will retrieve via the `OffsetStorageReader` from their context (otherwise, this could lead to some hard-to-debug issues in other connectors implementing the `SourceConnector::alterOffsets` method). The easiest way off the top of my head would probably be to serialize and deserialize the offsets map using the `JsonConverter` before invoking `SourceConnector::alterOffsets`. WDYT?
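   
   To make that concrete, the round-trip I have in mind would look roughly like the following (just a sketch; the `OffsetNormalizer` / `normalize` names are hypothetical and error handling is omitted):
   
   ```
   import java.util.Collections;
   import java.util.Map;
   
   import org.apache.kafka.connect.json.JsonConverter;
   import org.apache.kafka.connect.json.JsonConverterConfig;
   
   public class OffsetNormalizer {
   
       // Round-trips an offsets map through the JsonConverter so that alterOffsets sees
       // the same Java types (e.g. Long rather than Integer for the position above) that
       // the OffsetStorageReader would later hand back to the task
       @SuppressWarnings("unchecked")
       public static Map<String, Object> normalize(JsonConverter converter, Map<String, ?> offset) {
           byte[] serialized = converter.fromConnectData("", null, offset);
           return (Map<String, Object>) converter.toConnectData("", serialized).value();
       }
   
       public static JsonConverter offsetsConverter() {
           JsonConverter converter = new JsonConverter();
           // Offsets are serialized without schemas, mirroring the worker's internal converter setup
           converter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), false);
           return converter;
       }
   }
   ```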
   
   Furthermore, just checking whether the offset position is an instance of `Long` (Jackson uses a `Long` if the number doesn't fit in an `Integer`) or `Integer` in the `FileStreamSourceConnector::alterOffsets` method seems sub-optimal because:
   
   - To someone just reading through the `FileStreamSourceConnector` and `FileStreamSourceTask` classes, accepting `Integer` instances during validation but requiring `Long` instances in the actual task would look like a bug, since the serialization + deserialization aspect isn't transparent.
   - It's an extremely unlikely scenario, but any changes in Jackson's deserialization logic could break things here: for instance, if smaller numbers were deserialized into `Short`s instead of `Integer`s. Parsing with a combination of `String::valueOf` and `Long::parseLong` seems a lot more robust (see the sketch below).
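   
   For illustration, here's a standalone sketch (not from the PR; class name made up) showing that the parse-based check is agnostic to the concrete numeric type chosen by the deserializer:
   
   ```
   import java.util.Arrays;
   
   public class PositionParsing {
       public static void main(String[] args) {
           // Any integral representation the JSON layer might pick (or even a quoted
           // number) parses to the same long value
           for (Object position : Arrays.asList(20, 20L, (short) 20, "20")) {
               long parsed = Long.parseLong(String.valueOf(position));
               System.out.println(position.getClass().getSimpleName() + " -> " + parsed);
           }
   
           // Non-numeric values still fail fast
           try {
               Long.parseLong(String.valueOf("not-a-number"));
           } catch (NumberFormatException e) {
               System.out.println("rejected: " + e.getMessage());
           }
       }
   }
   ```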


