[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
yashmayya commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1266275006

## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:

```java
@@ -101,4 +105,49 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
             : ExactlyOnceSupport.UNSUPPORTED;
 }

+@Override
+public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+    AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+    String filename = config.getString(FILE_CONFIG);
+    if (filename == null || filename.isEmpty()) {
+        throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
+                "This is because stdin is used for input and offsets are not tracked.");
+    }
```

Review Comment: Hah, this seems very similar to what we've been discussing on the MM2 PR and has a somewhat comparable flexibility vs. user-friendliness tradeoff 😅 Allowing users to modify offsets when the connector is currently configured to read from stdin could be useful in order to clean up stale offsets. However, it could be slightly misleading to accept offset modification requests and return a successful response and message when in actuality the operation was a no-op as far as the connector's task is concerned. I'd also like to note for posterity that it isn't possible to detect past connector configurations or previous offsets in the `SourceConnector::alterOffsets` method. Both of the following cases should be uncommon enough that either approach seems acceptable to me:

- An unfamiliar user attempts to modify offsets for a file stream source connector configured to read from stdin and expects the connector to re-produce past data.
- A user configures a file stream source connector to read from a file, then reconfigures it to read from stdin, and then wants to clean up previous offsets.

I think that the first case seems a bit more likely, which is why the existing approach seems preferable, WDYT?

Also, the second case can still be satisfied by temporarily reverting the configuration if the user **_really_** wants to clean up the offsets.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
yashmayya commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1261948519

## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:

```java
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
             : ExactlyOnceSupport.UNSUPPORTED;
 }

+@Override
+public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+    AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+    String filename = config.getString(FILE_CONFIG);
+    if (filename == null || filename.isEmpty()) {
+        // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+        throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
+                "This is because stdin is used for input and offsets are not tracked.");
+    }
+
+    // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+    // However, there could also be source partitions from previous configurations of the connector.
+    for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+        Map<String, ?> partition = partitionOffset.getKey();
+        if (partition == null) {
+            throw new ConnectException("Partition objects cannot be null");
+        }
+
+        if (!partition.containsKey(FILENAME_FIELD)) {
+            throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+        }
+
+        Map<String, ?> offset = partitionOffset.getValue();
+        // null offsets are allowed and represent a deletion of offsets for a partition
+        if (offset == null) {
+            return true;
+        }
+
+        if (!offset.containsKey(POSITION_FIELD)) {
+            throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
+        }
+
+        // The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value
+        try {
+            long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));
```

Review Comment:

> Hmmm... wouldn't that be a pretty serious breaking change if we accidentally switched up how the JSON converter deserializes integer types? Not just for the file source connector, but for plenty of others.

Okay, that's fair enough, I've changed the check in `FileStreamSourceConnector::alterOffsets` to mirror the one made in the task at startup for consistency (and avoided making changes in the existing task logic). This does mean that this PR should be merged after https://github.com/apache/kafka/pull/14003 has been merged (assuming that that approach is acceptable).

> I don't know if this significantly changes the conversation but it seems subtle and counterintuitive enough to bring up so that we can avoid accidentally breaking connector code that relies on this behavior.

Hm yeah, that's definitely another interesting one to bring up - however, I'd contend that that one kinda makes sense since we're passing the `SourceRecord` itself - tasks already deal with `SourceRecord`s and their offsets (and associated types) in their regular lifecycle. It would be highly confusing if the `SourceRecord` that they get in `commitRecord` doesn't match the one they dispatched to the framework via `poll`. Of course, ideally, the offsets that they read via `OffsetStorageReader` should also not have any type mismatches compared to the `SourceRecord` ones, but I don't think we'd want to (or safely could) change that at this point. Since the offsets being altered externally would correspond to the ones that the connector / tasks read at startup, I think it makes sense to align the types across invocations of `SourceConnector::alterOffsets` and offsets queried from an `OffsetStorageReader` (and an implicit separate alignment with the `SourceRecord`'s offset types).
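[Editor's note: a minimal sketch of the type-alignment idea discussed above. The class and helper names here are made up for illustration and this is not Kafka's actual fix; it only demonstrates widening `Integer` offset values to `Long` so that the map handed to `SourceConnector::alterOffsets` matches what a task would later read back after a `JsonConverter` round trip.]

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetTypeAlignment {
    // Hypothetical helper: widen any Integer value in an offset map to Long,
    // so the map passed to alterOffsets has the same types as the map an
    // OffsetStorageReader would return after JSON serialization round trips.
    static Map<String, Object> widenIntegers(Map<String, ?> offset) {
        Map<String, Object> aligned = new HashMap<>();
        for (Map.Entry<String, ?> entry : offset.entrySet()) {
            Object value = entry.getValue();
            aligned.put(entry.getKey(),
                    value instanceof Integer ? ((Integer) value).longValue() : value);
        }
        return aligned;
    }

    public static void main(String[] args) {
        Map<String, Object> fromRestLayer = new HashMap<>();
        fromRestLayer.put("position", 20); // Jackson deserializes small JSON numbers to Integer
        Map<String, Object> aligned = widenIntegers(fromRestLayer);
        System.out.println(aligned.get("position") instanceof Long); // prints "true"
    }
}
```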
[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
yashmayya commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1261515978

## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:

```java
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
             : ExactlyOnceSupport.UNSUPPORTED;
 }

+@Override
+public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+    AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+    String filename = config.getString(FILE_CONFIG);
+    if (filename == null || filename.isEmpty()) {
+        // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+        throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
+                "This is because stdin is used for input and offsets are not tracked.");
+    }
+
+    // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+    // However, there could also be source partitions from previous configurations of the connector.
+    for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+        Map<String, ?> partition = partitionOffset.getKey();
+        if (partition == null) {
+            throw new ConnectException("Partition objects cannot be null");
+        }
+
+        if (!partition.containsKey(FILENAME_FIELD)) {
+            throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+        }
+
+        Map<String, ?> offset = partitionOffset.getValue();
+        // null offsets are allowed and represent a deletion of offsets for a partition
+        if (offset == null) {
+            return true;
+        }
+
+        if (!offset.containsKey(POSITION_FIELD)) {
+            throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
+        }
+
+        // The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value
+        try {
+            long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));
```

Review Comment: Since the type alignment issue seemed like a broader one (i.e. not scoped to the file connectors being touched here), I've created a separate [ticket](https://issues.apache.org/jira/browse/KAFKA-15182) and [PR](https://github.com/apache/kafka/pull/14003) for it.

> it doesn't seem like a great endorsement of our API if we have to implement workarounds in the file connectors, which are the first example of the connector API that many developers see.

I'd argue that it isn't really a workaround and that the current check itself is bad. If the (de)serialization happened to use `Integer` for values that fit in a 32-bit signed type (which would be perfectly valid and is exactly what we see currently before the values are passed through the converter), the current check in `FileStreamSourceTask` would cause it to bail.
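[Editor's note: the brittleness of a strict `instanceof Long` check described above can be reproduced in a few lines of plain Java, independent of the connector code. The variable names are illustrative only.]

```java
public class InstanceOfCheckDemo {
    public static void main(String[] args) {
        // Jackson deserializes JSON numbers that fit in 32 bits to Integer...
        Object positionFromRest = 20;
        // ...while the same value read back after a converter round trip
        // arrives as a Long.
        Object positionFromOffsetReader = 20L;

        // A boxed Integer is never an instance of Long, so a strict
        // `instanceof Long` check rejects perfectly valid 32-bit positions.
        System.out.println(positionFromRest instanceof Long);         // false
        System.out.println(positionFromOffsetReader instanceof Long); // true
    }
}
```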
[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
yashmayya commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1260946148

## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:

```java
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
             : ExactlyOnceSupport.UNSUPPORTED;
 }

+@Override
+public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+    AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+    String filename = config.getString(FILE_CONFIG);
+    if (filename == null || filename.isEmpty()) {
+        // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+        throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
+                "This is because stdin is used for input and offsets are not tracked.");
+    }
+
+    // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+    // However, there could also be source partitions from previous configurations of the connector.
+    for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+        Map<String, ?> partition = partitionOffset.getKey();
+        if (partition == null) {
+            throw new ConnectException("Partition objects cannot be null");
+        }
+
+        if (!partition.containsKey(FILENAME_FIELD)) {
+            throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+        }
+
+        Map<String, ?> offset = partitionOffset.getValue();
+        // null offsets are allowed and represent a deletion of offsets for a partition
+        if (offset == null) {
+            return true;
+        }
+
+        if (!offset.containsKey(POSITION_FIELD)) {
+            throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
+        }
+
+        // The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value
+        try {
+            long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));
```

Review Comment: I found something interesting when experimenting with this. When passing a request body to the `PATCH /offsets` endpoint like:

```
{
  "offsets": [
    {
      "partition": {
        "filename": "/path/to/filename"
      },
      "offset": {
        "position": 20
      }
    }
  ]
}
```

the position `20` is deserialized to an `Integer` by Jackson (the JSON library we use in the REST layer for Connect), which seems fine because JSON doesn't have separate types for 32-bit and 64-bit numbers. So, the offsets map that is passed to `FileStreamSourceConnector::alterOffsets` by the runtime also contains `20` as an `Integer` value. I initially thought that this would cause the `FileStreamSourceTask` to fail at startup because it uses an `instanceof Long` check [here](https://github.com/apache/kafka/blob/aafbe3444354cfb0e4a8fdbd3443a63ba867c732/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java#L95C58-L95C58) (and an `Integer` value is obviously not an instance of `Long`). However, interestingly, the task did not fail, and some debugging revealed that after the offsets are serialized and deserialized by the `JsonConverter` in `OffsetStorageWriter` (in `Worker::modifySourceConnectorOffsets`) and `OffsetStorageReader` respectively, the offsets map that is retrieved by the task on startup through its context contains the position `20` as a `Long` value.

While this particular case is easily handled by simply accepting `Integer` values as valid in the `FileStreamSourceConnector::alterOffsets` method, I'm thinking we probably need to make some changes so that the offsets map passed to source connectors in their `alterOffsets` method is the same as the offsets map that connectors / tasks will retrieve via the `OffsetStorageReader` from their context (otherwise, this could lead to some hard-to-debug issues in other connectors implementing the `SourceConnector::alterOffsets` method). The easiest way off the top of my head would probably be to serialize and deserialize the offsets map using the `JsonConverter` before invoking `SourceConnector::alterOffsets`. WDYT?

Furthermore, just checking whether the offset position is an instance of `Long` (Jackson uses a `Long` if the number doesn't fit in an `Integer`) or `Integer` in the `FileStreamSourceConnector::alterOffsets` method seems sub-optimal because:

- To someone just reading through the `FileStreamSourceConnector` and `FileStreamSourceTask` classes, accepting `Integer`
[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
yashmayya commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1259827626

## connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java:

```java
@@ -147,4 +152,102 @@ public void testInvalidBatchSize() {
     sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd");
     assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
 }

+@Test
+public void testAlterOffsetsStdin() {
+    sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+    Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+            Collections.singletonMap(FILENAME_FIELD, FILENAME),
+            Collections.singletonMap(POSITION_FIELD, 0)
+    );
+    assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsIncorrectPartitionKey() {
+    assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+            Collections.singletonMap("other_partition_key", FILENAME),
+            Collections.singletonMap(POSITION_FIELD, 0)
+    )));
+
+    // null partitions are invalid
+    assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+            null,
+            Collections.singletonMap(POSITION_FIELD, 0)
+    )));
+}
+
+@Test
+public void testAlterOffsetsMultiplePartitions() {
+    Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+    offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), Collections.singletonMap(POSITION_FIELD, 0));
+    offsets.put(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename"), null);
+    assertTrue(connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsIncorrectOffsetKey() {
+    Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+            Collections.singletonMap(FILENAME_FIELD, FILENAME),
+            Collections.singletonMap("other_offset_key", 0)
+    );
+    assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsOffsetPositionValues() {
```

Review Comment: Nice! 🚀
[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
yashmayya commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1259821609

## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:

```java
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
             : ExactlyOnceSupport.UNSUPPORTED;
 }

+        // The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value
+        try {
+            long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));
```

Review Comment: Ah, good catch, thanks. I think it might be a bit friendlier if we update the task class instead to do similar parsing, WDYT? I'm okay either way, since the most common use case would be copy-pasting the output from `GET /offsets` and modifying it, in which case users would end up using a number rather than a string anyway.

## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:

```java
+        Map<String, ?> offset = partitionOffset.getValue();
+        // null offsets are allowed and represent a deletion of offsets for a partition
+        if (offset == null) {
+            return true;
```

Review Comment: Whoops, thanks. I forgot that we're no longer doing the single partition offset pair validation 🤦
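[Editor's note: a minimal, standalone sketch of the lenient parsing pattern in the quoted hunk (`Long.parseLong(String.valueOf(...))`), which accepts the position whether it arrives as an `Integer`, a `Long`, or a numeric `String`. The class and helper names are made up for illustration and are not part of the connector.]

```java
public class PositionParseDemo {
    // Mirrors the parsing style in the quoted hunk: stringify the raw value,
    // parse it as a long, and reject negatives. Integer, Long, and numeric
    // String inputs are all accepted.
    static long parsePosition(Object raw) {
        long position;
        try {
            position = Long.parseLong(String.valueOf(raw));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Offset position is not a valid number: " + raw);
        }
        if (position < 0) {
            throw new IllegalArgumentException("Offset position must be non-negative: " + position);
        }
        return position;
    }

    public static void main(String[] args) {
        System.out.println(parsePosition(20));   // Integer input, prints 20
        System.out.println(parsePosition(20L));  // Long input, prints 20
        System.out.println(parsePosition("20")); // String input, prints 20
    }
}
```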
[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
yashmayya commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1259821859

## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ##

@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }

+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
+                    "This is because stdin is used for input and offsets are not tracked.");
+        }
+
+        // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+        // However, there could also be source partitions from previous configurations of the connector.
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+            Map<String, ?> partition = partitionOffset.getKey();
+            if (partition == null) {
+                throw new ConnectException("Partition objects cannot be null");
+            }
+
+            if (!partition.containsKey(FILENAME_FIELD)) {
+                throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+            }
+
+            Map<String, ?> offset = partitionOffset.getValue();
+            // null offsets are allowed and represent a deletion of offsets for a partition
+            if (offset == null) {
+                return true;

Review Comment:
   Whoops, thanks. I forgot that we're no longer doing the single partition offset pair validation 🤦
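The `return true` inside the loop (acknowledged in the "Whoops" reply above) short-circuits validation as soon as the first null offset is seen, so any remaining entries are never checked. A standalone sketch of the fixed control flow, using `continue` so every partition/offset pair is validated (this is illustrative only, not the PR's code: `IllegalArgumentException` stands in for Connect's `ConnectException`, and the field names mirror the connector's constants):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetValidationSketch {
    static final String FILENAME_FIELD = "filename";
    static final String POSITION_FIELD = "position";

    // Validates every partition/offset pair in the map. A null offset is a
    // valid tombstone (offset deletion), so we 'continue' to the next entry
    // instead of returning early and skipping the rest.
    static void validate(Map<Map<String, ?>, Map<String, ?>> offsets) {
        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : offsets.entrySet()) {
            Map<String, ?> partition = entry.getKey();
            if (partition == null || !partition.containsKey(FILENAME_FIELD)) {
                throw new IllegalArgumentException(
                        "Partition objects must be non-null and contain the key '" + FILENAME_FIELD + "'");
            }
            Map<String, ?> offset = entry.getValue();
            if (offset == null) {
                continue; // tombstone: deletion of this partition's offsets
            }
            if (!offset.containsKey(POSITION_FIELD)) {
                throw new IllegalArgumentException(
                        "Offset objects must be null or contain the key '" + POSITION_FIELD + "'");
            }
        }
    }

    public static void main(String[] args) {
        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
        offsets.put(Map.of(FILENAME_FIELD, "/tmp/a"), null);                       // tombstone
        offsets.put(Map.of(FILENAME_FIELD, "/tmp/b"), Map.of(POSITION_FIELD, 10L)); // regular offset
        validate(offsets); // both entries are checked; no exception is thrown
        System.out.println("ok");
    }
}
```

With the early `return true`, the second entry here could contain an arbitrarily malformed offset and still be accepted whenever the tombstone happened to be iterated first.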
yashmayya commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1259462328

## connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java: ##

@@ -147,4 +151,59 @@ public void testInvalidBatchSize() {
         sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd");
         assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
     }
+
+    @Test
+    public void testAlterOffsetsStdin() {
+        sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                Collections.singletonMap(FILENAME_FIELD, FILENAME),
+                Collections.singletonMap(POSITION_FIELD, 0)
+        );
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsIncorrectPartitionKey() {
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+                Collections.singletonMap("invalid_partition_key", FILENAME),
+                Collections.singletonMap(POSITION_FIELD, 0)
+        )));
+
+        // null partitions are invalid
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+                null,
+                Collections.singletonMap(POSITION_FIELD, 0)
+        )));
+    }
+
+    @Test
+    public void testAlterOffsetsMultiplePartitions() {
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), Collections.singletonMap(POSITION_FIELD, 0));
+        offsets.put(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename"), null);
+        connector.alterOffsets(sourceProperties, offsets);

Review Comment:
   Ah, good point. Done.
## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ##

@@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }

+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified");

Review Comment:
   Makes sense, done.

## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ##

@@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }

+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified");
+        }
+
+        // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+        // However, there could also be source partitions from previous configurations of the connector.
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+            Map<String, ?> partition = partitionOffset.getKey();
+            if (partition == null) {
+                throw new ConnectException("Partition objects cannot be null");
+            }
+
+            if (!partition.containsKey(FILENAME_FIELD)) {
+                throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+            }
+
+            Map<String, ?> offset = partitionOffset.getValue();
+            // null offsets are allowed and represent a deletion of offsets for a partition
+            if (offset != null && !offset.containsKey(POSITION_FIELD)) {
+                throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
+            }
+        }
+
+        // Let the task validate the actual value for the offset position on startup

Review Comment:
   My initial reasoning was that we probably wouldn't want to open an input stream here to read from the file and verify whether the offset position is actually valid. But yeah, no reason we can't do basic validations like
yashmayya commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1257110479

## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ##

@@ -101,4 +105,40 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }

+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified");
+        }
+
+        // An empty offsets map could indicate that the offsets were reset previously or that no offsets have been committed yet (for a reset operation)
+        // - we don't need any additional validation for this case.
+        if (offsets.size() == 0) {
+            return true;
+        }
+
+        // This connector makes use of a single source partition which represents the file that it is configured to read from
+        if (offsets.size() > 1) {
+            throw new ConnectException("The " + FileStreamSourceConnector.class.getSimpleName() + " supports only a single source partition / file");

Review Comment:
   Oh wow, that's a really good catch, thanks! One point to note is that in a `DELETE /offsets` request for a source connector, users don't specify the partitions / offsets (we write `null` offsets for every known partition). Thinking about this a bit more, we shouldn't fail the request even if there are multiple partitions with non-null values. Taking the same example you used, if a user calls `GET /offsets` and copies the response body as the request body for a `PATCH /offsets` request while only modifying the offset value for file `b`, I don't think it makes sense to fail that request.
Also, requests that don't even include the current source partition are fairly harmless. I've relaxed the validation criteria in the latest patch.
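For concreteness, the copy-paste scenario described above corresponds to a `PATCH /connectors/{name}/offsets` request body of roughly the following shape (per KIP-875; the filenames and position are made up for illustration). A null `offset` acts as a tombstone that deletes the stored offsets for that partition, which is also what the runtime writes for every known partition on a `DELETE /offsets` request:

```json
{
  "offsets": [
    {
      "partition": {"filename": "/tmp/a"},
      "offset": {"position": 30}
    },
    {
      "partition": {"filename": "/tmp/b"},
      "offset": null
    }
  ]
}
```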