C0urante commented on code in PR #13818: URL: https://github.com/apache/kafka/pull/13818#discussion_r1237173042
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java: ########## @@ -1819,16 +1825,25 @@ public void testGetSinkConnectorOffsetsAdminClientAsynchronousError() { verifyKafkaClusterId(); } - @SuppressWarnings("unchecked") private void mockAdminListConsumerGroupOffsets(Admin admin, Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets, Exception e) { + mockAdminListConsumerGroupOffsets(admin, consumerGroupOffsets, e, null, 0); + } + + private void mockAdminListConsumerGroupOffsets(Admin admin, Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets, Exception e, Time time, long delayMs) { ListConsumerGroupOffsetsResult result = mock(ListConsumerGroupOffsetsResult.class); when(admin.listConsumerGroupOffsets(anyString(), any(ListConsumerGroupOffsetsOptions.class))).thenReturn(result); - KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> adminFuture = mock(KafkaFuture.class); - when(result.partitionsToOffsetAndMetadata()).thenReturn(adminFuture); - when(adminFuture.whenComplete(any())).thenAnswer(invocation -> { - ((KafkaFuture.BiConsumer<Map<TopicPartition, OffsetAndMetadata>, Throwable>) invocation.getArgument(0)) - .accept(consumerGroupOffsets, e); - return null; + KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> adminFuture = new KafkaFutureImpl<>(); + if (e != null) { + adminFuture.completeExceptionally(e); + } else { + adminFuture.complete(consumerGroupOffsets); + } + when(result.partitionsToOffsetAndMetadata()).thenAnswer(invocation -> { + if (time == null) { + return adminFuture; + } + time.sleep(delayMs); + return adminFuture; Review Comment: Nit: can simplify this ```suggestion if (time != null) time.sleep(delayMs); ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ########## @@ -574,6 +574,274 @@ public void alterAndVerifySourceConnectorOffsets(EmbeddedConnectCluster connect, "Source connector offsets should reflect the expected number of 
records produced"); } + @Test + public void testAlterSourceConnectorOffsetsInvalidRequestBody() throws Exception { + // Create a source connector and stop it + connect.configureConnector(CONNECTOR_NAME, baseSourceConnectorConfigs()); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + "Connector tasks did not start in time."); + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + String url = connect.endpointForResource(String.format("connectors/%s/offsets", CONNECTOR_NAME)); + + String content = "[]"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(500, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Cannot deserialize value")); + } + + content = "{}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(400, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Partitions / offsets need to be provided for an alter offsets request")); + } + + content = "{\"key\": []}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(500, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Unrecognized field")); + } + + content = "{\"offsets\": []}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(400, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Partitions / offsets need to be provided for an alter offsets request")); + } + + content = "{\"offsets\": {}}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(500, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Cannot deserialize value")); + } + + content = "{\"offsets\": [123]}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(500, 
response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Cannot construct instance")); + } + + content = "{\"offsets\": [{\"key\": \"val\"}]}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(500, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Unrecognized field")); + } + + content = "{\"offsets\": [{\"partition\": []]}]}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(500, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Cannot deserialize value")); + } + } + + @Test + public void testResetSinkConnectorOffsets() throws Exception { + resetAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka()); + } + + @Test + public void testResetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception { + Map<String, String> connectorConfigs = baseSinkConnectorConfigs(); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG, + "overridden-group-id"); + resetAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka()); + // Ensure that the overridden consumer group ID was the one actually used + try (Admin admin = connect.kafka().createAdminClient()) { + Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get(); + assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId()))); + assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId()))); + } + } + + @Test + public void testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception { + EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); + + try (AutoCloseable ignored = kafkaCluster::stop) { + kafkaCluster.start(); + + Map<String, String> 
connectorConfigs = baseSinkConnectorConfigs(); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + kafkaCluster.bootstrapServers()); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + kafkaCluster.bootstrapServers()); + + resetAndVerifySinkConnectorOffsets(connectorConfigs, kafkaCluster); + } + } + + private void resetAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception { + int numPartitions = 3; + int numMessages = 10; + kafkaCluster.createTopic(TOPIC, numPartitions); + + // Produce numMessages messages to each partition + for (int partition = 0; partition < numPartitions; partition++) { + for (int message = 0; message < numMessages; message++) { + kafkaCluster.produce(TOPIC, partition, "key", "value"); + } + } + // Create sink connector + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + "Connector tasks did not start in time."); + + waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, numMessages, + "Sink connector consumer group offsets should catch up to the topic end offsets"); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + + // Reset the sink connector's offsets + String response = connect.resetConnectorOffsets(CONNECTOR_NAME); + assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. 
" + + "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses.")); + + waitForEmptySinkConnectorOffsets(CONNECTOR_NAME); + + // Reset the sink connector's offsets again while it is still in a STOPPED state and ensure that there is no error + response = connect.resetConnectorOffsets(CONNECTOR_NAME); + assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " + + "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses.")); + + waitForEmptySinkConnectorOffsets(CONNECTOR_NAME); + + // Resume the connector and expect its offsets to catch up to the latest offsets + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + NUM_TASKS, + "Connector tasks did not resume in time" + ); + waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, 10, + "Sink connector consumer group offsets should catch up to the topic end offsets"); + } + + @Test + public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception { + connect.kafka().createTopic(TOPIC, 1); + + // Produce 10 messages + for (int message = 0; message < 10; message++) { + connect.kafka().produce(TOPIC, 0, "key", "value"); + } + + // Configure a sink connector whose sink task blocks in its stop method + Map<String, String> connectorConfigs = new HashMap<>(); + connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName()); + connectorConfigs.put(TOPICS_CONFIG, TOPIC); + connectorConfigs.put("block", "Task::stop"); + + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector tasks did not start in time."); + + waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, 
TOPIC, 1, 10, + "Sink connector consumer group offsets should catch up to the topic end offsets"); + + connect.stopConnector(CONNECTOR_NAME); + + // Try to reset the offsets + ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(CONNECTOR_NAME)); + assertThat(e.getMessage(), containsString("zombie sink task")); + } + + @Test + public void testResetSourceConnectorOffsets() throws Exception { + resetAndVerifySourceConnectorOffsets(connect, baseSourceConnectorConfigs()); + } + + @Test + public void testResetSourceConnectorOffsetsCustomOffsetsTopic() throws Exception { + Map<String, String> connectorConfigs = baseSourceConnectorConfigs(); + connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic"); + resetAndVerifySourceConnectorOffsets(connect, connectorConfigs); + } + + @Test + public void testResetSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception { + EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); + + try (AutoCloseable ignored = kafkaCluster::stop) { + kafkaCluster.start(); + + Map<String, String> connectorConfigs = baseSourceConnectorConfigs(); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + kafkaCluster.bootstrapServers()); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + kafkaCluster.bootstrapServers()); + + resetAndVerifySourceConnectorOffsets(connect, connectorConfigs); + } + } + + @Test + public void testResetSourceConnectorOffsetsExactlyOnceSupportEnabled() throws Exception { + Properties brokerProps = new Properties(); + brokerProps.put("transaction.state.log.replication.factor", "1"); + brokerProps.put("transaction.state.log.min.isr", "1"); + workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); + + // This embedded Connect cluster 
will internally spin up its own embedded Kafka cluster + EmbeddedConnectCluster exactlyOnceSupportEnabledConnectCluster = new EmbeddedConnectCluster.Builder() Review Comment: One more thought: in this case (and one other), we don't use the embedded Connect cluster created in the `@Before` method for this test suite. This is a little inefficient and can bloat test runtime. We can follow the same approach as the [ConnectWorkerIntegrationTest](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java), where we create an `EmbeddedConnectCluster.Builder` instance in the `@Before`-annotated method for the test suite, but only build and start the actual Connect cluster in each test case. It's a little more verbose but IMO it's worth the tradeoff since we gain more in decreased test time than we lose in extra lines of code. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1320,89 +1317,192 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, Admin admin = adminFactory.apply(adminConfig); try { - List<KafkaFuture<Void>> adminFutures = new ArrayList<>(); - - Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); - - if (!offsetsToAlter.isEmpty()) { - log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", - connName, offsetsToAlter); - AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( - (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, - alterConsumerGroupOffsetsOptions); - - 
adminFutures.add(alterConsumerGroupOffsetsResult.all()); + Map<TopicPartition, Long> offsetsToWrite; + if (isReset) { + offsetsToWrite = new HashMap<>(); + ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions() + .timeoutMs((int) timer.remainingMs()); + try { + admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions) + .partitionsToOffsetAndMetadata() + .get(timer.remainingMs(), TimeUnit.MILLISECONDS) + .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null)); + + timer.update(); + log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}", + connName, groupId, offsetsToWrite.keySet()); + } catch (Exception e) { + Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName); + log.error("Failed to list offsets prior to resetting offsets for sink connector {}", connName, e); + cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting offsets for sink connector " + connName, e), null); + return; + } + } else { + offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets); } - Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() == null) - .map(Map.Entry::getKey) - .collect(Collectors.toSet()); - - if (!partitionsToReset.isEmpty()) { - log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", - connName, partitionsToReset); - DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( - (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, - deleteConsumerGroupOffsetsOptions); - - adminFutures.add(deleteConsumerGroupOffsetsResult.all()); + boolean 
alterOffsetsResult; + try { + alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite); + } catch (UnsupportedOperationException e) { + log.error("Failed to modify offsets for connector {} because it doesn't support external modification of offsets", + connName, e); + throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " + + "modification of offsets", e); } + updateTimerAndCheckExpiry(timer, "Timed out while calling the 'alterOffsets' method for sink connector " + connName); Review Comment: Should we add a note to `SourceConnector::alterOffsets` and `SinkConnector::alterOffsets` that this method may be invoked multiple times with the same arguments if an external operation (writing to the offsets topic, deleting a consumer group, etc.) fails? That way, developers will know to silently ignore requests to reset offsets that don't appear to be present according to the connector's own external tracking logic. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java: ########## @@ -2240,6 +2255,193 @@ public void testAlterOffsetsSinkConnectorSynchronousError() throws Exception { verifyKafkaClusterId(); } + @Test + @SuppressWarnings("unchecked") + public void testResetOffsetsSourceConnectorExactlyOnceSupportEnabled() throws Exception { + Map<String, String> workerProps = new HashMap<>(this.workerProps); + workerProps.put("exactly.once.source.support", "enabled"); + workerProps.put("bootstrap.servers", "localhost:9092"); + workerProps.put("group.id", "connect-cluster"); + workerProps.put("config.storage.topic", "connect-configs"); + workerProps.put("offset.storage.topic", "connect-offsets"); + workerProps.put("status.storage.topic", "connect-statuses"); + config = new DistributedConfig(workerProps); + mockKafkaClusterId(); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(), + allConnectorClientConfigOverridePolicy, null); + worker.start(); + + when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true); + ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class); + KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class); + OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class); + + Set<Map<String, Object>> connectorPartitions = new HashSet<>(); + connectorPartitions.add(Collections.singletonMap("partitionKey1", new Object())); + connectorPartitions.add(Collections.singletonMap("partitionKey2", new Object())); + when(offsetStore.connectorPartitions(eq(CONNECTOR_ID))).thenReturn(connectorPartitions); + when(offsetWriter.doFlush(any())).thenAnswer(invocation -> { + invocation.getArgument(0, Callback.class).onCompletion(null, null); + return null; + }); + + FutureCallback<Message> 
cb = new FutureCallback<>(); + worker.modifySourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, null, offsetStore, producer, + offsetWriter, Thread.currentThread().getContextClassLoader(), cb); + assertEquals("The offsets for this connector have been reset successfully", cb.get(1000, TimeUnit.MILLISECONDS).message()); + + InOrder inOrder = Mockito.inOrder(offsetStore, offsetWriter, producer); + inOrder.verify(offsetStore).start(); + connectorPartitions.forEach(partition -> inOrder.verify(offsetWriter).offset(partition, null)); + inOrder.verify(offsetWriter).beginFlush(); + inOrder.verify(producer).initTransactions(); + inOrder.verify(producer).beginTransaction(); + inOrder.verify(offsetWriter).doFlush(any()); + inOrder.verify(producer).commitTransaction(); + inOrder.verify(offsetStore, timeout(1000)).stop(); + verifyKafkaClusterId(); + } + + @Test + public void testResetOffsetsSinkConnector() throws Exception { + mockKafkaClusterId(); + String connectorClass = SampleSinkConnector.class.getName(); + connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); + + Admin admin = mock(Admin.class); + Time time = new MockTime(); + worker = new Worker(WORKER_ID, time, plugins, config, offsetBackingStore, Executors.newCachedThreadPool(), + allConnectorClientConfigOverridePolicy, config -> admin); + worker.start(); + + when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); + + TopicPartition tp = new TopicPartition("test-topic", 0); + mockAdminListConsumerGroupOffsets(admin, Collections.singletonMap(tp, new OffsetAndMetadata(10L)), null, time, 2000); + when(sinkConnector.alterOffsets(eq(connectorProps), eq(Collections.singletonMap(tp, null)))).thenAnswer(invocation -> { + time.sleep(3000); + return true; + }); + + DeleteConsumerGroupsResult deleteConsumerGroupsResult = mock(DeleteConsumerGroupsResult.class); + when(admin.deleteConsumerGroups(anyCollection(), 
any(DeleteConsumerGroupsOptions.class))).thenReturn(deleteConsumerGroupsResult); + when(deleteConsumerGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(null)); + + FutureCallback<Message> cb = new FutureCallback<>(); + worker.modifySinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, null, + Thread.currentThread().getContextClassLoader(), cb); + assertEquals("The offsets for this connector have been reset successfully", cb.get(1000, TimeUnit.MILLISECONDS).message()); + + ArgumentCaptor<DeleteConsumerGroupsOptions> deleteConsumerGroupsOptionsArgumentCaptor = ArgumentCaptor.forClass(DeleteConsumerGroupsOptions.class); + verify(admin).deleteConsumerGroups(anyCollection(), deleteConsumerGroupsOptionsArgumentCaptor.capture()); + // Expect the call to Admin::deleteConsumerGroups to have a timeout value equal to the overall timeout value of DEFAULT_REST_REQUEST_TIMEOUT_MS + // minus the delay introduced in the call to Admin::listConsumerGroupOffsets (2000 ms) and the delay introduced in the call to + // SinkConnector::alterOffsets (3000 ms) + assertEquals((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS - 2000L - 3000L, Review Comment: Very nice! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org