yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1261948519
##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String,
String> props) {
: ExactlyOnceSupport.UNSUPPORTED;
}
+ @Override
+ public boolean alterOffsets(Map<String, String> connectorConfig,
Map<Map<String, ?>, Map<String, ?>> offsets) {
+ AbstractConfig config = new AbstractConfig(CONFIG_DEF,
connectorConfig);
+ String filename = config.getString(FILE_CONFIG);
+ if (filename == null || filename.isEmpty()) {
+ // If the 'file' configuration is unspecified, stdin is used and
no offsets are tracked
+ throw new ConnectException("Offsets cannot be modified if the '" +
FILE_CONFIG + "' configuration is unspecified. " +
+ "This is because stdin is used for input and offsets are
not tracked.");
+ }
+
+ // This connector makes use of a single source partition at a time
which represents the file that it is configured to read from.
+ // However, there could also be source partitions from previous
configurations of the connector.
+ for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset :
offsets.entrySet()) {
+ Map<String, ?> partition = partitionOffset.getKey();
+ if (partition == null) {
+ throw new ConnectException("Partition objects cannot be null");
+ }
+
+ if (!partition.containsKey(FILENAME_FIELD)) {
+ throw new ConnectException("Partition objects should contain
the key '" + FILENAME_FIELD + "'");
+ }
+
+ Map<String, ?> offset = partitionOffset.getValue();
+ // null offsets are allowed and represent a deletion of offsets
for a partition
+ if (offset == null) {
+ return true;
+ }
+
+ if (!offset.containsKey(POSITION_FIELD)) {
+ throw new ConnectException("Offset objects should either be
null or contain the key '" + POSITION_FIELD + "'");
+ }
+
+ // The 'position' in the offset represents the position in the
file's byte stream and should be a non-negative long value
+ try {
+ long offsetPosition =
Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));
Review Comment:
> Hmmm... wouldn't that be a pretty serious breaking change if we
accidentally switched up how the JSON converter deserializes integer types? Not
just for the file source connector, but for plenty of others.
Okay, that's fair enough, I've changed the check in
`FileStreamSourceConnector::alterOffsets` to mirror the one made in the task at
startup for consistency (and avoided making changes in the existing task
logic). This does mean that this PR should be merged after
https://github.com/apache/kafka/pull/14003 has been merged (assuming that that
approach is acceptable).
> I don't know if this significantly changes the conversation but it seems
subtle and counterintuitive enough to bring up so that we can avoid
accidentally breaking connector code that relies on this behavior.
Hm yeah, that's definitely another interesting one to bring up - however,
I'd contend that that one kinda makes sense since we're passing the
`SourceRecord` itself - tasks already deal with `SourceRecord` and their
offsets (and associated types) in their regular lifecycle. It would be highly
confusing if the `SourceRecord` that they get in `commitRecord` doesn't match
the one they dispatched to the framework via `poll`. Of course, ideally, the
offsets that they read via `OffsetStorageReader` should also not have any type
mismatches compared to the `SourceRecord` ones, but I don't think we'd want to
(or safely could) change that at this point.
Since the offsets being altered externally would correspond to the ones that
the connector / tasks read at startup, I think it makes sense to align the
types across invocations to `SourceConnector::alterOffsets` and offsets queried
from an `OffsetStorageReader` (and an implicit separate alignment between the
`SourceRecord`'s offsets types).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]