yashmayya commented on code in PR #13465: URL: https://github.com/apache/kafka/pull/13465#discussion_r1203576413
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) { ); } + @Override + public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) { + log.trace("Submitting alter offsets request for connector '{}'", connName); + + addRequest(() -> { + refreshConfigSnapshot(workerSyncTimeoutMs); + if (!alterConnectorOffsetsChecks(connName, callback)) { + return null; + } + // At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run + // a zombie fencing request + if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) { + log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once semantics enabled.", connName); + getFenceZombieSourceTasksCallable(connName, (error, ignored) -> { + if (error != null) { + log.error("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets", error); + callback.onCompletion(new ConnectException("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets", + error), null); + } else { + log.debug("Successfully completed zombie fencing for source connector {}; proceeding to alter offsets.", connName); + // We need to ensure that we perform the necessary checks again inside alterConnectorOffsetsHerderRequest + // since it is being run in a separate herder request and the conditions could have changed since the + // previous check + addRequest(getAlterConnectorOffsetsCallable(connName, offsets, callback), forwardErrorCallback(callback)); + } + }).call(); + } else { + getAlterConnectorOffsetsCallable(connName, offsets, callback).call(); Review Comment: Wow thanks, good catch! I've removed the `getAlterConnectorOffsetsCallable` method and refactored the zombie fencing one to be synchronous and named it `doFenceZombieSourceTasks` (it definitely sounds a lot better 🙂). ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1235,6 +1244,246 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS }); } + /** + * Alter a connector's offsets. + * + * @param connName the name of the connector whose offsets are to be altered + * @param connectorConfig the connector's configurations + * @param offsets a mapping from partitions (either source partitions for source connectors, or Kafka topic + * partitions for sink connectors) to offsets that need to be written; may not be null or empty + * @param cb callback to invoke upon completion + */ + public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig, + Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) { + + if (offsets == null || offsets.isEmpty()) { + throw new ConnectException("The offsets to be altered may not be null or empty"); + } + + String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias); + Connector connector; + + try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { + connector = plugins.newConnector(connectorClassOrAlias); + if (ConnectUtils.isSinkConnector(connector)) { + log.debug("Altering consumer group offsets for sink connector: {}", connName); + alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); + } else { + log.debug("Altering offsets for source connector: {}", connName); + alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); + } + } + } + + /** + * Alter a sink connector's consumer group offsets. + * <p> + * Visible for testing. + * + * @param connName the name of the sink connector whose offsets are to be altered + * @param connector an instance of the sink connector + * @param connectorConfig the sink connector's configuration + * @param offsets a mapping from topic partitions to offsets that need to be written; may not be null or empty + * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader + * @param cb callback to invoke upon completion + */ + void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig, + Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) { + executor.submit(plugins.withClassLoader(connectorLoader, () -> { + try { + Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(offsets); + boolean alterOffsetsResult; + try { + alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets); + } catch (UnsupportedOperationException e) { + throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " + + "modification of offsets", e); + } + + SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig); + Class<? extends Connector> sinkConnectorClass = connector.getClass(); + Map<String, Object> adminConfig = adminConfigs( + connName, + "connector-worker-adminclient-" + connName, + config, + sinkConnectorConfig, + sinkConnectorClass, + connectorClientConfigOverridePolicy, + kafkaClusterId, + ConnectorType.SINK); + + String groupId = (String) baseConsumerConfigs( + connName, "connector-consumer-", config, sinkConnectorConfig, + sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG); + + Admin admin = adminFactory.apply(adminConfig); + + try { + KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null); + + Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet() + .stream() + .filter(entry -> entry.getValue() != null) + .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); + + if (!offsetsToAlter.isEmpty()) { + log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", + connName, offsetsToAlter); + AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( + (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); + AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, + alterConsumerGroupOffsetsOptions); + + adminFuture = alterConsumerGroupOffsetsResult.all(); + } + + Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet() + .stream() + .filter(entry -> entry.getValue() == null) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + + if (!partitionsToReset.isEmpty()) { + log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", + connName, partitionsToReset); + DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( + (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); + DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, + deleteConsumerGroupOffsetsOptions); + + adminFuture = KafkaFuture.allOf(adminFuture, deleteConsumerGroupOffsetsResult.all()); + } + + adminFuture.whenComplete((ignored, error) -> { + if (error != null) { + // When a consumer group is non-empty, only group members can commit offsets. An attempt to alter offsets via the admin client + // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped + // completely or if the connector is resumed while the alter offsets request is being processed). Similarly, an attempt to + // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException + if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) { + cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " + + "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " + + "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " + + "Connect cluster may need to be restarted to get rid of the zombie sink tasks."), + null); + } else { + cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null); + } + } else { + completeAlterOffsetsCallback(alterOffsetsResult, cb); + } + Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); Review Comment: Hm I don't really see how the callback logic could bail out in its current shape but I guess using a chained `whenComplete` for relinquishing resources is a better pattern overall. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS }); } + /** + * Alter a connector's offsets. + * + * @param connName the name of the connector whose offsets are to be altered + * @param connectorConfig the connector's configurations + * @param offsets a mapping from partitions to offsets that need to be overwritten + * @param cb callback to invoke upon completion + */ + public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig, + Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) { + String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias); + Connector connector; + + try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { + connector = plugins.newConnector(connectorClassOrAlias); + if (ConnectUtils.isSinkConnector(connector)) { + log.debug("Altering consumer group offsets for sink connector: {}", connName); + alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); + } else { + log.debug("Altering offsets for source connector: {}", connName); + alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); + } + } + } + + /** + * Alter a sink connector's consumer group offsets. + * <p> + * Visible for testing. + * + * @param connName the name of the sink connector whose offsets are to be altered + * @param connector an instance of the sink connector + * @param connectorConfig the sink connector's configuration + * @param offsets a mapping from topic partitions to offsets that need to be overwritten + * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader + * @param cb callback to invoke upon completion + */ + void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig, + Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) { + executor.submit(plugins.withClassLoader(connectorLoader, () -> { + try { + Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets); + boolean alterOffsetsResult; + try { + alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets); + } catch (UnsupportedOperationException e) { + throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " + + "modification of offsets", e); + } + + Class<? extends Connector> sinkConnectorClass = connector.getClass(); + Map<String, Object> adminConfig = adminConfigs( + connName, + "connector-worker-adminclient-" + connName, + config, + new SinkConnectorConfig(plugins, connectorConfig), + sinkConnectorClass, + connectorClientConfigOverridePolicy, + kafkaClusterId, + ConnectorType.SINK); + + SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig); + String groupId = (String) baseConsumerConfigs( + connName, "connector-consumer-", config, sinkConnectorConfig, + sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG); + + Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet() + .stream() + .filter(entry -> entry.getValue() != null) + .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet() + .stream() + .filter(entry -> entry.getValue() == null) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + + KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null); + + Admin admin = adminFactory.apply(adminConfig); + + try { + if (!offsetsToAlter.isEmpty()) { + log.debug("Committing the following consumer group topic partition offsets using an admin client for sink connector {}: {}.", + connName, offsetsToAlter); + AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( + (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); + AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, + alterConsumerGroupOffsetsOptions); + + adminFuture = alterConsumerGroupOffsetsResult.all(); + } + + adminFuture.whenComplete((ignored, error) -> { + if (error != null) { + Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); + // When a consumer group is non-empty, only group members can commit offsets. The above attempt to alter offsets via the admin + // client will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped + // completely or if the connector is resumed while the alter offsets request is being processed) + if (error instanceof UnknownMemberIdException) { + cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " + + "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " + + "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " + + "Connect cluster may need to be restarted to get rid of the zombie sink tasks."), + null); + } else { + cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for topic partitions " + offsetsToAlter.keySet() + " for " + + "connector " + connName, error), + null); + } + } else if (!partitionsToReset.isEmpty()) { + log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", + connName, partitionsToReset); + DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( + (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); + DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, + deleteConsumerGroupOffsetsOptions); + deleteConsumerGroupOffsetsResult.all().whenComplete((ignored2, error2) -> { + Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); + if (error2 != null) { + // The attempt to delete offsets for certain topic partitions via the admin client will result in a + // GroupSubscribedToTopicException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped completely + // or if the connector is resumed while the alter offsets request is being processed). + if (error2 instanceof GroupSubscribedToTopicException) { + cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " + + "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " + + "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " + + "Connect cluster may need to be restarted to get rid of the zombie sink tasks."), + null); + } else { + cb.onCompletion(new ConnectException("Failed to delete consumer group offsets for topic partitions " + partitionsToReset + " for connector " + + connName, error2), + null); + } + } else { + completeAlterOffsetsCallback(alterOffsetsResult, cb); + } + }); + } else { + Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); + completeAlterOffsetsCallback(alterOffsetsResult, cb); + } + }); + } catch (Throwable t) { + Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); + throw t; + } + } catch (Throwable t) { + cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null); + } + })); + } + + /** + * Alter a source connector's offsets. + * + * @param connName the name of the source connector whose offsets are to be altered + * @param connector an instance of the source connector + * @param connectorConfig the source connector's configuration + * @param offsets a mapping from partitions to offsets that need to be overwritten + * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader + * @param cb callback to invoke upon completion + */ + private void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig, + Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) { + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable()); + Map<String, Object> producerProps = config.exactlyOnceSourceEnabled() + ? exactlyOnceSourceTaskProducerConfigs(new ConnectorTaskId(connName, 0), config, sourceConfig, + connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId) + : baseProducerConfigs(connName, "connector-offset-producer-" + connName, config, sourceConfig, + connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); + + ConnectorOffsetBackingStore offsetStore = config.exactlyOnceSourceEnabled() + ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, connName, connector, producer) + : offsetStoreForRegularSourceConnector(sourceConfig, connName, connector, producer); + + OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, connName, internalKeyConverter, internalValueConverter); + alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, offsetStore, producer, offsetWriter, connectorLoader, cb); + } + + // Visible for testing + void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig, + Map<Map<String, ?>, Map<String, ?>> offsets, ConnectorOffsetBackingStore offsetStore, + KafkaProducer<byte[], byte[]> producer, OffsetStorageWriter offsetWriter, + ClassLoader connectorLoader, Callback<Message> cb) { + executor.submit(plugins.withClassLoader(connectorLoader, () -> { + try { + boolean alterOffsetsResult; + try { + alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, offsets); + } catch (UnsupportedOperationException e) { + throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " + + "modification of offsets", e); + } + + offsetStore.configure(config); + // This reads to the end of the offsets topic and can be a potentially time-consuming operation + offsetStore.start(); + + // The alterSourceConnectorOffsets method should only be called after all the connector's tasks have been stopped, and it's + // safe to write offsets via an offset writer + offsets.forEach(offsetWriter::offset); + + // We can call begin flush without a timeout because this newly created single-purpose offset writer can't do concurrent + // offset writes. We can also ignore the return value since it returns false if and only if there is no data to be flushed, + // and we've just put some data in the previous statement + offsetWriter.beginFlush(); + + try { + if (config.exactlyOnceSourceEnabled()) { + producer.initTransactions(); + producer.beginTransaction(); + } + log.debug("Committing the following partition offsets for source connector {}: {}", connName, offsets); + FutureCallback<Void> offsetWriterCallback = new FutureCallback<>(); + offsetWriter.doFlush(offsetWriterCallback); + if (config.exactlyOnceSourceEnabled()) { + producer.commitTransaction(); + } + offsetWriterCallback.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); Review Comment: Ahh yes, that totally makes sense. Thanks, I hadn't considered this tombstones edge case. I agree that we should attempt to fix it for both the scenarios (REST API offset alterations and task offset commits) together, since designing a specific solution here for the REST API, then fixing it in the `ConnectorOffsetBackingStore` and then reverting the REST API specific solution doesn't seem ideal. However, I do agree that it'd be good if we could fix [KAFKA-15018](https://issues.apache.org/jira/browse/KAFKA-15018) in the same release that this patch goes out (hopefully `3.6`). I'll take a look and I might be able to pick that one up as well! ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS }); } + /** + * Alter a connector's offsets. + * + * @param connName the name of the connector whose offsets are to be altered + * @param connectorConfig the connector's configurations + * @param offsets a mapping from partitions to offsets that need to be overwritten + * @param cb callback to invoke upon completion + */ + public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig, + Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) { + String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias); + Connector connector; + + try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { + connector = plugins.newConnector(connectorClassOrAlias); + if (ConnectUtils.isSinkConnector(connector)) { + log.debug("Altering consumer group offsets for sink connector: {}", connName); + alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); + } else { + log.debug("Altering offsets for source connector: {}", connName); + alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); + } + } + } + + /** + * Alter a sink connector's consumer group offsets. + * <p> + * Visible for testing. + * + * @param connName the name of the sink connector whose offsets are to be altered + * @param connector an instance of the sink connector + * @param connectorConfig the sink connector's configuration + * @param offsets a mapping from topic partitions to offsets that need to be overwritten + * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader + * @param cb callback to invoke upon completion + */ + void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig, + Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) { + executor.submit(plugins.withClassLoader(connectorLoader, () -> { + try { + Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets); + boolean alterOffsetsResult; + try { + alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets); + } catch (UnsupportedOperationException e) { + throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " + + "modification of offsets", e); + } + + Class<? extends Connector> sinkConnectorClass = connector.getClass(); + Map<String, Object> adminConfig = adminConfigs( + connName, + "connector-worker-adminclient-" + connName, + config, + new SinkConnectorConfig(plugins, connectorConfig), + sinkConnectorClass, + connectorClientConfigOverridePolicy, + kafkaClusterId, + ConnectorType.SINK); + + SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig); + String groupId = (String) baseConsumerConfigs( + connName, "connector-consumer-", config, sinkConnectorConfig, + sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG); + + Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet() + .stream() + .filter(entry -> entry.getValue() != null) + .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet() + .stream() + .filter(entry -> entry.getValue() == null) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + + KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null); + + Admin admin = adminFactory.apply(adminConfig); + + try { + if (!offsetsToAlter.isEmpty()) { + log.debug("Committing the following consumer group topic partition offsets using an admin client for sink connector {}: {}.", + connName, offsetsToAlter); + AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( + (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); + AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, + alterConsumerGroupOffsetsOptions); + + adminFuture = alterConsumerGroupOffsetsResult.all(); + } + + adminFuture.whenComplete((ignored, error) -> { + if (error != null) { + Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); + // When a consumer group is non-empty, only group members can commit offsets. The above attempt to alter offsets via the admin + // client will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped + // completely or if the connector is resumed while the alter offsets request is being processed) + if (error instanceof UnknownMemberIdException) { + cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " + + "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " + + "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " + + "Connect cluster may need to be restarted to get rid of the zombie sink tasks."), + null); + } else { + cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for topic partitions " + offsetsToAlter.keySet() + " for " + + "connector " + connName, error), + null); + } + } else if (!partitionsToReset.isEmpty()) { + log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", + connName, partitionsToReset); + DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( + (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); + DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, + deleteConsumerGroupOffsetsOptions); + deleteConsumerGroupOffsetsResult.all().whenComplete((ignored2, error2) -> { + Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); + if (error2 != null) { + // The attempt to delete offsets for certain topic partitions via the admin client will result in a + // GroupSubscribedToTopicException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped completely + // or if the connector is resumed while the alter offsets request is being processed). + if (error2 instanceof GroupSubscribedToTopicException) { + cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " + + "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " + + "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " + + "Connect cluster may need to be restarted to get rid of the zombie sink tasks."), + null); + } else { + cb.onCompletion(new ConnectException("Failed to delete consumer group offsets for topic partitions " + partitionsToReset + " for connector " + + connName, error2), + null); + } + } else { + completeAlterOffsetsCallback(alterOffsetsResult, cb); + } + }); + } else { + Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); + completeAlterOffsetsCallback(alterOffsetsResult, cb); + } + }); + } catch (Throwable t) { + Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); + throw t; + } + } catch (Throwable t) { + cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null); + } + })); + } + + /** + * Alter a source connector's offsets. + * + * @param connName the name of the source connector whose offsets are to be altered + * @param connector an instance of the source connector + * @param connectorConfig the source connector's configuration + * @param offsets a mapping from partitions to offsets that need to be overwritten + * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader + * @param cb callback to invoke upon completion + */ + private void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig, + Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) { + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable()); + Map<String, Object> producerProps = config.exactlyOnceSourceEnabled() + ? exactlyOnceSourceTaskProducerConfigs(new ConnectorTaskId(connName, 0), config, sourceConfig, + connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId) + : baseProducerConfigs(connName, "connector-offset-producer-" + connName, config, sourceConfig, + connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); + + ConnectorOffsetBackingStore offsetStore = config.exactlyOnceSourceEnabled() + ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, connName, connector, producer) + : offsetStoreForRegularSourceConnector(sourceConfig, connName, connector, producer); + + OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, connName, internalKeyConverter, internalValueConverter); + alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, offsetStore, producer, offsetWriter, connectorLoader, cb); + } + + // Visible for testing + void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig, + Map<Map<String, ?>, Map<String, ?>> offsets, ConnectorOffsetBackingStore offsetStore, + KafkaProducer<byte[], byte[]> producer, OffsetStorageWriter offsetWriter, + ClassLoader connectorLoader, Callback<Message> cb) { + executor.submit(plugins.withClassLoader(connectorLoader, () -> { + try { + boolean alterOffsetsResult; + try { + alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, offsets); + } catch (UnsupportedOperationException e) { + throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " + + "modification of offsets", e); + } + + offsetStore.configure(config); + // This reads to the end of the offsets topic and can be a potentially time-consuming operation + offsetStore.start(); Review Comment: I'm not sure I follow? Everything after the `start()` invocation is wrapped in a try-finally which ensures that the offset store is stopped. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org