kfaraz commented on code in PR #18466:
URL: https://github.com/apache/druid/pull/18466#discussion_r2386855941
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,10 +1738,161 @@ public Response getUnparseableEvents(
return
Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
}
+  /**
+   * Applies a new ioConfig to this task. The update is only accepted while {@code waitForConfigUpdate} is set,
+   * i.e. after the task has been paused and checkpointed; otherwise 409 Conflict is returned.
+   *
+   * @param request the update request carrying the new ioConfig, last offsets and supervisor spec version
+   * @param req     the HTTP request, used for the authorization check
+   * @return 200 OK on success, 409 if the task is not waiting for a config update, or 500 carrying the
+   *         exception message if applying the update failed
+   */
+  @POST
+  @Path("/config")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(
+      TaskConfigUpdateRequest<PartitionIdType, SequenceOffsetType> request,
+      @Context final HttpServletRequest req
+  )
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.get()) {
+      return Response.status(409).entity("Task must have been paused and checkpointed before updating config.").build();
+    }
+    try {
+      return updateTaskRunnerConfig(request);
+    }
+    catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Catching InterruptedException clears the thread's interrupt flag; restore it.
+        Thread.currentThread().interrupt();
+      }
+      // makeAlert() only builds the alert; emit() is required for it to actually be sent.
+      log.makeAlert(e, "Failed to update task config").emit();
+      waitForConfigUpdate.set(false);
+      return Response.serverError().entity(e.getMessage()).build();
+    }
+  }
+
+ /**
+ * Updates the task's ioConfig, creates a new sequence from the new
ioConfig, re-assigns partitions and seeks to
+ * the new starting offsets. If there is no partition assigned to this task
due to a scale down, the task will be
+ * paused.
+ */
+ private Response
updateTaskRunnerConfig(TaskConfigUpdateRequest<PartitionIdType,
SequenceOffsetType> request)
+ throws IOException, InterruptedException
+ {
+ log.info("Attempting to update config to [%s]", request.getIoConfig());
+ SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType>
newIoConfig = request.getIoConfig();
+ setIOConfig(newIoConfig);
+ createNewSequenceFromIoConfig(newIoConfig, request.getLastOffsets());
+ supervisorSpecVersion = request.getSupervisorSpecVersion();
+
+ assignment = assignPartitions(recordSupplier);
+ if (!assignment.isEmpty()) {
+ possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
+ seekToStartingSequence(recordSupplier, assignment);
+ resume();
+ }
+
+ log.info("Config updated to [%s]", this.ioConfig);
+ toolbox.getEmitter().emit(ServiceMetricEvent.builder()
+ .setDimension(DruidMetrics.TASK_ID,
task.getId())
+ .setDimension(DruidMetrics.TASK_TYPE,
task.getType())
+ .setDimension(DruidMetrics.DATASOURCE,
task.getDataSource())
+ .setMetric("task/config/update/success", 1)
+ .build(ImmutableMap.of()));
+ waitForConfigUpdate.set(false);
+ return Response.ok().build();
+ }
+
+ private void setIOConfig(
+ SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType>
ioConfig
+ )
+ {
+ this.ioConfig = ioConfig;
+ this.endOffsets = new
ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
+ this.minMessageTime =
Configs.valueOrDefault(ioConfig.getMinimumMessageTime(), DateTimes.MIN);
+ this.maxMessageTime =
Configs.valueOrDefault(ioConfig.getMaximumMessageTime(), DateTimes.MAX);
+ }
+
+  /**
+   * Creates a new sequence for the ingestion process from the given ioConfig. The offsets in the ioConfig are
+   * taken as the correct offsets: whatever is currently held in {@code currOffsets} and {@code endOffsets} is
+   * discarded and replaced by them.
+   * <p>
+   * If a previous sequence exists and the committed offsets for its partitions are exactly its start offsets
+   * (nothing committed since it started), that sequence is closed with end offsets equal to its start offsets.
+   *
+   * @param ioConfig               ioConfig providing the start and end offsets of the new sequence
+   * @param latestCommittedOffsets latest committed offsets per partition
+   * @throws IOException if persisting the sequences fails
+   */
+  private void createNewSequenceFromIoConfig(
+      SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig,
+      Map<PartitionIdType, SequenceOffsetType> latestCommittedOffsets
+  )
+      throws IOException
+  {
+    final Map<PartitionIdType, SequenceOffsetType> partitionStartOffsets =
+        ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap();
+    final Map<PartitionIdType, SequenceOffsetType> partitionEndSequences =
+        ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap();
+
+    // Check for an existing sequence once, up front, instead of dereferencing the last sequence and only
+    // afterwards testing sequences.isEmpty() when computing the new sequence id.
+    final SequenceMetadata<PartitionIdType, SequenceOffsetType> lastSequenceMetadata =
+        sequences.isEmpty() ? null : getLastSequenceMetadata();
+
+    if (lastSequenceMetadata != null) {
+      final Map<PartitionIdType, SequenceOffsetType> offsetsForLastPartitionAssignment =
+          latestCommittedOffsets.entrySet()
+                                .stream()
+                                .filter(e -> lastSequenceMetadata.startOffsets.containsKey(e.getKey()))
+                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+      if (lastSequenceMetadata.startOffsets.equals(offsetsForLastPartitionAssignment)) {
+        // No data has been ingested since the last successful checkpoint for these partitions.
+        // Mark the end offsets of this sequence as its start offsets before creating the new sequence.
+        log.info("No new data ingested across any sibling tasks for this partition sequence since last checkpoint.");
+        lastSequenceMetadata.setEndOffsets(lastSequenceMetadata.startOffsets);
+      }
+    }
+
+    final int nextSequenceId = lastSequenceMetadata == null ? 0 : lastSequenceMetadata.getSequenceId() + 1;
+    final Set<PartitionIdType> exclusiveStartPartitions =
+        computeExclusiveStartPartitionsForSequence(partitionStartOffsets);
+
+    final SequenceMetadata<PartitionIdType, SequenceOffsetType> newSequence = new SequenceMetadata<>(
+        nextSequenceId,
+        StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), nextSequenceId),
+        partitionStartOffsets,
+        partitionEndSequences,
+        false,
+        exclusiveStartPartitions,
+        getTaskLockType()
+    );
+    log.info("Attempting adding new sequence [%s]", newSequence);
+
+    // The ioConfig is authoritative: replace any previously tracked offsets.
+    currOffsets.clear();
+    currOffsets.putAll(partitionStartOffsets);
+    endOffsets.clear();
+    endOffsets.putAll(partitionEndSequences);
+
+    addSequence(newSequence);
+    persistSequences();
+    log.info(
+        "Created new sequence [%s] with start offsets [%s]",
+        newSequence.getSequenceName(), partitionStartOffsets
+    );
+  }
+
+  /**
+   * Submits a {@code CheckPointDataSourceMetadataAction} for the latest sequence if it has not been checkpointed
+   * yet, built from that sequence's start offsets and exclusive-start partitions. Nothing is thrown to the
+   * caller: any failure is logged and stored in {@code backgroundThreadException}.
+   */
+  private void checkpointSequences()
+  {
+    try {
+      final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = getLastSequenceMetadata();
+      if (!latestSequence.isCheckpointed()) {
+        final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(
+            getSupervisorId(),
+            ioConfig.getTaskGroupId(),
+            null,
+            createDataSourceMetadata(
+                new SeekableStreamStartSequenceNumbers<>(
+                    stream,
+                    latestSequence.getStartOffsets(),
+                    latestSequence.getExclusiveStartPartitions()
+                )
+            )
+        );
+        if (!toolbox.getTaskActionClient().submit(checkpointAction)) {
+          // Report the offsets that were actually submitted, not currOffsets, which can differ from them.
+          throw new ISE("Checkpoint request with sequences [%s] failed, dying", latestSequence.getStartOffsets());
+        }
+      }
+    }
+    catch (Exception e) {
+      log.error(e, "Failed to checkpoint sequences.");
+      backgroundThreadException = e;
+    }
+  }
+
@VisibleForTesting
public Response setEndOffsets(
Map<PartitionIdType, SequenceOffsetType> sequenceNumbers,
- boolean finish // this field is only for internal purposes, shouldn't be
usually set by users
+ boolean finish, // this field is only for internal purposes, shouldn't
be usually set by users
Review Comment:
This old comment does not add any value. Please remove it.
```suggestion
boolean finish,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]