yashmayya commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1148315133
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1139,105 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector.
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @return the connector's offsets
+     */
+    public ConnectorOffsets connectorOffsets(String connName, Map<String, String> connectorConfig) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+        }

Review Comment:
   I've made the changes, since it looks like all other calls that retrieve client configs for a connector, as well as the calls to `ConnectUtils.isSinkConnector`, use connector-specific class loaders. However, I'm not sure I entirely follow why that's required, since there don't seem to be any direct interactions with connectors in there?

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1139,105 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector.
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @return the connector's offsets
+     */
+    public ConnectorOffsets connectorOffsets(String connName, Map<String, String> connectorConfig) {

Review Comment:
   Thanks, that's a great suggestion! I've done the required refactoring.

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -866,4 +882,20 @@ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
         }
     }
 
+    @Override
+    public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
+        log.debug("Submitting offset fetch request for connector: {}", connName);
+        connectorExecutor.submit(() -> {
+            try {
+                if (!configState.contains(connName)) {

Review Comment:
   Oh wow, I somehow completely missed that 🤦 I've refactored this to use the config backing store (snapshotting it to retrieve the configs).
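For context, here is a minimal sketch of what the snapshot-based lookup described above could look like inside the submitted task. The names `configBackingStore` and `rawConnectorConfig` follow the Connect runtime's existing APIs, but this exact body is an assumption for illustration, not a quote from the PR:

```java
// Sketch only: snapshot the config backing store rather than relying on the
// herder's cached configState, so recently (re)registered connectors are
// visible to this request (assumed field and method names).
ClusterConfigState configState = configBackingStore.snapshot();
if (!configState.contains(connName)) {
    cb.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
    return;
}
Map<String, String> connectorConfig = configState.rawConnectorConfig(connName);
cb.onCompletion(null, worker.connectorOffsets(connName, connectorConfig));
```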
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffset.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents a single {partition, offset} pair for either a sink connector or a source connector. For source connectors,
+ * the partition and offset structures are defined by the connector implementations themselves. For a sink connector,
+ * where offsets represent the underlying Kafka consumer group offsets, this would look something like:
+ * <pre>
+ * {
+ *   "partition": {
+ *     "kafka_topic": "topic",
+ *     "kafka_partition": 3
+ *   },
+ *   "offset": {
+ *     "kafka_offset": 1000
+ *   }
+ * }
+ * </pre>
+ */
+public class ConnectorOffset {
+    public static final String KAFKA_TOPIC_KEY = "kafka_topic";
+    public static final String KAFKA_PARTITION_KEY = "kafka_partition";
+    public static final String KAFKA_OFFSET_KEY = "kafka_offset";

Review Comment:
   Fair point, I've moved these to `SinkUtils` rather than adding a sink-specific static factory method here, since it does seem like a better idea to keep this class source/sink agnostic.
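A hedged sketch of what a `SinkUtils`-style conversion using these keys might look like. The helper name, the `ConnectorOffset(Map, Map)` constructor, and the `ConnectorOffsets` wrapper are assumptions based on the entities in this PR rather than verbatim code:

```java
// Sketch: build the documented sink-offset shape from a consumer group's
// committed offsets (assumed helper; constants as moved to SinkUtils).
public static ConnectorOffsets consumerGroupOffsetsToConnectorOffsets(Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets) {
    List<ConnectorOffset> connectorOffsets = new ArrayList<>(consumerGroupOffsets.size());
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumerGroupOffsets.entrySet()) {
        Map<String, Object> partition = new HashMap<>();
        partition.put(KAFKA_TOPIC_KEY, entry.getKey().topic());
        partition.put(KAFKA_PARTITION_KEY, entry.getKey().partition());
        Map<String, Object> offset = Collections.singletonMap(KAFKA_OFFSET_KEY, entry.getValue().offset());
        connectorOffsets.add(new ConnectorOffset(partition, offset));
    }
    return new ConnectorOffsets(connectorOffsets);
}
```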
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java:
##########
@@ -78,6 +85,27 @@ private void load() {
             ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
             ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
             data.put(key, value);
+            if (key != null) {
+                try {
+                    // The topic parameter is irrelevant for the JsonConverter, which is the internal converter used by
+                    // Connect workers.
+                    List<Object> keyValue = (List<Object>) keyConverter.toConnectData("", key.array()).value();
+                    // The key should always be of the form [connectorName, partition] where connectorName is a
+                    // string value and partition is a Map<String, Object>
+                    String connectorName = (String) keyValue.get(0);
+                    Map<String, Object> partition = (Map<String, Object>) keyValue.get(1);
+                    if (!connectorPartitions.containsKey(connectorName)) {
+                        connectorPartitions.put(connectorName, new HashSet<>());
+                    }
+                    if (value == null) {
+                        connectorPartitions.get(connectorName).remove(partition);

Review Comment:
   I did think about this one while writing it originally, but since the main offset-tracking `data` map here (and in the `KafkaOffsetBackingStore` as well) also grows unboundedly in the same way, I figured that this is something that can be looked at later (if we do decide that it's something that needs to be solved). Like you pointed out, it's been this way since the beginning and there haven't really been any practical concerns so far. We'd discussed something very similar previously for the `ConfigBackingStore` [here](https://github.com/apache/kafka/pull/12490).

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java:
##########
@@ -53,9 +53,30 @@ public void requestTimeoutMs(long requestTimeoutMs) {
     }
 
     /**
-     * Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the
-     * request to the leader.
-     */
+     * Wait for a {@link FutureCallback} to complete and return the result if successful.
+     * @param cb the future callback to wait for
+     * @param <T> the future's result type
+     * @return the future callback's result if successful
+     * @throws Throwable if the future callback isn't successful
+     */
+    public <T> T completeRequest(FutureCallback<T> cb) throws Throwable {
+        try {
+            return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw e.getCause();
+        } catch (TimeoutException e) {
+            // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
+            // error is the best option
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
+        } catch (InterruptedException e) {
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
+        }
+    }

Review Comment:
   Hm, I'd argue against designing for the unlikely hypothetical scenario where requests that use this method later need to start doing forwarding. Calling `HerderRequestHandler::completeOrForwardRequest` for requests that don't currently involve any request forwarding is slightly misleading, and it also requires passing a bunch of irrelevant parameters.
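As a usage illustration, a REST resource method that never needs leader forwarding could call `completeRequest` directly. This sketch follows the shape of KIP-875's offsets read endpoint; the `herder` and `requestHandler` wiring is assumed for illustration:

```java
// Sketch of a JAX-RS resource method that completes the request locally,
// since any worker can answer an offsets read without forwarding to the leader.
@GET
@Path("/{connector}/offsets")
public ConnectorOffsets getOffsets(final @PathParam("connector") String connector) throws Throwable {
    FutureCallback<ConnectorOffsets> cb = new FutureCallback<>();
    herder.connectorOffsets(connector, cb);
    return requestHandler.completeRequest(cb);
}
```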
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -343,12 +336,38 @@ public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback
         return producerCallback;
     }
 
+    @Override
+    public Set<Map<String, Object>> connectorPartitions(String connectorName) {
+        return connectorPartitions.getOrDefault(connectorName, Collections.emptySet());
+    }
+
+    @SuppressWarnings("unchecked")
     protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> {
         if (error != null) {
             log.error("Failed to read from the offsets topic", error);
             return;
         }
 
+        if (record.key() != null) {
+            try {
+                // The key should always be a list of the form [connectorName, partition] where connectorName is a
+                // string value and partition is a Map<String, Object>
+                List<Object> keyValue = (List<Object>) keyConverter.toConnectData(topic, record.key()).value();
+                String connectorName = (String) keyValue.get(0);
+                Map<String, Object> partition = (Map<String, Object>) keyValue.get(1);

Review Comment:
   Makes sense, done.

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1139,105 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector.
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @return the connector's offsets
+     */
+    public ConnectorOffsets connectorOffsets(String connName, Map<String, String> connectorConfig) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+        }
+
+        if (ConnectUtils.isSinkConnector(connector)) {
+            log.debug("Fetching offsets for sink connector: {}", connName);
+            return sinkConnectorOffsets(connName, connector, connectorConfig);
+        } else {
+            log.debug("Fetching offsets for source connector: {}", connName);
+            return sourceConnectorOffsets(connName, connector, connectorConfig);
+        }
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @return the consumer group offsets for the sink connector
+     */
+    private ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig) {
+        return sinkConnectorOffsets(connName, connector, connectorConfig, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                          Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SINK);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        try {
+            ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId);
+            try {
+                // Not using a timeout for the Future::get here because each offset fetch request is handled in its own thread in AbstractHerder
+                // and the REST API request timeout in HerderRequestHandler will ensure that the user request doesn't hang indefinitely

Review Comment:
   Ah yes, that makes sense. Thanks, done!
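Since the package-private overload exists specifically so tests can inject an `Admin` client, a unit test might exercise it along these lines. This is a sketch using Mockito; `worker`, `sinkConnector`, `connectorProps`, and `CONNECTOR_NAME` are placeholder names, and the `partitionsToOffsetAndMetadata()` stubbing assumes the standard `ListConsumerGroupOffsetsResult` API rather than quoting this PR's tests:

```java
// Sketch: stub the Admin client behind the adminFactory seam and verify that
// the committed consumer group offsets come back as ConnectorOffsets.
Admin mockAdmin = mock(Admin.class);
ListConsumerGroupOffsetsResult offsetsResult = mock(ListConsumerGroupOffsetsResult.class);
Map<TopicPartition, OffsetAndMetadata> committed =
        Collections.singletonMap(new TopicPartition("topic", 0), new OffsetAndMetadata(100L));
when(mockAdmin.listConsumerGroupOffsets(anyString())).thenReturn(offsetsResult);
when(offsetsResult.partitionsToOffsetAndMetadata()).thenReturn(KafkaFuture.completedFuture(committed));

// Exercise the package-private overload directly with the mock factory.
ConnectorOffsets offsets = worker.sinkConnectorOffsets(CONNECTOR_NAME, sinkConnector, connectorProps, config -> mockAdmin);
```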