yashmayya commented on code in PR #13465: URL: https://github.com/apache/kafka/pull/13465#discussion_r1205023533
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ##########
@@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testAlterOffsetsConnectorNotFound() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets("connector-does-not-exist", new HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof NotFoundException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsConnectorNotInStoppedState() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof BadRequestException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsNotLeader() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), false);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();

Review Comment:
   Yeah, it does seem to be intentional - apparently `expectLastCall` (in both `EasyMock` and `PowerMock`) is optional for void methods and is mainly used for clarity. FWIW, I was able to get the test to pass even after removing the last one (the one after the call to `member.poll()`).
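   To make that concrete, here's a minimal, self-contained sketch - using a hypothetical `Service` interface rather than the herder's actual member mock - showing that EasyMock records a void-method expectation whether or not `expectLastCall()` follows it:

   import static org.easymock.EasyMock.createStrictMock;
   import static org.easymock.EasyMock.expectLastCall;
   import static org.easymock.EasyMock.replay;
   import static org.easymock.EasyMock.verify;

   public class ExpectLastCallSketch {

       // Hypothetical collaborator, used only for this illustration
       interface Service {
           void start();
           void stop();
       }

       public static void main(String[] args) {
           Service mock = createStrictMock(Service.class);

           // Recording phase: invoking a void method on the mock records the expectation
           mock.start();
           expectLastCall();   // optional for a plain void expectation; mainly documents intent
           mock.stop();        // equally valid without expectLastCall()

           replay(mock);

           // Replay phase: both expectations must be satisfied either way
           mock.start();
           mock.stop();

           verify(mock);
       }
   }

   `expectLastCall()` only becomes necessary when chaining behavior onto the void call, e.g. `expectLastCall().andThrow(...)` or `expectLastCall().times(2)`.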
########## connect/runtime/src/test/java/org/apache/kafka/connect/util/SinkUtilsTest.java: ##########
@@ -46,4 +52,124 @@ public void testConsumerGroupOffsetsToConnectorOffsets() {
         expectedPartition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
         assertEquals(expectedPartition, connectorOffsets.offsets().get(0).partition());
     }
 
+    @Test
+    public void testValidateAndParseEmptyPartitionOffsetMap() {
+        // expect no exception to be thrown
+        Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(new HashMap<>());
+        assertTrue(parsedOffsets.isEmpty());
+    }
+
+    @Test
+    public void testValidateAndParseInvalidPartition() {
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, "topic");
+        Map<String, Object> offset = new HashMap<>();
+        offset.put(SinkUtils.KAFKA_OFFSET_KEY, 100);
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(partition, offset);
+
+        // missing partition key
+        ConnectException e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("The partition for a sink connector offset must contain the keys 'kafka_topic' and 'kafka_partition'"));
+
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, "not a number");
+        // bad partition key
+        e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("Failed to parse the following Kafka partition value in the provided offsets: 'not a number'"));
+
+        partition.remove(SinkUtils.KAFKA_TOPIC_KEY);
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, "5");
+        // missing topic key
+        e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("The partition for a sink connector offset must contain the keys 'kafka_topic' and 'kafka_partition'"));
+    }
+
+    @Test
+    public void testValidateAndParseInvalidOffset() {
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, "topic");
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, 10);
+        Map<String, Object> offset = new HashMap<>();
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(partition, offset);
+
+        // missing offset key
+        ConnectException e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("The offset for a sink connector should either be null or contain the key 'kafka_offset'"));
+
+        // bad offset key
+        offset.put(SinkUtils.KAFKA_OFFSET_KEY, "not a number");
+        e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("Failed to parse the following Kafka offset value in the provided offsets: 'not a number'"));
+    }
+
+    @Test
+    public void testValidateAndParseStringPartitionValue() {
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = createPartitionOffsetMap("topic", "10", "100");
+        Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(partitionOffsets);
+        assertEquals(1, parsedOffsets.size());
+        TopicPartition tp = parsedOffsets.keySet().iterator().next();
+        assertEquals(10, tp.partition());
+    }
+
+    @Test
+    public void testValidateAndParseIntegerPartitionValue() {
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = createPartitionOffsetMap("topic", 10, "100");
+        Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(partitionOffsets);
+        assertEquals(1, parsedOffsets.size());
+        TopicPartition tp = parsedOffsets.keySet().iterator().next();
+        assertEquals(10, tp.partition());
+    }
+
+    @Test
+    public void testValidateAndParseStringOffsetValue() {
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = createPartitionOffsetMap("topic", "10", "100");
+        Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(partitionOffsets);
+        assertEquals(1, parsedOffsets.size());
+        Long offsetValue = parsedOffsets.values().iterator().next();
+        assertEquals(100L, offsetValue.longValue());
+    }
+
+    @Test
+    public void testValidateAndParseIntegerOffsetValue() {
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = createPartitionOffsetMap("topic", "10", 100);
+        Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(partitionOffsets);
+        assertEquals(1, parsedOffsets.size());
+        Long offsetValue = parsedOffsets.values().iterator().next();
+        assertEquals(100L, offsetValue.longValue());
+    }
+
+    @Test
+    public void testNullOffset() {
+        Map<String, Object> partitionMap = new HashMap<>();
+        partitionMap.put(SinkUtils.KAFKA_TOPIC_KEY, "topic");
+        partitionMap.put(SinkUtils.KAFKA_PARTITION_KEY, 10);
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(partitionMap, null);
+        Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(partitionOffsets);
+        assertEquals(1, parsedOffsets.size());
+        assertNull(parsedOffsets.values().iterator().next());
+    }
+
+    @Test
+    public void testNullPartition() {

Review Comment:
   Ah, I hadn't noticed that we're treating null topic names as the Kafka topic "null". While "null" is a valid Kafka topic name, it should be provided as the string value "null", so it makes more sense to reject a null topic name outright. I've updated the `parseSinkConnectorOffsets` method to include this check and added a new test for this case, thanks!
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ##########
@@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception {
+    @Test
+    public void testAlterOffsetsNotLeader() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), false);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof NotLeaderException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsSourceConnectorExactlyOnceDisabled() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance();
+        worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture));
+        Message msg = new Message("The offsets for this connector have been altered successfully");
+        EasyMock.expectLastCall().andAnswer(() -> {
+            workerCallbackCapture.getValue().onCompletion(null, msg);
+            return null;
+        });
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, offsets, callback);
+        herder.tick();
+        assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertEquals("The offsets for this connector have been altered successfully", msg.message());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsSourceConnectorExactlyOnceEnabled() throws Exception {
+        // Setup herder with exactly-once support for source connectors enabled
+        herder = exactlyOnceHerder();
+        rebalanceListener = herder.new RebalanceListener(time);
+        PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();
+        PowerMock.expectPrivate(herder, "updateDeletedTaskStatus").andVoid().anyTimes();
+
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall().anyTimes();
+
+        // Now handle the alter connector offsets request
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        member.wakeup();
+        PowerMock.expectLastCall().anyTimes();
+        member.ensureActive();
+        PowerMock.expectLastCall().anyTimes();
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        EasyMock.expect(herder.connectorType(EasyMock.anyObject())).andReturn(ConnectorType.SOURCE).anyTimes();
+
+        // Expect a round of zombie fencing to occur
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        KafkaFuture<Void> workerFencingFuture = EasyMock.mock(KafkaFuture.class);
+        KafkaFuture<Void> herderFencingFuture = EasyMock.mock(KafkaFuture.class);
+        EasyMock.expect(worker.fenceZombies(CONN1, SNAPSHOT_STOPPED_CONN1.taskCountRecord(CONN1), CONN1_CONFIG)).andReturn(workerFencingFuture);
+        EasyMock.expect(workerFencingFuture.thenApply(EasyMock.<KafkaFuture.BaseFunction<Void, Void>>anyObject())).andReturn(herderFencingFuture);
+
+        // Two fencing callbacks are added - one is in ZombieFencing::start itself to remove the connector from the active
+        // fencing list. The other is the callback passed from DistributedHerder::alterConnectorOffsets in order to
+        // queue up the actual alter offsets request if the zombie fencing succeeds.
+        for (int i = 0; i < 2; i++) {
+            Capture<KafkaFuture.BiConsumer<Void, Throwable>> herderFencingCallback = EasyMock.newCapture();
+            EasyMock.expect(herderFencingFuture.whenComplete(EasyMock.capture(herderFencingCallback))).andAnswer(() -> {
+                herderFencingCallback.getValue().accept(null, null);
+                return null;
+            });
+        }
+
+        Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance();
+        Message msg = new Message("The offsets for this connector have been altered successfully");
+        worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            workerCallbackCapture.getValue().onCompletion(null, msg);
+            return null;
+        });
+
+        // Handle the second alter connector offsets request. No zombie fencing request to the worker is expected now since we
+        // already did a round of zombie fencing last time and no new tasks came up in the meantime. The config snapshot is
+        // refreshed once at the beginning of the DistributedHerder::alterConnectorOffsets method, once before checking
+        // whether zombie fencing is required, and once before actually proceeding to alter connector offsets.
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
+        Capture<Callback<Message>> workerCallbackCapture2 = Capture.newInstance();
+        worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture2));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            workerCallbackCapture2.getValue().onCompletion(null, msg);
+            return null;
+        });
+
+        PowerMock.replayAll(workerFencingFuture, herderFencingFuture);
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, offsets, callback);
+        // Process the zombie fencing request
+        herder.tick();
+        // Process the alter offsets request
+        herder.tick();
+        assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS));
+
+        FutureCallback<Message> callback2 = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, offsets, callback2);
+        herder.tick();
+        assertEquals(msg, callback2.get(1000L, TimeUnit.MILLISECONDS));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsSourceConnectorExactlyOnceEnabledZombieFencingFailure() throws Exception {

Review Comment:
   Thanks! Yeah, I hadn't added one for sink connectors because in the current shape it's identical to the non-EoS source case. However, additional test cases don't hurt, and if the behavior for sink connectors ever diverges from that of non-EoS source connectors in the future, it'll be easier to update if there's already a test case in place. The newly added test looks slightly messy (due to the need to reset previous expectations and add new ones in the EasyMock / PowerMock paradigm), but it will look cleaner when I migrate it to Mockito along with the other tests here.
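   To illustrate why the Mockito version should read more cleanly: Mockito has no record/replay lifecycle, so a later `when(...)` stubbing simply overrides the earlier one. A minimal sketch with a hypothetical mock (not the actual migrated test):

   import static org.junit.Assert.assertEquals;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.when;

   import java.util.List;
   import org.junit.Test;

   public class MockitoRestubSketch {

       @Test
       @SuppressWarnings("unchecked")
       public void restubbingOverridesEarlierStubbing() {
           List<String> snapshot = mock(List.class);

           // Initial stubbing
           when(snapshot.size()).thenReturn(1);
           assertEquals(1, snapshot.size());

           // Re-stubbing later in the test simply replaces the earlier behavior -
           // there is no equivalent of EasyMock's replay()/verify()/reset() dance
           when(snapshot.size()).thenReturn(2);
           assertEquals(2, snapshot.size());
       }
   }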
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configuration
+     * @param offsets a mapping from partitions to offsets that need to be overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {
+                    if (!offsetsToAlter.isEmpty()) {
+                        log.debug("Committing the following consumer group topic partition offsets using an admin client for sink connector {}: {}.",
+                                connName, offsetsToAlter);
+                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
+                                alterConsumerGroupOffsetsOptions);
+
+                        adminFuture = alterConsumerGroupOffsetsResult.all();
+                    }
+
+                    adminFuture.whenComplete((ignored, error) -> {
+                        if (error != null) {
+                            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                            // When a consumer group is non-empty, only group members can commit offsets. The above attempt to alter offsets via the admin
+                            // client will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped
+                            // completely or if the connector is resumed while the alter offsets request is being processed).
+                            if (error instanceof UnknownMemberIdException) {
+                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                        "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                        "completed. If the connector is in a stopped state, this operation can be safely retried. " +
+                                        "If it doesn't eventually succeed, the " +
+                                        "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                                        null);
+                            } else {
+                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for topic partitions " + offsetsToAlter.keySet() + " for " +
+                                        "connector " + connName, error),
+                                        null);
+                            }
+                        } else if (!partitionsToReset.isEmpty()) {
+                            log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
+                                    connName, partitionsToReset);
+                            DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
+                                    (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                            DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
+                                    deleteConsumerGroupOffsetsOptions);
+                            deleteConsumerGroupOffsetsResult.all().whenComplete((ignored2, error2) -> {
+                                Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                                if (error2 != null) {
+                                    // The attempt to delete offsets for certain topic partitions via the admin client will result in a
+                                    // GroupSubscribedToTopicException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped completely
+                                    // or if the connector is resumed while the alter offsets request is being processed).
+                                    if (error2 instanceof GroupSubscribedToTopicException) {
+                                        cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                                "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                                "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                                "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                                                null);
+                                    } else {
+                                        cb.onCompletion(new ConnectException("Failed to delete consumer group offsets for topic partitions " + partitionsToReset + " for connector " +
+                                                connName, error2),
+                                                null);
+                                    }
+                                } else {
+                                    completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                                }
+                            });
+                        } else {
+                            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                            completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                        }
+                    });
+                } catch (Throwable t) {
+                    Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                    throw t;
+                }
+            } catch (Throwable t) {
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null);
+            }
+        }));
+    }
+
+    /**
+     * Alter a source connector's offsets.
+     *
+     * @param connName the name of the source connector whose offsets are to be altered
+     * @param connector an instance of the source connector
+     * @param connectorConfig the source connector's configuration
+     * @param offsets a mapping from partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    private void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                             Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
+        Map<String, Object> producerProps = config.exactlyOnceSourceEnabled()
+                ? exactlyOnceSourceTaskProducerConfigs(new ConnectorTaskId(connName, 0), config, sourceConfig,
+                        connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId)
+                : baseProducerConfigs(connName, "connector-offset-producer-" + connName, config, sourceConfig,
+                        connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId);
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+
+        ConnectorOffsetBackingStore offsetStore = config.exactlyOnceSourceEnabled()
+                ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, connName, connector, producer)
+                : offsetStoreForRegularSourceConnector(sourceConfig, connName, connector, producer);
+
+        OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, connName, internalKeyConverter, internalValueConverter);
+        alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, offsetStore, producer, offsetWriter, connectorLoader, cb);
+    }
+
+    // Visible for testing
+    void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                     Map<Map<String, ?>, Map<String, ?>> offsets, ConnectorOffsetBackingStore offsetStore,
+                                     KafkaProducer<byte[], byte[]> producer, OffsetStorageWriter offsetWriter,
+                                     ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, offsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                offsetStore.configure(config);
+                // This reads to the end of the offsets topic and can be a time-consuming operation
+                offsetStore.start();

Review Comment:
   Ah okay, I get it now, thanks. I've moved the `configure` call up into the top-level `alterSourceConnectorOffsets` method, and moved the finally block where cleanup occurs into the top-level try/catch in the overloaded method (the `KafkaBasedLog` is instantiated in the `configure` method, and we need it to ensure resource cleanup in the call to `KafkaOffsetBackingStore::stop`). This also assumes that the call to `KafkaOffsetBackingStore::configure` won't throw an exception, because at this point the connector's tasks should already have successfully configured and started their own offset stores.
   On a side note, it doesn't look like we have exception handling in place for calls to `KafkaOffsetBackingStore::configure` elsewhere either, which could lead to leaking the clients created for the store.
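   To sketch the kind of guard that would avoid that leak - using simplified stand-in types rather than the actual Connect classes, so this is an illustration of the pattern, not the PR's code:

   import java.util.Map;

   public class ConfigureCleanupSketch {

       // Simplified stand-in for an offset store; only the lifecycle shape matters here
       interface OffsetStore {
           void configure(Map<String, String> config); // may create Kafka clients internally
           void start();
           void stop();                                // releases the clients created in configure()
       }

       // If configure() throws after it has already created clients, a bare call leaks them;
       // guarding it and stopping the store on failure keeps the resources bounded.
       static void configureAndStart(OffsetStore store, Map<String, String> config) {
           try {
               store.configure(config); // instantiates the backing log and its clients
               store.start();           // reads to the end of the offsets topic
           } catch (RuntimeException e) {
               // Best-effort cleanup: stop() releases whatever configure() managed to create
               try {
                   store.stop();
               } catch (RuntimeException suppressed) {
                   e.addSuppressed(suppressed);
               }
               throw e;
           }
       }
   }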