[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1205535710

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:

@@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception {
         PowerMock.verifyAll();
     }

+    @Test
+    public void testAlterOffsetsConnectorNotFound() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets("connector-does-not-exist", new HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof NotFoundException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsConnectorNotInStoppedState() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof BadRequestException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsNotLeader() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), false);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof NotLeaderException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsSourceConnectorExactlyOnceDisabled() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1205533269

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:

@@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception {
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();

Review Comment:
   Huh, TIL!
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1204681794

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:

@@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception {
         PowerMock.verifyAll();
     }

+    @Test
+    public void testAlterOffsetsConnectorNotFound() throws Exception {

Review Comment:
   Nit: it's a little strange that we're using "not found" in the name here but elsewhere in this and the `StandaloneHerderTest` suite we use "unknown"

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffsetsTest.java:

@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConnectorOffsetsTest {
+
+    @Test
+    public void testConnectorOffsetsToMap() {
+        // Using arbitrary partition and offset formats here to demonstrate that source connector offsets don't
+        // follow a standard pattern
+        Map<String, Object> partition1 = new HashMap<>();
+        partition1.put("partitionKey1", "partitionValue");
+        partition1.put("k", 123);
+        Map<String, Object> offset1 = new HashMap<>();
+        offset1.put("offset", 3.14);

Review Comment:
   Yum!
## connect/runtime/src/test/java/org/apache/kafka/connect/util/SinkUtilsTest.java:

@@ -46,4 +52,124 @@ public void testConsumerGroupOffsetsToConnectorOffsets() {
         expectedPartition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
         assertEquals(expectedPartition, connectorOffsets.offsets().get(0).partition());
     }
+
+    @Test
+    public void testValidateAndParseEmptyPartitionOffsetMap() {
+        // expect no exception to be thrown
+        Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(new HashMap<>());
+        assertTrue(parsedOffsets.isEmpty());
+    }
+
+    @Test
+    public void testValidateAndParseInvalidPartition() {
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, "topic");
+        Map<String, Object> offset = new HashMap<>();
+        offset.put(SinkUtils.KAFKA_OFFSET_KEY, 100);
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(partition, offset);
+
+        // missing partition key
+        ConnectException e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("The partition for a sink connector offset must contain the keys 'kafka_topic' and 'kafka_partition'"));
+
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, "not a number");
+        // bad partition key
+        e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("Failed to parse the following Kafka partition value in the provided offsets: 'not a number'"));
+
+        partition.remove(SinkUtils.KAFKA_TOPIC_KEY);
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, "5");
+        // missing topic key
+        e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("The partition for a sink connector offset must contain the keys 'kafka_topic' and 'kafka_partition'"));
+    }
+
+    @Test
+    public void testValidateAndParseInvalidOffset() {
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, "topic");
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, 10);
+        Map<String, Object> offset = new HashMap<>();
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(partition, offset);
+
+        // missing offset key
+        ConnectException e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("The offset for a sink connector should either
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1204612735

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:

@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }

+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets a mapping from partitions to offsets that need to be overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     *
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1204612162

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:

@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1204604996

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:

@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
         );
     }

+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", connName);
+
+        addRequest(() -> {
+            refreshConfigSnapshot(workerSyncTimeoutMs);
+            if (!alterConnectorOffsetsChecks(connName, callback)) {
+                return null;
+            }
+            // At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run
+            // a zombie fencing request
+            if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) {
+                log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once semantics enabled.", connName);
+                getFenceZombieSourceTasksCallable(connName, (error, ignored) -> {
+                    if (error != null) {
+                        log.error("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets", error);
+                        callback.onCompletion(new ConnectException("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets",
+                                error), null);
+                    } else {
+                        log.debug("Successfully completed zombie fencing for source connector {}; proceeding to alter offsets.", connName);
+                        // We need to ensure that we perform the necessary checks again inside alterConnectorOffsetsHerderRequest
+                        // since it is being run in a separate herder request and the conditions could have changed since the
+                        // previous check
+                        addRequest(getAlterConnectorOffsetsCallable(connName, offsets, callback), forwardErrorCallback(callback));
+                    }
+                }).call();
+            } else {
+                getAlterConnectorOffsetsCallable(connName, offsets, callback).call();

Review Comment:
   Nice, LGTM
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202738629

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:

@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202709620

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:

@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202694716

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:

@@ -1235,6 +1244,246 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }

+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets a mapping from partitions (either source partitions for source connectors, or Kafka topic
+     *                partitions for sink connectors) to offsets that need to be written; may not be null or empty
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+
+        if (offsets == null || offsets.isEmpty()) {
+            throw new ConnectException("The offsets to be altered may not be null or empty");
+        }
+
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     *
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be written; may not be null or empty
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        sinkConnectorConfig,
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {
+                    KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null);

Review Comment:
   Can we construct a `List<KafkaFuture<Void>>` here and add elements to that as necessary (i.e., depending on whether the sets of to-be-removed and to-be-altered partitions are empty), then combine them with `KafkaFuture<Void> adminFuture = KafkaFuture.allOf(futures.toArray(new KafkaFuture[0]));`? The use of a no-op future is a little hard to read.
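For readers following the thread, here is a minimal, self-contained sketch of the pattern the reviewer is suggesting; the class name, method name, and parameters are illustrative rather than taken from the PR, while the admin calls are the standard `Admin` client APIs:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.KafkaFuture;
    import org.apache.kafka.common.TopicPartition;

    final class AlterOffsetsFutureSketch {
        // Combine only the admin operations that are actually issued, instead of
        // seeding the chain with a no-op future.
        static KafkaFuture<Void> combinedAdminFuture(Admin admin, String groupId,
                                                     Map<TopicPartition, OffsetAndMetadata> offsetsToAlter,
                                                     Set<TopicPartition> partitionsToReset) {
            List<KafkaFuture<Void>> futures = new ArrayList<>();
            if (!offsetsToAlter.isEmpty()) {
                // Overwrite committed offsets for the sink connector's consumer group
                futures.add(admin.alterConsumerGroupOffsets(groupId, offsetsToAlter).all());
            }
            if (!partitionsToReset.isEmpty()) {
                // A null offset in the request means "reset", i.e. delete the committed offset
                futures.add(admin.deleteConsumerGroupOffsets(groupId, partitionsToReset).all());
            }
            // allOf over an empty array yields an already-completed future,
            // so no placeholder is needed when neither operation applies
            return KafkaFuture.allOf(futures.toArray(new KafkaFuture[0]));
        }
    }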
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202684769

## connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java:

@@ -52,4 +53,82 @@ public static ConnectorOffsets consumerGroupOffsetsToConnectorOffsets(Map<TopicPartition, OffsetAndMetadata>
+    /**
+     * Validate that the provided partitions (keys in the {@code partitionOffsets} map) look like:
+     * <pre>
+     *     {
+     *       "kafka_topic": "topic"
+     *       "kafka_partition": 3
+     *     }
+     * </pre>
+     *
+     * and that the provided offsets (values in the {@code partitionOffsets} map) look like:
+     * <pre>
+     *     {
+     *       "kafka_offset": 1000
+     *     }
+     * </pre>
+     *
+     * This method then parses them into a mapping from {@link TopicPartition}s to their corresponding {@link Long}
+     * valued offsets.
+     *
+     * @param partitionOffsets the partitions to offset map that needs to be validated and parsed.
+     * @return the parsed mapping from {@link TopicPartition} to its corresponding {@link Long} valued offset.
+     *
+     * @throws BadRequestException if the provided offsets aren't in the expected format
+     */
+    public static Map<TopicPartition, Long> validateAndParseSinkConnectorOffsets(Map<Map<String, ?>, Map<String, ?>> partitionOffsets) {

Review Comment:
   Well done, looks great
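To make the documented format concrete, here is a small usage sketch; the harness class and printed output are illustrative, while the method name and the `SinkUtils` key constants are the ones quoted in the diffs above:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.util.SinkUtils;

    final class SinkOffsetFormatExample {
        public static void main(String[] args) {
            // One request entry: {"kafka_topic": "topic", "kafka_partition": 3} -> {"kafka_offset": 1000}
            Map<String, Object> partition = new HashMap<>();
            partition.put(SinkUtils.KAFKA_TOPIC_KEY, "topic");
            partition.put(SinkUtils.KAFKA_PARTITION_KEY, 3);
            Map<String, Object> offset = new HashMap<>();
            offset.put(SinkUtils.KAFKA_OFFSET_KEY, 1000);

            Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
            partitionOffsets.put(partition, offset);

            // Parses to a single entry: new TopicPartition("topic", 3) -> 1000L
            Map<TopicPartition, Long> parsed = SinkUtils.validateAndParseSinkConnectorOffsets(partitionOffsets);
            System.out.println(parsed);
        }
    }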
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202678391

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:

@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
+            } else {
+                getAlterConnectorOffsetsCallable(connName, offsets, callback).call();

Review Comment:
   One convention I've seen used is `foo` (public) and `doFoo` (internal). I'm not a huge fan of it but in some cases it can be useful.

   Also, are we issuing unnecessary calls to `alterConnectorOffsetsChecks`? We do a check in the beginning of `alterConnectorOffsets` and then, if the connector is a sink (or exactly-once source support is disabled), we immediately invoke the `Callable` returned by `getAlterConnectorOffsetsCallable`, which calls `alterConnectorOffsetsChecks` a second time. We might just get rid of `getAlterConnectorOffsetsCallable` altogether and inline the logic for it?
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202644196

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:

@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
         );
     }

+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", connName);
+
+        addRequest(() -> {
+            refreshConfigSnapshot(workerSyncTimeoutMs);

Review Comment:
   Good point--yes, we should handle that in `stopConnector` as well (although we don't have to do that in this PR).
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1166062323

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:

@@ -114,6 +127,7 @@ public class Worker {
     public static final long CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);
     public static final long EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1);
+    public static final long ALTER_OFFSETS_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);

Review Comment:
   > users could get successful responses from the alter offsets API but the connector could completely ignore the overwritten offsets (if the user resumes the connector in the interim).

   That's a fair point. I was thinking that with the `STOPPED` state and a possible additional guard to prevent cancelled source tasks from committing offsets, we would have reasonable protection against zombie tasks overwriting recently-altered offsets. However, I wasn't thinking of the other scenario, where the offset alter request is initiated and left ongoing while the connector is resumed.

   I think the risk of blocking the herder thread that you've brought up is perhaps the most convincing argument still in favor of making this operation asynchronous. There are risks not just with calls to the `alterOffsets` method, but also with reading to the end of offsets topics and contacting the transaction coordinator (if altering offsets for an exactly-once source connector), to name a few.

   If we really want to get fancy, one way we could try to decrease the risks of an asynchronous API for non-exactly-once source connectors could be to refuse to assign tasks for connectors with ongoing offset alterations during rebalance, even if the connector is resumed. The task configs for that connector could continue to live in the config topic; we'd just hold off on assigning them until the operation succeeds. Of course, this doesn't work if the leader of the cluster changes while an offset alter request is being serviced, but then (correct me if I'm wrong?) the same risks apply even with a synchronous API (although they're probably less likely). We could also try to add interruption logic that cancels any in-progress offset alter/reset requests when a rebalance starts. Either of these would be fine as a follow-up ticket, if they sound reasonable at all.
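A rough sketch of the asynchronous shape under discussion, with slow offset work handed to a worker-owned executor and the result reported through a callback so the herder's tick thread never blocks; all names here are illustrative rather than the PR's actual API:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.BiConsumer;

    final class AsyncAlterSketch {
        private final ExecutorService executor = Executors.newCachedThreadPool();

        // The slow pieces (the connector's alterOffsets hook, a read-to-end of the
        // offsets topic, admin client calls) run off the herder thread; success or
        // failure is delivered via the callback, mirroring Callback#onCompletion.
        <T> void alterOffsetsAsync(Callable<T> slowWork, BiConsumer<Throwable, T> callback) {
            executor.submit(() -> {
                try {
                    callback.accept(null, slowWork.call());
                } catch (Exception e) {
                    callback.accept(e, null);
                }
            });
        }
    }

The REST layer can then apply its own request timeout while waiting on the callback, instead of bounding the whole operation with a single worker-side constant like `ALTER_OFFSETS_TIMEOUT_MS`.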
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1164465874

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:

@@ -114,6 +127,7 @@ public class Worker {
     public static final long CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);
     public static final long EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1);
+    public static final long ALTER_OFFSETS_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);

Review Comment:
   Hmm... I think this may be too short. With sink connectors it's fairly straightforward to alter consumer group offsets, but for source connectors we have to start and complete a read-to-end of the offsets topic, then write the new offsets to it. And in both cases, we have the `alterOffsets` connector method to worry about as well.

   Can we make the `Worker` API for altering offsets asynchronous, similar to what we do for reading offsets? I know that there's concern about tasks being brought up for the connector while the request is being handled, but I think this might be alright. If the connector is a sink connector, the requests to alter its consumer group's offsets will be rejected by the broker if any tasks are active. If the connector is a source connector and exactly-once support is enabled, zombie fencing will take place and we won't be able to complete our write to the offsets topic. Unless I'm mistaken, the only case that's left is non-exactly-once source connectors, which IMO is acceptable for us to ignore since we can't guarantee that there aren't zombie tasks running around writing their own offsets anyway.
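To make the broker-side guard for sink connectors concrete, here is a hedged illustration: an external offset commit to a consumer group that still has active members is rejected by the broker, which the admin client typically surfaces as an `UnknownMemberIdException`. The broker address, topic, and group id below are placeholders:

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.UnknownMemberIdException;

    public final class AlterGroupOffsetsGuardExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
            try (Admin admin = Admin.create(props)) {
                TopicPartition tp = new TopicPartition("some-topic", 0); // placeholder topic
                admin.alterConsumerGroupOffsets(
                        "connect-my-sink", // hypothetical sink connector consumer group id
                        Collections.singletonMap(tp, new OffsetAndMetadata(100L))
                ).all().get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof UnknownMemberIdException) {
                    // The group still has active members (i.e. the connector's tasks are
                    // running), so the broker refused the external offset commit.
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }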