C0urante commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1261646923
##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            // If the 'file' configuration is unspecified, stdin is used and no offsets are tracked
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
+                    "This is because stdin is used for input and offsets are not tracked.");
+        }
+
+        // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+        // However, there could also be source partitions from previous configurations of the connector.
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+            Map<String, ?> partition = partitionOffset.getKey();
+            if (partition == null) {
+                throw new ConnectException("Partition objects cannot be null");
+            }
+
+            if (!partition.containsKey(FILENAME_FIELD)) {
+                throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+            }
+
+            Map<String, ?> offset = partitionOffset.getValue();
+            // null offsets are allowed and represent a deletion of offsets for a partition
+            if (offset == null) {
+                return true;
+            }
+
+            if (!offset.containsKey(POSITION_FIELD)) {
+                throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
+            }
+
+            // The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value
+            try {
+                long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));
Review Comment:
Hmmm... wouldn't that be a pretty serious breaking change if we accidentally
switched up how the JSON converter deserializes integer types? Not just for the
file source connector, but for plenty of others.
It feels like it might be a better use of our time to make note of this
possibility and ensure that we have sufficient unit testing in place to prevent
that kind of regression (I suspect we already do but haven't verified this yet).
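For context, the parsing pattern under discussion, `Long.parseLong(String.valueOf(...))`, is deliberately tolerant of whichever boxed numeric type the converter happens to produce. A standalone sketch of that defensive pattern (plain Java, no Connect dependencies; the `parsePosition` helper name is hypothetical, not part of the connector):

```java
public class PositionParsing {
    // Hypothetical helper mirroring the diff's approach: accept whatever
    // type the converter deserialized the offset value to and normalize
    // it to a primitive long.
    static long parsePosition(Object raw) {
        long position;
        try {
            // String.valueOf + Long.parseLong tolerates Integer, Long, and
            // numeric String representations alike.
            position = Long.parseLong(String.valueOf(raw));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Offset position is not a valid long: " + raw, e);
        }
        if (position < 0) {
            throw new IllegalArgumentException("Offset position may not be negative: " + position);
        }
        return position;
    }

    public static void main(String[] args) {
        // The same logical value can arrive as different Java types after a
        // serialization round trip; all normalize to the same long here.
        System.out.println(parsePosition(42));   // boxed Integer
        System.out.println(parsePosition(42L));  // boxed Long
        System.out.println(parsePosition("42")); // numeric String
    }
}
```

A regression test guarding against a converter change would essentially pin down exactly this: that integral offset values deserialize to something this pattern still accepts.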
Of course, because things aren't interesting enough already, it turns out
that there are actually two different scenarios in which tasks observe offsets
for their connector. The first, which we're all familiar with, is when they
query them using an
[OffsetStorageReader](https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReader.java),
which in distributed mode reflects the contents of the offsets topic. The
second is when
[SourceTask::commitRecord](https://kafka.apache.org/35/javadoc/org/apache/kafka/connect/source/SourceTask.html#commitRecord(org.apache.kafka.connect.source.SourceRecord,org.apache.kafka.clients.producer.RecordMetadata))
is invoked, which carries with it the just-ack'd `SourceRecord` instance
originally provided by the task, including the original in-memory source
partition and source offset, which may use types that get lost when written to
and read back from the offsets topic.
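To make the type-fidelity gap between the two paths concrete: a task that built its in-memory offset with a boxed `Integer` sees that exact object again in `commitRecord`, while the same value read back through the offsets topic commonly comes back as a `Long`, and the two are never equal under `Map.equals`. A minimal illustration (plain Java, no Connect dependencies; the round trip is simulated rather than performed by a real converter):

```java
import java.util.Map;

public class OffsetTypePitfall {
    public static void main(String[] args) {
        // Offset as the task built it in memory (value is a boxed Integer)...
        Map<String, Object> inMemory = Map.of("position", 42);
        // ...versus the same logical offset after a JSON-style round trip,
        // where integral values commonly deserialize to Long.
        Map<String, Object> roundTripped = Map.of("position", 42L);

        // Integer.equals(Long) is always false, so the maps compare unequal
        // even though they represent the same logical offset.
        System.out.println(inMemory.equals(roundTripped));                // false
        System.out.println(Integer.valueOf(42).equals(Long.valueOf(42))); // false
    }
}
```

Connector code that compares offsets seen via `commitRecord` against offsets read back from an `OffsetStorageReader` can trip over exactly this.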
I don't know if this significantly changes the conversation but it seems
subtle and counterintuitive enough to bring up so that we can avoid
accidentally breaking connector code that relies on this behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]