C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202684769
##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java:
##########
@@ -52,4 +53,82 @@ public static ConnectorOffsets consumerGroupOffsetsToConnectorOffsets(Map<TopicP
         return new ConnectorOffsets(connectorOffsets);
     }
+
+    /**
+     * Validate that the provided partitions (keys in the {@code partitionOffsets} map) look like:
+     * <pre>
+     *     {
+     *         "kafka_topic": "topic"
+     *         "kafka_partition": 3
+     *     }
+     * </pre>
+     *
+     * and that the provided offsets (values in the {@code partitionOffsets} map) look like:
+     * <pre>
+     *     {
+     *         "kafka_offset": 1000
+     *     }
+     * </pre>
+     *
+     * This method then parses them into a mapping from {@link TopicPartition}s to their corresponding {@link Long}
+     * valued offsets.
+     *
+     * @param partitionOffsets the partitions to offset map that needs to be validated and parsed.
+     * @return the parsed mapping from {@link TopicPartition} to its corresponding {@link Long} valued offset.
+     *
+     * @throws BadRequestException if the provided offsets aren't in the expected format
+     */
+    public static Map<TopicPartition, Long> validateAndParseSinkConnectorOffsets(Map<Map<String, ?>, Map<String, ?>> partitionOffsets) {

Review Comment:
   Well done, looks great 👍

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org