[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-18 Thread via GitHub


yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1266275006


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,49 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
+                    "This is because stdin is used for input and offsets are not tracked.");
+        }

Review Comment:
   Hah, this seems very similar to what we've been discussing on the MM2 PR, and it has a somewhat comparable flexibility versus user-friendliness tradeoff.

   Allowing users to modify offsets when the connector is currently configured to read from stdin could be useful in order to clean up stale offsets. However, it could be slightly misleading to accept an offsets modification request and return a successful response and message when, in actuality, the operation was a no-op as far as the connector's task is concerned. I'd also like to note for posterity that it isn't possible to detect past connector configurations or previous offsets in the `SourceConnector::alterOffsets` method. Both of the following cases should be uncommon enough that either approach seems acceptable to me:

   - An unfamiliar user attempts to modify offsets for a file stream source connector configured to read from stdin and expects the connector to re-produce past data.
   - A user configures a file stream source connector to read from a file, then reconfigures it to read from stdin, and then wants to clean up the previous offsets.

   I think the first case is a bit more likely, which is why the existing approach seems preferable, WDYT? Also, the second case can still be satisfied by temporarily reverting the configuration if the user **_really_** wants to clean up the offsets (a sketch of that sequence follows below).
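   For posterity, that temporary-revert workaround would look something like the following request sequence (the connector name and file path here are hypothetical; these are the standard KIP-875 offsets endpoints plus the existing config endpoint):

   ```
   PUT    /connectors/file-source/stop      # offsets can only be modified while the connector is stopped
   PUT    /connectors/file-source/config    # temporarily re-point at a file: {..., "file": "/tmp/input.txt"}
   DELETE /connectors/file-source/offsets   # wipe the stale offsets
   PUT    /connectors/file-source/config    # restore the stdin config (no 'file' property)
   PUT    /connectors/file-source/resume
   ```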



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-12 Thread via GitHub


yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1261948519


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
+                    "This is because stdin is used for input and offsets are not tracked.");
+        }
+
+        // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+        // However, there could also be source partitions from previous configurations of the connector.
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+            Map<String, ?> partition = partitionOffset.getKey();
+            if (partition == null) {
+                throw new ConnectException("Partition objects cannot be null");
+            }
+
+            if (!partition.containsKey(FILENAME_FIELD)) {
+                throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+            }
+
+            Map<String, ?> offset = partitionOffset.getValue();
+            // null offsets are allowed and represent a deletion of offsets for a partition
+            if (offset == null) {
+                return true;
+            }
+
+            if (!offset.containsKey(POSITION_FIELD)) {
+                throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
+            }
+
+            // The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value
+            try {
+                long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   > Hmmm... wouldn't that be a pretty serious breaking change if we accidentally switched up how the JSON converter deserializes integer types? Not just for the file source connector, but for plenty of others.

   Okay, that's fair enough. I've changed the check in `FileStreamSourceConnector::alterOffsets` to mirror the one made in the task at startup, for consistency (and avoided making changes in the existing task logic). This does mean that this PR should be merged after https://github.com/apache/kafka/pull/14003 has been merged (assuming that that approach is acceptable). A sketch of the full check is included below.
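   For reference, a minimal sketch of where the parse-based check at the end of the hunk above leads (the `catch` clause and the non-negative check are inferred from the discussion, not quoted from the PR):

   ```
   // Illustrative continuation of the validation in the hunk above: a lenient,
   // parse-based check that accepts Integer, Long, or numeric String positions,
   // and rejects anything that isn't a non-negative long.
   Object rawPosition = offset.get(POSITION_FIELD);
   try {
       long offsetPosition = Long.parseLong(String.valueOf(rawPosition));
       if (offsetPosition < 0) {
           throw new ConnectException("The value for the '" + POSITION_FIELD +
                   "' key in the offset should be a non-negative long value");
       }
   } catch (NumberFormatException e) {
       throw new ConnectException("The value for the '" + POSITION_FIELD +
               "' key in the offset should be a non-negative long value, but was: " + rawPosition, e);
   }
   ```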
   
   > I don't know if this significantly changes the conversation but it seems subtle and counterintuitive enough to bring up so that we can avoid accidentally breaking connector code that relies on this behavior.

   Hm yeah, that's definitely another interesting one to bring up. However, I'd contend that that one kinda makes sense, since we're passing the `SourceRecord` itself - tasks already deal with `SourceRecord`s and their offsets (and associated types) in their regular lifecycle. It would be highly confusing if the `SourceRecord` that they get in `commitRecord` didn't match the one they dispatched to the framework via `poll`. Of course, ideally, the offsets that they read via `OffsetStorageReader` should also not have any type mismatches compared to the `SourceRecord` ones, but I don't think we'd want to (or safely could) change that at this point.

   Since the offsets being altered externally would correspond to the ones that the connector / tasks read at startup, I think it makes sense to align the types across invocations of `SourceConnector::alterOffsets` and offsets queried from an `OffsetStorageReader` (with an implicit separate alignment between those and the `SourceRecord`'s offset types).







[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-12 Thread via GitHub


yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1261515978


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
+                    "This is because stdin is used for input and offsets are not tracked.");
+        }
+
+        // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+        // However, there could also be source partitions from previous configurations of the connector.
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+            Map<String, ?> partition = partitionOffset.getKey();
+            if (partition == null) {
+                throw new ConnectException("Partition objects cannot be null");
+            }
+
+            if (!partition.containsKey(FILENAME_FIELD)) {
+                throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+            }
+
+            Map<String, ?> offset = partitionOffset.getValue();
+            // null offsets are allowed and represent a deletion of offsets for a partition
+            if (offset == null) {
+                return true;
+            }
+
+            if (!offset.containsKey(POSITION_FIELD)) {
+                throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
+            }
+
+            // The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value
+            try {
+                long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   Since the type alignment issue seemed like a broader one (i.e. not scoped to the file connectors being touched here), I've created a separate [ticket](https://issues.apache.org/jira/browse/KAFKA-15182) and [PR](https://github.com/apache/kafka/pull/14003) for it.

   > it doesn't seem like a great endorsement of our API if we have to implement workarounds in the file connectors, which are the first example of the connector API that many developers see.

   I'd argue that it isn't really a workaround and that the current check itself is bad. If the (de)serialization happened to use `Integer` for values that fit in a 32-bit signed type (which would be perfectly valid, and is exactly what we see currently before the values are passed through the converter), the current check in the `FileStreamSourceTask` would cause it to bail. The snippet below makes this concrete.
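   A tiny self-contained demonstration (plain Java, not Kafka code) of why the strict `instanceof Long` check bails on `Integer` positions while a parse-based check does not:

   ```
   public class InstanceofCheckDemo {
       public static void main(String[] args) {
           // Jackson-style deserialization: a JSON number that fits in 32 bits
           // typically comes back boxed as an Integer, not a Long.
           Object positionFromJson = 20;     // boxed Integer
           Object positionFromStorage = 20L; // boxed Long

           System.out.println(positionFromJson instanceof Long);    // false -> the strict check bails
           System.out.println(positionFromStorage instanceof Long); // true

           // A parse-based check accepts both representations.
           System.out.println(Long.parseLong(String.valueOf(positionFromJson)));    // 20
           System.out.println(Long.parseLong(String.valueOf(positionFromStorage))); // 20
       }
   }
   ```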






[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-12 Thread via GitHub


yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1260946148


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
+                    "This is because stdin is used for input and offsets are not tracked.");
+        }
+
+        // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+        // However, there could also be source partitions from previous configurations of the connector.
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+            Map<String, ?> partition = partitionOffset.getKey();
+            if (partition == null) {
+                throw new ConnectException("Partition objects cannot be null");
+            }
+
+            if (!partition.containsKey(FILENAME_FIELD)) {
+                throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+            }
+
+            Map<String, ?> offset = partitionOffset.getValue();
+            // null offsets are allowed and represent a deletion of offsets for a partition
+            if (offset == null) {
+                return true;
+            }
+
+            if (!offset.containsKey(POSITION_FIELD)) {
+                throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
+            }
+
+            // The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value
+            try {
+                long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   I found something interesting when experimenting with this. When passing a request body to the `PATCH /offsets` endpoint like:

   ```
   {
     "offsets": [
       {
         "partition": {
           "filename": "/path/to/filename"
         },
         "offset": {
           "position": 20
         }
       }
     ]
   }
   ```

   the position `20` is deserialized to an `Integer` by Jackson (the JSON library we use in the REST layer for Connect), which seems fine because JSON doesn't have separate types for 32-bit and 64-bit numbers. So, the offsets map that is passed to `FileStreamSourceConnector::alterOffsets` by the runtime also contains `20` as an `Integer` value. I initially thought that this would cause the `FileStreamSourceTask` to fail at startup because it uses an `instanceof Long` check [here](https://github.com/apache/kafka/blob/aafbe3444354cfb0e4a8fdbd3443a63ba867c732/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java#L95C58-L95C58) (and an `Integer` value is obviously not an instance of `Long`). However, interestingly, the task did not fail, and some debugging revealed that after the offsets are serialized and deserialized by the `JsonConverter` in `OffsetStorageWriter` (in `Worker::modifySourceConnectorOffsets`) and `OffsetStorageReader` respectively, the offsets map that is retrieved by the task on startup through its context contains the position `20` as a `Long` value.

   While this particular case is easily handled by simply accepting `Integer` values as valid in the `FileStreamSourceConnector::alterOffsets` method, I'm thinking we probably need to make some changes so that the offsets map passed to source connectors in their `alterOffsets` method is the same as the offsets map that connectors / tasks will retrieve via the `OffsetStorageReader` from their context (otherwise, this could lead to some hard-to-debug issues in other connectors implementing the `SourceConnector::alterOffsets` method). The easiest way off the top of my head would probably be to serialize and deserialize the offsets map using the `JsonConverter` before invoking `SourceConnector::alterOffsets`. WDYT?
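   As a rough sketch of that idea (the helper name here is made up, and this assumes the same schemaless `JsonConverter` setup that the offset storage uses - illustrative only, not the actual runtime code):

   ```
   import java.util.Collections;
   import java.util.Map;

   import org.apache.kafka.connect.json.JsonConverter;
   import org.apache.kafka.connect.storage.Converter;

   public class OffsetTypeNormalizer {

       // Round-trips an offset map through the JsonConverter so that the types passed
       // to SourceConnector::alterOffsets match what an OffsetStorageReader would later
       // return (e.g. the JSON number 20 comes back as a Long rather than an Integer).
       @SuppressWarnings("unchecked")
       public static Map<String, Object> normalize(Map<String, ?> offset) {
           Converter converter = new JsonConverter();
           // Offsets are stored schemaless, so configure the converter accordingly
           converter.configure(Collections.singletonMap("schemas.enable", "false"), true);
           byte[] serialized = converter.fromConnectData("", null, offset);
           return (Map<String, Object>) converter.toConnectData("", serialized).value();
       }
   }
   ```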
   
   Furthermore, just checking whether the offset position is an instance of `Long` (Jackson uses a `Long` if the number doesn't fit in an `Integer`) or `Integer` in the `FileStreamSourceConnector::alterOffsets` method seems sub-optimal because:

   - To someone just reading through the `FileStreamSourceConnector` and `FileStreamSourceTask` classes, accepting `Integer`

[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-11 Thread via GitHub


yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1259827626


##
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java:
##
@@ -147,4 +152,102 @@ public void testInvalidBatchSize() {
         sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd");
         assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
     }
+
+    @Test
+    public void testAlterOffsetsStdin() {
+        sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                Collections.singletonMap(FILENAME_FIELD, FILENAME),
+                Collections.singletonMap(POSITION_FIELD, 0)
+        );
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsIncorrectPartitionKey() {
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+                Collections.singletonMap("other_partition_key", FILENAME),
+                Collections.singletonMap(POSITION_FIELD, 0)
+        )));
+
+        // null partitions are invalid
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+                null,
+                Collections.singletonMap(POSITION_FIELD, 0)
+        )));
+    }
+
+    @Test
+    public void testAlterOffsetsMultiplePartitions() {
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), Collections.singletonMap(POSITION_FIELD, 0));
+        offsets.put(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename"), null);
+        assertTrue(connector.alterOffsets(sourceProperties, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsIncorrectOffsetKey() {
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                Collections.singletonMap(FILENAME_FIELD, FILENAME),
+                Collections.singletonMap("other_offset_key", 0)
+        );
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsOffsetPositionValues() {

Review Comment:
   Nice!  




[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-11 Thread via GitHub


yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1259821609


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
+                    "This is because stdin is used for input and offsets are not tracked.");
+        }
+
+        // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+        // However, there could also be source partitions from previous configurations of the connector.
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+            Map<String, ?> partition = partitionOffset.getKey();
+            if (partition == null) {
+                throw new ConnectException("Partition objects cannot be null");
+            }
+
+            if (!partition.containsKey(FILENAME_FIELD)) {
+                throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+            }
+
+            Map<String, ?> offset = partitionOffset.getValue();
+            // null offsets are allowed and represent a deletion of offsets for a partition
+            if (offset == null) {
+                return true;
+            }
+
+            if (!offset.containsKey(POSITION_FIELD)) {
+                throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
+            }
+
+            // The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value
+            try {
+                long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   Ah, good catch, thanks. I think it would probably be a bit friendlier if we update the task class instead to do similar parsing, WDYT? I'm okay either way, since the most common use case would be copy-pasting the output from `GET /offsets` and modifying it, in which case users would end up using a number rather than a string anyway.






[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-11 Thread via GitHub


yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1259821859


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
+                    "This is because stdin is used for input and offsets are not tracked.");
+        }
+
+        // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+        // However, there could also be source partitions from previous configurations of the connector.
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+            Map<String, ?> partition = partitionOffset.getKey();
+            if (partition == null) {
+                throw new ConnectException("Partition objects cannot be null");
+            }
+
+            if (!partition.containsKey(FILENAME_FIELD)) {
+                throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+            }
+
+            Map<String, ?> offset = partitionOffset.getValue();
+            // null offsets are allowed and represent a deletion of offsets for a partition
+            if (offset == null) {
+                return true;

Review Comment:
   Whoops, thanks. I forgot that we're no longer doing the single partition offset pair validation 🤦
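   (Presumably the fix is to skip just that entry rather than bailing out of the whole loop - a sketch of the corrected branch, inferred rather than quoted from the updated PR:)

   ```
   Map<String, ?> offset = partitionOffset.getValue();
   // null offsets are allowed and represent a deletion of offsets for a partition;
   // skip the remaining per-partition checks instead of returning early
   if (offset == null) {
       continue;
   }
   ```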






[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-11 Thread via GitHub


yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1259462328


##
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java:
##
@@ -147,4 +151,59 @@ public void testInvalidBatchSize() {
         sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd");
         assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
     }
+
+    @Test
+    public void testAlterOffsetsStdin() {
+        sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                Collections.singletonMap(FILENAME_FIELD, FILENAME),
+                Collections.singletonMap(POSITION_FIELD, 0)
+        );
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsIncorrectPartitionKey() {
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+                Collections.singletonMap("invalid_partition_key", FILENAME),
+                Collections.singletonMap(POSITION_FIELD, 0)
+        )));
+
+        // null partitions are invalid
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+                null,
+                Collections.singletonMap(POSITION_FIELD, 0)
+        )));
+    }
+
+    @Test
+    public void testAlterOffsetsMultiplePartitions() {
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), Collections.singletonMap(POSITION_FIELD, 0));
+        offsets.put(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename"), null);
+        connector.alterOffsets(sourceProperties, offsets);

Review Comment:
   Ah, good point. Done.



##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified");

Review Comment:
   Makes sense, done.



##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified");
+        }
+
+        // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+        // However, there could also be source partitions from previous configurations of the connector.
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+            Map<String, ?> partition = partitionOffset.getKey();
+            if (partition == null) {
+                throw new ConnectException("Partition objects cannot be null");
+            }
+
+            if (!partition.containsKey(FILENAME_FIELD)) {
+                throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+            }
+
+            Map<String, ?> offset = partitionOffset.getValue();
+            // null offsets are allowed and represent a deletion of offsets for a partition
+            if (offset != null && !offset.containsKey(POSITION_FIELD)) {
+                throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
+            }
+        }
+
+        // Let the task validate the actual value for the offset position on startup

Review Comment:
   My initial reasoning was that we probably wouldn't want to open an input stream here to read from the file and verify whether the offset position is actually valid. But yeah, there's no reason we can't do basic validations.

[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-08 Thread via GitHub


yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1257110479


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,40 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified");
+        }
+
+        // An empty offsets map could indicate that the offsets were reset previously or that no offsets have been committed yet (for a reset operation)
+        // - we don't need any additional validation for this case.
+        if (offsets.size() == 0) {
+            return true;
+        }
+
+        // This connector makes use of a single source partition which represents the file that it is configured to read from
+        if (offsets.size() > 1) {
+            throw new ConnectException("The " + FileStreamSourceConnector.class.getSimpleName() + " supports only a single source partition / file");

Review Comment:
   Oh wow, that's a really good catch, thanks! One point to note is that in a `DELETE /offsets` request for a source connector, users don't specify the partitions / offsets (we write `null` offsets for every known partition).

   Thinking about this a bit more, we shouldn't fail the request even if there are multiple partitions with non-null values. Taking the same example you used, if a user calls `GET /offsets` and copies the response body as the request body for a `PATCH /offsets` request while only modifying the offset value for file `b`, I don't think it makes sense to fail that request. Requests that don't even include the current source partition are also fairly harmless. I've relaxed the validation criteria in the latest patch (see the example request below).
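   For concreteness, a request body like the following (file names purely illustrative) should now pass validation even though it touches two partitions, with only the offset for file `b` actually being changed:

   ```
   {
     "offsets": [
       {
         "partition": { "filename": "/path/to/a" },
         "offset": { "position": 100 }
       },
       {
         "partition": { "filename": "/path/to/b" },
         "offset": { "position": 250 }
       }
     ]
   }
   ```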


