C0urante commented on code in PR #13465: URL: https://github.com/apache/kafka/pull/13465#discussion_r1204681794
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ########## @@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception { PowerMock.verifyAll(); } + @Test + public void testAlterOffsetsConnectorNotFound() throws Exception { Review Comment: Nit: it's a little strange that we're using "not found" in the name here but elsewhere in this and the `StandaloneHerderTest` suite we use "unknown" ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffsetsTest.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.rest.entities; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class ConnectorOffsetsTest { + + @Test + public void testConnectorOffsetsToMap() { + // Using arbitrary partition and offset formats here to demonstrate that source connector offsets don't + // follow a standard pattern + Map<String, Object> partition1 = new HashMap<>(); + partition1.put("partitionKey1", "partitionValue"); + partition1.put("k", 123); + Map<String, Object> offset1 = new HashMap<>(); + offset1.put("offset", 3.14); Review Comment: Yum! ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/SinkUtilsTest.java: ########## @@ -46,4 +52,124 @@ public void testConsumerGroupOffsetsToConnectorOffsets() { expectedPartition.put(SinkUtils.KAFKA_PARTITION_KEY, 0); assertEquals(expectedPartition, connectorOffsets.offsets().get(0).partition()); } + + @Test + public void testValidateAndParseEmptyPartitionOffsetMap() { + // expect no exception to be thrown + Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(new HashMap<>()); + assertTrue(parsedOffsets.isEmpty()); + } + + @Test + public void testValidateAndParseInvalidPartition() { + Map<String, Object> partition = new HashMap<>(); + partition.put(SinkUtils.KAFKA_TOPIC_KEY, "topic"); + Map<String, Object> offset = new HashMap<>(); + offset.put(SinkUtils.KAFKA_OFFSET_KEY, 100); + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>(); + partitionOffsets.put(partition, offset); + + // missing partition key + ConnectException e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets)); + assertThat(e.getMessage(), containsString("The partition for a sink connector offset must contain the keys 'kafka_topic' and 'kafka_partition'")); + + partition.put(SinkUtils.KAFKA_PARTITION_KEY, "not a number"); + 
// bad partition key + e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets)); + assertThat(e.getMessage(), containsString("Failed to parse the following Kafka partition value in the provided offsets: 'not a number'")); + + partition.remove(SinkUtils.KAFKA_TOPIC_KEY); + partition.put(SinkUtils.KAFKA_PARTITION_KEY, "5"); + // missing topic key + e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets)); + assertThat(e.getMessage(), containsString("The partition for a sink connector offset must contain the keys 'kafka_topic' and 'kafka_partition'")); + } + + @Test + public void testValidateAndParseInvalidOffset() { + Map<String, Object> partition = new HashMap<>(); + partition.put(SinkUtils.KAFKA_TOPIC_KEY, "topic"); + partition.put(SinkUtils.KAFKA_PARTITION_KEY, 10); + Map<String, Object> offset = new HashMap<>(); + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>(); + partitionOffsets.put(partition, offset); + + // missing offset key + ConnectException e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets)); + assertThat(e.getMessage(), containsString("The offset for a sink connector should either be null or contain the key 'kafka_offset'")); + + // bad offset key + offset.put(SinkUtils.KAFKA_OFFSET_KEY, "not a number"); + e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets)); + assertThat(e.getMessage(), containsString("Failed to parse the following Kafka offset value in the provided offsets: 'not a number'")); + } + + @Test + public void testValidateAndParseStringPartitionValue() { + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = createPartitionOffsetMap("topic", "10", "100"); + Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(partitionOffsets); + assertEquals(1, parsedOffsets.size()); + TopicPartition tp = 
parsedOffsets.keySet().iterator().next(); + assertEquals(10, tp.partition()); + } + + @Test + public void testValidateAndParseIntegerPartitionValue() { + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = createPartitionOffsetMap("topic", 10, "100"); + Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(partitionOffsets); + assertEquals(1, parsedOffsets.size()); + TopicPartition tp = parsedOffsets.keySet().iterator().next(); + assertEquals(10, tp.partition()); + } + + @Test + public void testValidateAndParseStringOffsetValue() { + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = createPartitionOffsetMap("topic", "10", "100"); + Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(partitionOffsets); + assertEquals(1, parsedOffsets.size()); + Long offsetValue = parsedOffsets.values().iterator().next(); + assertEquals(100L, offsetValue.longValue()); + } + + @Test + public void testValidateAndParseIntegerOffsetValue() { + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = createPartitionOffsetMap("topic", "10", 100); + Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(partitionOffsets); + assertEquals(1, parsedOffsets.size()); + Long offsetValue = parsedOffsets.values().iterator().next(); + assertEquals(100L, offsetValue.longValue()); + } + + @Test + public void testNullOffset() { + Map<String, Object> partitionMap = new HashMap<>(); + partitionMap.put(SinkUtils.KAFKA_TOPIC_KEY, "topic"); + partitionMap.put(SinkUtils.KAFKA_PARTITION_KEY, 10); + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>(); + partitionOffsets.put(partitionMap, null); + Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(partitionOffsets); + assertEquals(1, parsedOffsets.size()); + assertNull(parsedOffsets.values().iterator().next()); + } + + @Test + public void testNullPartition() { Review Comment: These tests are fantastic for the most part. 
For the sake of completeness, should we include a case with a null topic name as well? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java: ########## @@ -1889,6 +1902,352 @@ public void testGetSourceConnectorOffsetsError() { verifyKafkaClusterId(); } + @Test + public void testAlterOffsetsConnectorDoesNotSupportOffsetAlteration() { + mockKafkaClusterId(); + + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), + allConnectorClientConfigOverridePolicy, null); + worker.start(); + + mockGenericIsolation(); + when(plugins.newConnector(anyString())).thenReturn(sourceConnector); + when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenThrow(new UnsupportedOperationException("This connector doesn't " + + "support altering of offsets")); + + FutureCallback<Message> cb = new FutureCallback<>(); + worker.alterConnectorOffsets(CONNECTOR_ID, connectorProps, + Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")), + cb); + + ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS)); + assertEquals(ConnectException.class, e.getCause().getClass()); + assertEquals("Failed to alter offsets for connector " + CONNECTOR_ID + " because it doesn't support external modification of offsets", + e.getCause().getMessage()); + + verifyGenericIsolation(); + verifyKafkaClusterId(); + } + + @Test + @SuppressWarnings("unchecked") + public void testAlterOffsetsSourceConnector() throws Exception { + mockKafkaClusterId(); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), + allConnectorClientConfigOverridePolicy, null); + worker.start(); + + 
when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true); + ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class); + KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class); + OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class); + + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>(); + partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")); + partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue2"), Collections.singletonMap("offsetKey", "offsetValue")); + + when(offsetWriter.doFlush(any())).thenAnswer(invocation -> { + invocation.getArgument(0, Callback.class).onCompletion(null, null); + return null; + }); + + FutureCallback<Message> cb = new FutureCallback<>(); + worker.alterSourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer, + offsetWriter, Thread.currentThread().getContextClassLoader(), cb); + assertEquals("The offsets for this connector have been altered successfully", cb.get(1000, TimeUnit.MILLISECONDS).message()); + + verify(offsetStore).configure(config); + verify(offsetStore).start(); + partitionOffsets.forEach((partition, offset) -> verify(offsetWriter).offset(partition, offset)); + verify(offsetWriter).beginFlush(); + verify(offsetWriter).doFlush(any()); + verify(offsetStore).stop(); + verifyKafkaClusterId(); + } + + @Test + @SuppressWarnings("unchecked") + public void testAlterOffsetsSourceConnectorError() throws Exception { + mockKafkaClusterId(); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), + allConnectorClientConfigOverridePolicy, null); + worker.start(); + + 
when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true); + ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class); + KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class); + OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class); + + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>(); + partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")); + partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue2"), Collections.singletonMap("offsetKey", "offsetValue")); + + when(offsetWriter.doFlush(any())).thenAnswer(invocation -> { + invocation.getArgument(0, Callback.class).onCompletion(new RuntimeException("Test exception"), null); + return null; + }); + + FutureCallback<Message> cb = new FutureCallback<>(); + worker.alterSourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer, + offsetWriter, Thread.currentThread().getContextClassLoader(), cb); + ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS).message()); + assertEquals(ConnectException.class, e.getCause().getClass()); + + verify(offsetStore).configure(config); + verify(offsetStore).start(); + partitionOffsets.forEach((partition, offset) -> verify(offsetWriter).offset(partition, offset)); + verify(offsetWriter).beginFlush(); + verify(offsetWriter).doFlush(any()); + verify(offsetStore).stop(); + verifyKafkaClusterId(); + } + + @Test + public void testAlterOffsetsSinkConnectorNoResets() throws Exception { + @SuppressWarnings("unchecked") + ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> alterOffsetsMapCapture = ArgumentCaptor.forClass(Map.class); + Map<Map<String, ?>, Map<String, ?>> 
partitionOffsets = new HashMap<>(); + Map<String, String> partition1 = new HashMap<>(); + partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic"); + partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10"); + partitionOffsets.put(partition1, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 500)); + Map<String, String> partition2 = new HashMap<>(); + partition2.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic"); + partition2.put(SinkUtils.KAFKA_PARTITION_KEY, "20"); + partitionOffsets.put(partition2, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 100)); + + // A null value for deleteOffsetsSetCapture indicates that we don't expect any call to Admin::deleteConsumerGroupOffsets + alterOffsetsSinkConnector(partitionOffsets, alterOffsetsMapCapture, null); + + assertEquals(2, alterOffsetsMapCapture.getValue().size()); + assertEquals(500, alterOffsetsMapCapture.getValue().get(new TopicPartition("test_topic", 10)).offset()); + assertEquals(100, alterOffsetsMapCapture.getValue().get(new TopicPartition("test_topic", 20)).offset()); + } + + @Test + public void testAlterOffsetSinkConnectorOnlyResets() throws Exception { + @SuppressWarnings("unchecked") + ArgumentCaptor<Set<TopicPartition>> deleteOffsetsSetCapture = ArgumentCaptor.forClass(Set.class); + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>(); + Map<String, String> partition1 = new HashMap<>(); + partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic"); + partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10"); + partitionOffsets.put(partition1, null); + Map<String, String> partition2 = new HashMap<>(); + partition2.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic"); + partition2.put(SinkUtils.KAFKA_PARTITION_KEY, "20"); + partitionOffsets.put(partition2, null); + + // A null value for alterOffsetsMapCapture indicates that we don't expect any call to Admin::alterConsumerGroupOffsets + alterOffsetsSinkConnector(partitionOffsets, null, deleteOffsetsSetCapture); + + Set<TopicPartition> 
expectedTopicPartitionsForOffsetDelete = new HashSet<>(); + expectedTopicPartitionsForOffsetDelete.add(new TopicPartition("test_topic", 10)); + expectedTopicPartitionsForOffsetDelete.add(new TopicPartition("test_topic", 20)); + + assertEquals(2, deleteOffsetsSetCapture.getValue().size()); + + // Verify that contents are equal without caring about order + assertTrue(expectedTopicPartitionsForOffsetDelete.containsAll(deleteOffsetsSetCapture.getValue())); + assertTrue(deleteOffsetsSetCapture.getValue().containsAll(expectedTopicPartitionsForOffsetDelete)); Review Comment: Can't we simplify this by comparing the two `Set` instances for equality? ```suggestion // Verify that contents are equal without caring about order assertEquals(expectedTopicPartitionsForOffsetDelete, deleteOffsetsSetCapture.getValue()); ``` ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -1515,6 +1520,79 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) { ); } + @Override + public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) { + log.trace("Submitting alter offsets request for connector '{}'", connName); + + addRequest(() -> { + if (!alterConnectorOffsetsChecks(connName, callback)) { + return null; + } + // At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run + // a zombie fencing request + if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) { + log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once support enabled.", connName); + doFenceZombieSourceTasks(connName, (error, ignored) -> { + if (error != null) { + log.error("Failed to perform zombie fencing for source connector prior to altering offsets", error); + callback.onCompletion(new ConnectException("Failed to perform zombie 
fencing for source connector prior to altering offsets", + error), null); + } else { + log.debug("Successfully completed zombie fencing for source connector {}; proceeding to alter offsets.", connName); + // We need to ensure that we perform the necessary checks again before proceeding to actually altering the connector offsets since + // zombie fencing is done asynchronously and the conditions could have changed since the previous check + addRequest(() -> { + if (alterConnectorOffsetsChecks(connName, callback)) { + worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback); + } + return null; + }, forwardErrorCallback(callback)); + } + }); + } else { + worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback); + } + return null; + }, forwardErrorCallback(callback)); + } + + /** + * This method performs a few checks for alter connector offsets request and completes the callback exceptionally + * if any check fails. 
+ * @param connName the name of the connector whose offsets are to be altered + * @param callback callback to invoke upon completion + * @return true if all the checks passed, false otherwise + */ + private boolean alterConnectorOffsetsChecks(String connName, Callback<Message> callback) { + if (checkRebalanceNeeded(callback)) { + return false; + } + + if (!isLeader()) { + callback.onCompletion(new NotLeaderException("Only the leader can process alter offsets requests", leaderUrl()), null); + return false; + } + + if (!refreshConfigSnapshot(workerSyncTimeoutMs)) { + throw new ConnectException("Failed to read to end of config topic before altering connector offsets"); + } + + if (!configState.contains(connName)) { + callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null); + return false; + } + + // If the target state for the connector is stopped, its task count is 0, and there is no rebalance pending (checked above), + // we can be sure that the tasks have at least been attempted to be stopped (or cancelled if they took too long to stop). + // Zombie tasks are handled by a round of zombie fencing for exactly once source connectors. Zombie sink tasks are handled + // naturally because requests to alter consumer group offsets will fail if there are still active members in the group. + if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) { + callback.onCompletion(new BadRequestException("The connector needs to be stopped before its offsets can be altered"), null); Review Comment: We should be really careful about the wording in this message (and the one in `StandaloneHerder`), since "stopping" a connector used to be synonymous with "deleting" one. A quick sketch of a more-informative error message: > "Connectors must be in the STOPPED state before their offsets can be altered. 
This can be done for the specified connector by issuing a PUT request to the /connectors/" + connName + "/stop endpoint." ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java: ########## @@ -1889,6 +1902,352 @@ public void testGetSourceConnectorOffsetsError() { verifyKafkaClusterId(); } + @Test + public void testAlterOffsetsConnectorDoesNotSupportOffsetAlteration() { + mockKafkaClusterId(); + + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), + allConnectorClientConfigOverridePolicy, null); + worker.start(); + + mockGenericIsolation(); + when(plugins.newConnector(anyString())).thenReturn(sourceConnector); + when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenThrow(new UnsupportedOperationException("This connector doesn't " + + "support altering of offsets")); + + FutureCallback<Message> cb = new FutureCallback<>(); + worker.alterConnectorOffsets(CONNECTOR_ID, connectorProps, + Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")), + cb); + + ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS)); + assertEquals(ConnectException.class, e.getCause().getClass()); + assertEquals("Failed to alter offsets for connector " + CONNECTOR_ID + " because it doesn't support external modification of offsets", + e.getCause().getMessage()); + + verifyGenericIsolation(); + verifyKafkaClusterId(); + } + + @Test + @SuppressWarnings("unchecked") + public void testAlterOffsetsSourceConnector() throws Exception { + mockKafkaClusterId(); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), + allConnectorClientConfigOverridePolicy, null); + 
worker.start(); + + when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true); + ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class); + KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class); + OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class); + + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>(); + partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")); + partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue2"), Collections.singletonMap("offsetKey", "offsetValue")); + + when(offsetWriter.doFlush(any())).thenAnswer(invocation -> { + invocation.getArgument(0, Callback.class).onCompletion(null, null); + return null; + }); + + FutureCallback<Message> cb = new FutureCallback<>(); + worker.alterSourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer, + offsetWriter, Thread.currentThread().getContextClassLoader(), cb); + assertEquals("The offsets for this connector have been altered successfully", cb.get(1000, TimeUnit.MILLISECONDS).message()); + + verify(offsetStore).configure(config); + verify(offsetStore).start(); + partitionOffsets.forEach((partition, offset) -> verify(offsetWriter).offset(partition, offset)); + verify(offsetWriter).beginFlush(); + verify(offsetWriter).doFlush(any()); + verify(offsetStore).stop(); + verifyKafkaClusterId(); + } + + @Test + @SuppressWarnings("unchecked") + public void testAlterOffsetsSourceConnectorError() throws Exception { + mockKafkaClusterId(); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), + allConnectorClientConfigOverridePolicy, null); + worker.start(); + + 
when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true); + ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class); + KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class); + OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class); + + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>(); + partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")); + partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue2"), Collections.singletonMap("offsetKey", "offsetValue")); + + when(offsetWriter.doFlush(any())).thenAnswer(invocation -> { + invocation.getArgument(0, Callback.class).onCompletion(new RuntimeException("Test exception"), null); + return null; + }); + + FutureCallback<Message> cb = new FutureCallback<>(); + worker.alterSourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer, + offsetWriter, Thread.currentThread().getContextClassLoader(), cb); + ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS).message()); + assertEquals(ConnectException.class, e.getCause().getClass()); + + verify(offsetStore).configure(config); + verify(offsetStore).start(); + partitionOffsets.forEach((partition, offset) -> verify(offsetWriter).offset(partition, offset)); + verify(offsetWriter).beginFlush(); + verify(offsetWriter).doFlush(any()); + verify(offsetStore).stop(); + verifyKafkaClusterId(); + } + + @Test + public void testAlterOffsetsSinkConnectorNoResets() throws Exception { + @SuppressWarnings("unchecked") + ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> alterOffsetsMapCapture = ArgumentCaptor.forClass(Map.class); + Map<Map<String, ?>, Map<String, ?>> 
partitionOffsets = new HashMap<>(); + Map<String, String> partition1 = new HashMap<>(); + partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic"); + partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10"); + partitionOffsets.put(partition1, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 500)); + Map<String, String> partition2 = new HashMap<>(); + partition2.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic"); + partition2.put(SinkUtils.KAFKA_PARTITION_KEY, "20"); + partitionOffsets.put(partition2, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 100)); + + // A null value for deleteOffsetsSetCapture indicates that we don't expect any call to Admin::deleteConsumerGroupOffsets + alterOffsetsSinkConnector(partitionOffsets, alterOffsetsMapCapture, null); + + assertEquals(2, alterOffsetsMapCapture.getValue().size()); + assertEquals(500, alterOffsetsMapCapture.getValue().get(new TopicPartition("test_topic", 10)).offset()); + assertEquals(100, alterOffsetsMapCapture.getValue().get(new TopicPartition("test_topic", 20)).offset()); + } + + @Test + public void testAlterOffsetSinkConnectorOnlyResets() throws Exception { + @SuppressWarnings("unchecked") + ArgumentCaptor<Set<TopicPartition>> deleteOffsetsSetCapture = ArgumentCaptor.forClass(Set.class); + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>(); + Map<String, String> partition1 = new HashMap<>(); + partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic"); + partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10"); + partitionOffsets.put(partition1, null); + Map<String, String> partition2 = new HashMap<>(); + partition2.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic"); + partition2.put(SinkUtils.KAFKA_PARTITION_KEY, "20"); + partitionOffsets.put(partition2, null); + + // A null value for alterOffsetsMapCapture indicates that we don't expect any call to Admin::alterConsumerGroupOffsets + alterOffsetsSinkConnector(partitionOffsets, null, deleteOffsetsSetCapture); + + Set<TopicPartition> 
expectedTopicPartitionsForOffsetDelete = new HashSet<>(); + expectedTopicPartitionsForOffsetDelete.add(new TopicPartition("test_topic", 10)); + expectedTopicPartitionsForOffsetDelete.add(new TopicPartition("test_topic", 20)); + + assertEquals(2, deleteOffsetsSetCapture.getValue().size()); + + // Verify that contents are equal without caring about order + assertTrue(expectedTopicPartitionsForOffsetDelete.containsAll(deleteOffsetsSetCapture.getValue())); + assertTrue(deleteOffsetsSetCapture.getValue().containsAll(expectedTopicPartitionsForOffsetDelete)); + } + + @Test + public void testAlterOffsetsSinkConnectorAltersAndResets() throws Exception { + @SuppressWarnings("unchecked") + ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> alterOffsetsMapCapture = ArgumentCaptor.forClass(Map.class); + @SuppressWarnings("unchecked") + ArgumentCaptor<Set<TopicPartition>> deleteOffsetsSetCapture = ArgumentCaptor.forClass(Set.class); + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>(); + Map<String, String> partition1 = new HashMap<>(); + partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic"); + partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10"); + partitionOffsets.put(partition1, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, "100")); + Map<String, String> partition2 = new HashMap<>(); + partition2.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic"); + partition2.put(SinkUtils.KAFKA_PARTITION_KEY, "20"); + partitionOffsets.put(partition2, null); + + alterOffsetsSinkConnector(partitionOffsets, alterOffsetsMapCapture, deleteOffsetsSetCapture); + + assertEquals(1, alterOffsetsMapCapture.getValue().size()); + assertEquals(100, alterOffsetsMapCapture.getValue().get(new TopicPartition("test_topic", 10)).offset()); + + assertEquals(1, deleteOffsetsSetCapture.getValue().size()); + assertEquals(new TopicPartition("test_topic", 20), deleteOffsetsSetCapture.getValue().iterator().next()); Review Comment: Same thought RE comparing `Set` instances for equality here 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java: ########## @@ -1889,6 +1902,352 @@ public void testGetSourceConnectorOffsetsError() { verifyKafkaClusterId(); } + @Test + public void testAlterOffsetsConnectorDoesNotSupportOffsetAlteration() { + mockKafkaClusterId(); + + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), + allConnectorClientConfigOverridePolicy, null); + worker.start(); + + mockGenericIsolation(); + when(plugins.newConnector(anyString())).thenReturn(sourceConnector); + when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenThrow(new UnsupportedOperationException("This connector doesn't " + + "support altering of offsets")); + + FutureCallback<Message> cb = new FutureCallback<>(); + worker.alterConnectorOffsets(CONNECTOR_ID, connectorProps, + Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")), + cb); + + ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS)); + assertEquals(ConnectException.class, e.getCause().getClass()); + assertEquals("Failed to alter offsets for connector " + CONNECTOR_ID + " because it doesn't support external modification of offsets", + e.getCause().getMessage()); + + verifyGenericIsolation(); + verifyKafkaClusterId(); + } + + @Test + @SuppressWarnings("unchecked") + public void testAlterOffsetsSourceConnector() throws Exception { + mockKafkaClusterId(); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), + allConnectorClientConfigOverridePolicy, null); + worker.start(); + + when(plugins.withClassLoader(any(ClassLoader.class), 
any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true); + ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class); + KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class); + OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class); + + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>(); + partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")); + partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue2"), Collections.singletonMap("offsetKey", "offsetValue")); + + when(offsetWriter.doFlush(any())).thenAnswer(invocation -> { + invocation.getArgument(0, Callback.class).onCompletion(null, null); + return null; + }); + + FutureCallback<Message> cb = new FutureCallback<>(); + worker.alterSourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer, + offsetWriter, Thread.currentThread().getContextClassLoader(), cb); + assertEquals("The offsets for this connector have been altered successfully", cb.get(1000, TimeUnit.MILLISECONDS).message()); + + verify(offsetStore).configure(config); + verify(offsetStore).start(); + partitionOffsets.forEach((partition, offset) -> verify(offsetWriter).offset(partition, offset)); + verify(offsetWriter).beginFlush(); + verify(offsetWriter).doFlush(any()); + verify(offsetStore).stop(); + verifyKafkaClusterId(); + } + + @Test + @SuppressWarnings("unchecked") + public void testAlterOffsetsSourceConnectorError() throws Exception { + mockKafkaClusterId(); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), + allConnectorClientConfigOverridePolicy, null); + worker.start(); + + when(plugins.withClassLoader(any(ClassLoader.class), 
any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true); + ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class); + KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class); + OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class); + + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>(); + partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")); + partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue2"), Collections.singletonMap("offsetKey", "offsetValue")); + + when(offsetWriter.doFlush(any())).thenAnswer(invocation -> { + invocation.getArgument(0, Callback.class).onCompletion(new RuntimeException("Test exception"), null); + return null; + }); + + FutureCallback<Message> cb = new FutureCallback<>(); + worker.alterSourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer, + offsetWriter, Thread.currentThread().getContextClassLoader(), cb); + ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS).message()); + assertEquals(ConnectException.class, e.getCause().getClass()); + + verify(offsetStore).configure(config); + verify(offsetStore).start(); + partitionOffsets.forEach((partition, offset) -> verify(offsetWriter).offset(partition, offset)); + verify(offsetWriter).beginFlush(); + verify(offsetWriter).doFlush(any()); + verify(offsetStore).stop(); + verifyKafkaClusterId(); + } + + @Test + public void testAlterOffsetsSinkConnectorNoResets() throws Exception { + @SuppressWarnings("unchecked") + ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> alterOffsetsMapCapture = ArgumentCaptor.forClass(Map.class); + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>(); + Map<String, String> 
partition1 = new HashMap<>(); + partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic"); + partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10"); + partitionOffsets.put(partition1, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 500)); + Map<String, String> partition2 = new HashMap<>(); + partition2.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic"); + partition2.put(SinkUtils.KAFKA_PARTITION_KEY, "20"); + partitionOffsets.put(partition2, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 100)); + + // A null value for deleteOffsetsSetCapture indicates that we don't expect any call to Admin::deleteConsumerGroupOffsets + alterOffsetsSinkConnector(partitionOffsets, alterOffsetsMapCapture, null); + + assertEquals(2, alterOffsetsMapCapture.getValue().size()); + assertEquals(500, alterOffsetsMapCapture.getValue().get(new TopicPartition("test_topic", 10)).offset()); + assertEquals(100, alterOffsetsMapCapture.getValue().get(new TopicPartition("test_topic", 20)).offset()); + } + + @Test + public void testAlterOffsetSinkConnectorOnlyResets() throws Exception { + @SuppressWarnings("unchecked") + ArgumentCaptor<Set<TopicPartition>> deleteOffsetsSetCapture = ArgumentCaptor.forClass(Set.class); + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>(); + Map<String, String> partition1 = new HashMap<>(); + partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic"); + partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10"); + partitionOffsets.put(partition1, null); + Map<String, String> partition2 = new HashMap<>(); + partition2.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic"); + partition2.put(SinkUtils.KAFKA_PARTITION_KEY, "20"); + partitionOffsets.put(partition2, null); + + // A null value for alterOffsetsMapCapture indicates that we don't expect any call to Admin::alterConsumerGroupOffsets + alterOffsetsSinkConnector(partitionOffsets, null, deleteOffsetsSetCapture); + + Set<TopicPartition> expectedTopicPartitionsForOffsetDelete = new HashSet<>(); + 
expectedTopicPartitionsForOffsetDelete.add(new TopicPartition("test_topic", 10)); + expectedTopicPartitionsForOffsetDelete.add(new TopicPartition("test_topic", 20)); + + assertEquals(2, deleteOffsetsSetCapture.getValue().size()); + + // Verify that contents are equal without caring about order + assertTrue(expectedTopicPartitionsForOffsetDelete.containsAll(deleteOffsetsSetCapture.getValue())); + assertTrue(deleteOffsetsSetCapture.getValue().containsAll(expectedTopicPartitionsForOffsetDelete)); + } + + @Test + public void testAlterOffsetsSinkConnectorAltersAndResets() throws Exception { + @SuppressWarnings("unchecked") + ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> alterOffsetsMapCapture = ArgumentCaptor.forClass(Map.class); + @SuppressWarnings("unchecked") + ArgumentCaptor<Set<TopicPartition>> deleteOffsetsSetCapture = ArgumentCaptor.forClass(Set.class); + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>(); + Map<String, String> partition1 = new HashMap<>(); + partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic"); + partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10"); + partitionOffsets.put(partition1, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, "100")); + Map<String, String> partition2 = new HashMap<>(); + partition2.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic"); + partition2.put(SinkUtils.KAFKA_PARTITION_KEY, "20"); + partitionOffsets.put(partition2, null); + + alterOffsetsSinkConnector(partitionOffsets, alterOffsetsMapCapture, deleteOffsetsSetCapture); + + assertEquals(1, alterOffsetsMapCapture.getValue().size()); + assertEquals(100, alterOffsetsMapCapture.getValue().get(new TopicPartition("test_topic", 10)).offset()); + + assertEquals(1, deleteOffsetsSetCapture.getValue().size()); + assertEquals(new TopicPartition("test_topic", 20), deleteOffsetsSetCapture.getValue().iterator().next()); + } + + private void alterOffsetsSinkConnector(Map<Map<String, ?>, Map<String, ?>> partitionOffsets, + ArgumentCaptor<Map<TopicPartition, 
OffsetAndMetadata>> alterOffsetsMapCapture, + ArgumentCaptor<Set<TopicPartition>> deleteOffsetsSetCapture) throws Exception { + mockKafkaClusterId(); + String connectorClass = SampleSinkConnector.class.getName(); + connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); + + Admin admin = mock(Admin.class); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newCachedThreadPool(), + allConnectorClientConfigOverridePolicy, config -> admin); + worker.start(); + + when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + when(sinkConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true); + + // If alterOffsetsMapCapture is null, then we won't stub any of the following methods resulting in test failures in case + // offsets for certain topic partitions are actually attempted to be altered. + if (alterOffsetsMapCapture != null) { + AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = mock(AlterConsumerGroupOffsetsResult.class); + when(admin.alterConsumerGroupOffsets(anyString(), alterOffsetsMapCapture.capture(), any(AlterConsumerGroupOffsetsOptions.class))) + .thenReturn(alterConsumerGroupOffsetsResult); + KafkaFuture<Void> alterFuture = KafkaFuture.completedFuture(null); + when(alterConsumerGroupOffsetsResult.all()).thenReturn(alterFuture); + } + + // If deleteOffsetsSetCapture is null, then we won't stub any of the following methods resulting in test failures in case + // offsets for certain topic partitions are actually attempted to be deleted. 
+ if (deleteOffsetsSetCapture != null) { + DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = mock(DeleteConsumerGroupOffsetsResult.class); + when(admin.deleteConsumerGroupOffsets(anyString(), deleteOffsetsSetCapture.capture(), any(DeleteConsumerGroupOffsetsOptions.class))) + .thenReturn(deleteConsumerGroupOffsetsResult); + KafkaFuture<Void> deleteFuture = KafkaFuture.completedFuture(null); + when(deleteConsumerGroupOffsetsResult.all()).thenReturn(deleteFuture); + } + + FutureCallback<Message> cb = new FutureCallback<>(); + worker.alterSinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets, + Thread.currentThread().getContextClassLoader(), cb); + assertEquals("The offsets for this connector have been altered successfully", cb.get(1000, TimeUnit.MILLISECONDS).message()); + + verify(admin, timeout(1000)).close(); + verifyKafkaClusterId(); + } + + @Test + public void testAlterOffsetsSinkConnectorAlterOffsetsError() throws Exception { + mockKafkaClusterId(); + String connectorClass = SampleSinkConnector.class.getName(); + connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); + + Admin admin = mock(Admin.class); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), + allConnectorClientConfigOverridePolicy, config -> admin); + worker.start(); + + when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + when(sinkConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true); + + AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = mock(AlterConsumerGroupOffsetsResult.class); + when(admin.alterConsumerGroupOffsets(anyString(), anyMap(), any(AlterConsumerGroupOffsetsOptions.class))) + .thenReturn(alterConsumerGroupOffsetsResult); + KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>(); + alterFuture.completeExceptionally(new ClusterAuthorizationException("Test 
exception")); + when(alterConsumerGroupOffsetsResult.all()).thenReturn(alterFuture); + + Map<String, String> partition1 = new HashMap<>(); + partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic"); + partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10"); + Map<Map<String, ?>, Map<String, ?>> partitionOffsets = Collections.singletonMap(partition1, + Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, "100")); + + FutureCallback<Message> cb = new FutureCallback<>(); + worker.alterSinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets, + Thread.currentThread().getContextClassLoader(), cb); + + ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS)); + assertEquals(ConnectException.class, e.getCause().getClass()); + + verify(admin, timeout(1000)).close(); + verifyNoMoreInteractions(admin); + verifyKafkaClusterId(); + } + + @Test + @SuppressWarnings("unchecked") Review Comment: Nit: unnecessary suppression ```suggestion ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ########## @@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception { PowerMock.verifyAll(); } + @Test + public void testAlterOffsetsConnectorNotFound() throws Exception { + // Get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); + expectConfigRefreshAndSnapshot(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // Now handle the alter connector offsets request + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + expectConfigRefreshAndSnapshot(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); + 
FutureCallback<Message> callback = new FutureCallback<>(); + herder.alterConnectorOffsets("connector-does-not-exist", new HashMap<>(), callback); + herder.tick(); + ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); + assertTrue(e.getCause() instanceof NotFoundException); + + PowerMock.verifyAll(); + } + + @Test + public void testAlterOffsetsConnectorNotInStoppedState() throws Exception { + // Get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); + expectConfigRefreshAndSnapshot(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // Now handle the alter connector offsets request + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + expectConfigRefreshAndSnapshot(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); + FutureCallback<Message> callback = new FutureCallback<>(); + herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback); + herder.tick(); + ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); + assertTrue(e.getCause() instanceof BadRequestException); + + PowerMock.verifyAll(); + } + + @Test + public void testAlterOffsetsNotLeader() throws Exception { + // Get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + expectRebalance(1, Collections.emptyList(), Collections.emptyList(), false); + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // Now handle the alter connector offsets request + 
member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); Review Comment: So this part is missing a succeeding call to `PowerMock::expectLastCall`, but the test case passes anyways (and if we comment out this line, it fails). I did some experimenting and we can comment out every call to `PowerMock::expectLastCall` except for the last one on line 4216 and the test will still pass. Probably best to just add a call after this line for consistency's sake, but interesting behavior from PowerMock. If we weren't migrating to Mockito it might be worth looking into to see if that behavior is intentional and, if so, if we could leverage it to reduce noise in our test cases. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ########## @@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception { PowerMock.verifyAll(); } + @Test + public void testAlterOffsetsConnectorNotFound() throws Exception { + // Get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); + expectConfigRefreshAndSnapshot(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // Now handle the alter connector offsets request + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + expectConfigRefreshAndSnapshot(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); + FutureCallback<Message> callback = new FutureCallback<>(); + herder.alterConnectorOffsets("connector-does-not-exist", new HashMap<>(), callback); + herder.tick(); + ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); + assertTrue(e.getCause() instanceof 
NotFoundException); + + PowerMock.verifyAll(); + } + + @Test + public void testAlterOffsetsConnectorNotInStoppedState() throws Exception { + // Get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); + expectConfigRefreshAndSnapshot(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // Now handle the alter connector offsets request + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + expectConfigRefreshAndSnapshot(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); + FutureCallback<Message> callback = new FutureCallback<>(); + herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback); + herder.tick(); + ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); + assertTrue(e.getCause() instanceof BadRequestException); + + PowerMock.verifyAll(); + } + + @Test + public void testAlterOffsetsNotLeader() throws Exception { + // Get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + expectRebalance(1, Collections.emptyList(), Collections.emptyList(), false); + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // Now handle the alter connector offsets request + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); + FutureCallback<Message> callback = new FutureCallback<>(); + herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback); + 
herder.tick(); + ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); + assertTrue(e.getCause() instanceof NotLeaderException); + + PowerMock.verifyAll(); + } + + @Test + public void testAlterOffsetsSourceConnectorExactlyOnceDisabled() throws Exception { + // Get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // Now handle the alter connector offsets request + Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>(); + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); + Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance(); + worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture)); + Message msg = new Message("The offsets for this connector have been altered successfully"); + EasyMock.expectLastCall().andAnswer(() -> { + workerCallbackCapture.getValue().onCompletion(null, msg); + return null; + }); + + PowerMock.replayAll(); + + herder.tick(); + FutureCallback<Message> callback = new FutureCallback<>(); + herder.alterConnectorOffsets(CONN1, offsets, callback); + herder.tick(); + assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS)); + assertEquals("The offsets for this connector have been altered successfully", msg.message()); + + PowerMock.verifyAll(); + } + + @Test + public void testAlterOffsetsSourceConnectorExactlyOnceEnabled() throws Exception { + // Setup herder with exactly-once support for source 
connectors enabled + herder = exactlyOnceHerder(); + rebalanceListener = herder.new RebalanceListener(time); + PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes(); + PowerMock.expectPrivate(herder, "updateDeletedTaskStatus").andVoid().anyTimes(); + + // Get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall().anyTimes(); + + // Now handle the alter connector offsets request + Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>(); + member.wakeup(); + PowerMock.expectLastCall().anyTimes(); + member.ensureActive(); + PowerMock.expectLastCall().anyTimes(); + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); + EasyMock.expect(herder.connectorType(EasyMock.anyObject())).andReturn(ConnectorType.SOURCE).anyTimes(); + + // Expect a round of zombie fencing to occur + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); + KafkaFuture<Void> workerFencingFuture = EasyMock.mock(KafkaFuture.class); + KafkaFuture<Void> herderFencingFuture = EasyMock.mock(KafkaFuture.class); + EasyMock.expect(worker.fenceZombies(CONN1, SNAPSHOT_STOPPED_CONN1.taskCountRecord(CONN1), CONN1_CONFIG)).andReturn(workerFencingFuture); + EasyMock.expect(workerFencingFuture.thenApply(EasyMock.<KafkaFuture.BaseFunction<Void, Void>>anyObject())).andReturn(herderFencingFuture); + + // Two fencing callbacks are added - one is in ZombieFencing::start itself to remove the connector from the active + // fencing list. The other is the callback passed from DistributedHerder::alterConnectorOffsets in order to + // queue up the actual alter offsets request if the zombie fencing succeeds. 
+ for (int i = 0; i < 2; i++) { + Capture<KafkaFuture.BiConsumer<Void, Throwable>> herderFencingCallback = EasyMock.newCapture(); + EasyMock.expect(herderFencingFuture.whenComplete(EasyMock.capture(herderFencingCallback))).andAnswer(() -> { + herderFencingCallback.getValue().accept(null, null); + return null; + }); + } + + Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance(); + Message msg = new Message("The offsets for this connector have been altered successfully"); + worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture)); + EasyMock.expectLastCall().andAnswer(() -> { + workerCallbackCapture.getValue().onCompletion(null, msg); + return null; + }); + + // Handle the second alter connector offsets request. No zombie fencing request to the worker is expected now since we + // already did a round of zombie fencing last time and no new tasks came up in the meanwhile. The config snapshot is + // refreshed once at the beginning of the DistributedHerder::alterConnectorOffsets method, once before checking + // whether zombie fencing is required, and once before actually proceeding to alter connector offsets. 
+ expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED); + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED); + expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED); + Capture<Callback<Message>> workerCallbackCapture2 = Capture.newInstance(); + worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture2)); + EasyMock.expectLastCall().andAnswer(() -> { + workerCallbackCapture2.getValue().onCompletion(null, msg); + return null; + }); + + PowerMock.replayAll(workerFencingFuture, herderFencingFuture); + + herder.tick(); + FutureCallback<Message> callback = new FutureCallback<>(); + herder.alterConnectorOffsets(CONN1, offsets, callback); + // Process the zombie fencing request + herder.tick(); + // Process the alter offsets request + herder.tick(); + assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS)); + + FutureCallback<Message> callback2 = new FutureCallback<>(); + herder.alterConnectorOffsets(CONN1, offsets, callback2); + herder.tick(); + assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS)); + + PowerMock.verifyAll(); + } + + @Test + public void testAlterOffsetsSourceConnectorExactlyOnceEnabledZombieFencingFailure() throws Exception { Review Comment: This goes above and beyond what I was expecting for coverage here, great stuff 👍 Do you think it's also worth adding a case for a sink connector, or is that too similar to the non-exactly-once source case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org