jtuglu1 commented on code in PR #18466: URL: https://github.com/apache/druid/pull/18466#discussion_r2377778140
########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/TaskConfigUpdateRequest.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; +import java.util.Objects; + +/** + * Request object for updating the configuration of a running {@link SeekableStreamIndexTask}. + */ +public class TaskConfigUpdateRequest<PartitionIdType, SequenceOffsetType> +{ + private final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig; + + private final String supervisorSpecVersion; + + @JsonCreator + public TaskConfigUpdateRequest( + @JsonProperty("ioConfig") @Nullable SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig, + @JsonProperty("supervisorSpecVersion") String supervisorSpecVersion Review Comment: nit: can we assert this is non-null like we do for other task actions' required params? 
########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java: ########## @@ -2762,6 +3097,23 @@ public boolean isTaskCurrent(int taskGroupId, String taskId, Map<String, Task> a } } + /** + * Verifies whether the running config version of the persistent task matches one in current supervisor spec. + */ + private boolean isPersistentTaskCurrent(SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task) + { + try { + final String currentVersion = spec.getVersion().get(); + final TaskConfigResponse<PartitionIdType, SequenceOffsetType> runningConfig = FutureUtils.get( + taskClient.getTaskConfigAsync(task.getId()), true); + return currentVersion.equals(runningConfig.getSupervisorSpecVersion()); Review Comment: The task autoscaler commits an entry in the supervisor table that just differs by `ioConfig.taskCount`. I see we are populating the `created_date` with this version in the spec, but the autoscaler would also be submitting its own latest spec with a different `created_date`. I assume this won't cause issues? ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/TaskConfigResponse.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; +import java.util.Objects; + +/** + * Response object for returning the configuration of a running {@link SeekableStreamIndexTask}. + */ +public class TaskConfigResponse<PartitionIdType, SequenceOffsetType> +{ + private final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig; + private final String supervisorSpecVersion; + + @JsonCreator + public TaskConfigResponse( + @JsonProperty("ioConfig") @Nullable SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig, + @JsonProperty("supervisorSpecVersion") String supervisorSpecVersion + ) + { + this.ioConfig = ioConfig; + this.supervisorSpecVersion = supervisorSpecVersion; Review Comment: nit: same null check ########## indexing-service/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java: ########## @@ -192,6 +212,28 @@ private Void insertEntryWithHandle( } } + private Void updateEntryWithHandle( Review Comment: nit: perhaps add some javadoc heading like `insertEntryWithHandle`. 
########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java: ########## @@ -3529,13 +3885,24 @@ public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> apply(List<Eit try { for (int i = 0; i < results.size(); i++) { if (results.get(i).isValue() && Boolean.valueOf(true).equals(results.get(i).valueOrThrow())) { - log.info("Successfully set endOffsets for task[%s] and resumed it", setEndOffsetTaskIds.get(i)); + log.info("Successfully set endOffsets for task[%s]", setEndOffsetTaskIds.get(i)); } else { String taskId = setEndOffsetTaskIds.get(i); killTask(taskId, "Failed to set end offsets, killing task"); taskGroup.tasks.remove(taskId); } } + if (isDynamicAllocationOngoing.get()) { + checkpointsToWaitFor -= setEndOffsetFutures.size(); + if (checkpointsToWaitFor <= 0) { + log.info("All tasks in current task groups have been checkpointed, resuming dynamic allocation"); + pendingConfigUpdateHook.call(); Review Comment: Is `pendingConfigUpdateHook` written/read across threads? Might want to make this volatile or an atomic ref ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java: ########## @@ -3529,13 +3885,24 @@ public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> apply(List<Eit try { for (int i = 0; i < results.size(); i++) { if (results.get(i).isValue() && Boolean.valueOf(true).equals(results.get(i).valueOrThrow())) { - log.info("Successfully set endOffsets for task[%s] and resumed it", setEndOffsetTaskIds.get(i)); + log.info("Successfully set endOffsets for task[%s]", setEndOffsetTaskIds.get(i)); } else { String taskId = setEndOffsetTaskIds.get(i); killTask(taskId, "Failed to set end offsets, killing task"); taskGroup.tasks.remove(taskId); } } + if (isDynamicAllocationOngoing.get()) { + checkpointsToWaitFor -= setEndOffsetFutures.size(); + if (checkpointsToWaitFor <= 0) { Review Comment: When can this be < 0? 
########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java: ########## @@ -301,6 +340,50 @@ public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> pauseAsync(Str ); } + @Override + public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> pauseAndCheckpointAsync(String id) + { + final ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> pauseFuture = + makeRequest(id, new RequestBuilder(HttpMethod.POST, "/pauseAndCheckpoint")) + .handler(new BytesFullResponseHandler()) + .onSuccess(r -> { + if (r.getStatus().equals(HttpResponseStatus.OK)) { + log.info("Task [%s] paused successfully & Checkpoint requested successffully", id); + return deserializeOffsetsMap(r.getContent()); + } else if (r.getStatus().equals(HttpResponseStatus.ACCEPTED)) { + return null; Review Comment: Do we expect to see this kind of response? ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java: ########## @@ -452,9 +460,10 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception //milliseconds waited for created segments to be handed off long handoffWaitMs = 0L; - + log.info("Task perpetually running: %s", task.isPerpetuallyRunning()); Review Comment: nit: add task ID to this log (unless thread ID is there). 
########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java: ########## @@ -571,29 +599,306 @@ private boolean changeTaskCount(int desiredActiveTaskCount) dataSource ); final Stopwatch scaleActionStopwatch = Stopwatch.createStarted(); - gracefulShutdownInternal(); - changeTaskCountInIOConfig(desiredActiveTaskCount); - clearAllocationInfo(); - emitter.emit(ServiceMetricEvent.builder() - .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) - .setDimension(DruidMetrics.DATASOURCE, dataSource) - .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()) - .setDimensionIfNotNull( - DruidMetrics.TAGS, - spec.getContextValue(DruidMetrics.TAGS) - ) - .setMetric( - AUTOSCALER_SCALING_TIME_METRIC, - scaleActionStopwatch.millisElapsed() - )); + + if (spec.usePersistentTasks()) { + return changeTaskCountForPerpetualTasks(desiredActiveTaskCount, successfulScaleAutoScalerCallback); + } else { + gracefulShutdownInternal(); + changeTaskCountInIOConfig(desiredActiveTaskCount); + clearAllocationInfo(); + } + emitAutoScalerRunMetric(scaleActionStopwatch); log.info("Changed taskCount to [%s] for supervisor[%s] for dataSource[%s].", desiredActiveTaskCount, supervisorId, dataSource); return true; } } + private void emitAutoScalerRunMetric(Stopwatch scaleActionStopwatch) + { + emitter.emit(ServiceMetricEvent.builder() + .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()) + .setDimensionIfNotNull( + DruidMetrics.TAGS, + spec.getContextValue(DruidMetrics.TAGS) + ) + .setMetric( + AUTOSCALER_SCALING_TIME_METRIC, + scaleActionStopwatch.millisElapsed() + )); + } + + /** + * Handles task count changes for perpetual tasks using updateConfig instead of graceful shutdown. + * This approach pauses tasks, recalculates partition assignments, and sends config updates. 
+ */ + private boolean changeTaskCountForPerpetualTasks(int desiredActiveTaskCount, + Runnable successfulScaleAutoScalerCallback + ) + throws InterruptedException, ExecutionException + { + if (!isDynamicAllocationOngoing.compareAndSet(false, true)) { + log.info("A dynamic allocation is already ongoing, skipping this request."); + return false; + } + final Stopwatch scaleActionStopwatch = Stopwatch.createStarted(); + log.info("Handling task count change for perpetual tasks from [%d] to [%d]", + activelyReadingTaskGroups.size(), desiredActiveTaskCount); + + Map<PartitionIdType, SequenceOffsetType> offsetsFromTasks = pauseAndCheckpointAllTasks(); + + if (offsetsFromTasks.isEmpty()) { + isDynamicAllocationOngoing.set(false); + return false; + } + pendingConfigUpdateHook = () -> updateTaskConfigsAndCompleteAutoScaleEvent( + offsetsFromTasks, + scaleActionStopwatch, + desiredActiveTaskCount, + successfulScaleAutoScalerCallback + ); + return true; + } + + /** + * This function sends the config updates to all the tasks with new partition assignments and offsets. In an auto-scaling + * lifecycle, this marks the end of the scale action. 
+ */ + private boolean updateTaskConfigsAndCompleteAutoScaleEvent( + Map<PartitionIdType, SequenceOffsetType> offsetsFromTasks, + Stopwatch scaleActionStopwatch, + int desiredActiveTaskCount, + Runnable successfulScaleAutoScalerCallback + ) throws InterruptedException, ExecutionException + { + changeTaskCountInIOConfig(desiredActiveTaskCount); + Map<Integer, Set<PartitionIdType>> newPartitionGroups = calculateNewPartitionGroups(); + + boolean success = sendConfigUpdatesToTasks(newPartitionGroups, offsetsFromTasks); + + if (success) { + updatePartitionGroupsForPerpetualTasks(newPartitionGroups); + + log.info("Successfully updated task configurations for perpetual tasks scaling"); + } else { + log.error("Failed to update task configurations for perpetual tasks"); + } + + isDynamicAllocationOngoing.set(false); + emitAutoScalerRunMetric(scaleActionStopwatch); + + // You need to set the auto scaler specific stuff here. + successfulScaleAutoScalerCallback.run(); + dynamicTriggerLastRunTime = System.currentTimeMillis(); + return success; + } + + + private Map<PartitionIdType, SequenceOffsetType> pauseAndCheckpointAllTasks() throws InterruptedException, ExecutionException + { + log.info("Pausing all tasks for perpetual task scaling"); + List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> pauseFutures = new ArrayList<>(); + + for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) { + for (String taskId : taskGroup.taskIds()) { + log.debug("Pausing & Checkpointing tasks [%s]", taskId); Review Comment: nit: task ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java: ########## @@ -1895,6 +2060,32 @@ public Response pauseHTTP( return pause(); } + @POST + @Path("/pauseAndCheckpoint") + @Produces(MediaType.APPLICATION_JSON) + public Response pauseAndCheckpointHTTP( + @Context final HttpServletRequest req + ) throws InterruptedException + { + authorizationCheck(req); + if 
(!waitForConfigUpdate.compareAndSet(false, true)) { + return Response.ok().entity("Task is already paused for checkpoint completion").build(); + } + Response pauseResponse = pause(); + if (pauseResponse.getStatus() == 409) { + waitForConfigUpdate.set(false); + return pauseResponse; Review Comment: possibly naive: do we need to resume() here? ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java: ########## @@ -1895,6 +2060,32 @@ public Response pauseHTTP( return pause(); } + @POST + @Path("/pauseAndCheckpoint") + @Produces(MediaType.APPLICATION_JSON) + public Response pauseAndCheckpointHTTP( + @Context final HttpServletRequest req + ) throws InterruptedException + { + authorizationCheck(req); + if (!waitForConfigUpdate.compareAndSet(false, true)) { + return Response.ok().entity("Task is already paused for checkpoint completion").build(); + } + Response pauseResponse = pause(); + if (pauseResponse.getStatus() == 409) { Review Comment: nit: ```java .getStatus().equals(HttpResponseStatus.CONFLICT) ``` ########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ########## @@ -226,6 +226,70 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( ); } + @Override + protected SeekableStreamIndexTaskIOConfig<KafkaTopicPartition, Long> createUpdatedTaskIoConfig( + Set<KafkaTopicPartition> partitions, + TaskGroup existingTaskGroup, + Map<KafkaTopicPartition, Long> latestCommittedOffsets, + Map<KafkaTopicPartition, Long> latestTaskOffsetsOnPause + ) + { + log.info("Creating updated task IO config for task group [%s]", existingTaskGroup.getId()); + Map<KafkaTopicPartition, Long> startingSequences = new HashMap<>(); + Set<KafkaTopicPartition> exclusiveStartSequenceNumberPartitions = new HashSet<>(); + + for (KafkaTopicPartition partition : partitions) { + Long offset = Math.max( + latestTaskOffsetsOnPause.getOrDefault(partition, 0L), + 
latestCommittedOffsets.getOrDefault(partition, 0L) + ); + startingSequences.put(partition, offset); + } + + SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long> startSequenceNumbers = + new SeekableStreamStartSequenceNumbers<>( + spec.getSpec().getIOConfig().getStream(), + startingSequences, + exclusiveStartSequenceNumberPartitions + ); + + // For end sequences, use NOT_SET to indicate open-ended reading + Map<KafkaTopicPartition, Long> endingSequences = new HashMap<>(); + for (KafkaTopicPartition partition : partitions) { + endingSequences.put(partition, END_OF_PARTITION); Review Comment: END_OF_PARTITION or NOT_SET? Comment is a bit misleading -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org For additional commands, e-mail: commits-help@druid.apache.org
