aho135 commented on code in PR #19477:
URL: https://github.com/apache/druid/pull/19477#discussion_r3321269264
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -393,6 +374,120 @@ public boolean resetSupervisor(String id, @Nullable
DataSourceMetadata resetData
return true;
}
+ /**
+ * Resets a supervisor to the latest stream offsets and starts a bounded
backfill supervisor to
+ * process the skipped range from the previously checkpointed offsets up to
the latest offsets.
+ *
+ * @param id supervisor ID
+ * @param backfillTaskCount number of tasks for the backfill supervisor, or
null to inherit from the source spec
+ * @return map with {@code "id"} (the original supervisor ID) and {@code
"backfillSupervisorId"}
+ * @throws IllegalArgumentException if the supervisor is not a {@link
SeekableStreamSupervisor},
+ * if {@code useEarliestSequenceNumber} is
true,
+ * if {@code useConcurrentLocks} is not set
to true in the supervisor context,
+ * or if the supervisor is not in a RUNNING
state
+ * @throws IllegalStateException if the latest or checkpointed offsets
cannot be retrieved,
+ * or if the backfill spec cannot be
serialized
+ */
+ public Map<String, Object> resetSupervisorAndBackfill(String id, @Nullable
Integer backfillTaskCount)
+ {
+ Preconditions.checkState(started, "SupervisorManager not started");
+ Preconditions.checkNotNull(id, "id");
+
+ Pair<Supervisor, SupervisorSpec> supervisorPair = supervisors.get(id);
+ validateResetAndBackfill(id, supervisorPair);
+
+ SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor)
supervisorPair.lhs;
+ SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec)
supervisorPair.rhs;
+
+ log.info("Capturing latest offsets from stream for supervisor[%s]", id);
+ streamSupervisor.updatePartitionLagFromStream();
+ Map<?, ?> endOffsets = streamSupervisor.getLatestSequencesFromStream();
+
+ log.info("Capturing checkpointed offsets for supervisor[%s]", id);
+ Map<?, ?> startOffsets = streamSupervisor.getOffsetsFromMetadataStorage();
+
+ if (endOffsets == null || endOffsets.isEmpty()) {
+ throw new ISE("Skipping reset: Failed to get latest offsets from stream
for supervisor[%s]", id);
+ }
+ if (startOffsets == null || startOffsets.isEmpty()) {
+ throw new ISE("Skipping reset: Failed to get checkpointed offsets for
supervisor[%s]", id);
+ }
+
+ String backfillSupervisorId = IdUtils.getRandomIdWithPrefix(id +
"_backfill");
+
+ try {
+ Map<String, Object> normalizedStartOffsets =
jsonMapper.readValue(jsonMapper.writeValueAsString(startOffsets), Map.class);
+ Map<String, Object> normalizedEndOffsets =
jsonMapper.readValue(jsonMapper.writeValueAsString(endOffsets), Map.class);
+ BoundedStreamConfig boundedStreamConfig = new
BoundedStreamConfig(normalizedStartOffsets, normalizedEndOffsets);
+ SupervisorSpec backfillSpec = createBackfillSpec(streamSpec,
backfillSupervisorId, boundedStreamConfig, backfillTaskCount);
+ createOrUpdateAndStartSupervisor(backfillSpec);
+ }
+ catch (JsonProcessingException e) {
+ throw new ISE(e, "Failed to create backfill supervisor spec for
supervisor[%s]", id);
+ }
+
+ log.info("Started backfill supervisor[%s] for supervisor[%s]",
backfillSupervisorId, id);
+
+ log.info("Resetting supervisor[%s] metadata to latest offsets", id);
+ DataSourceMetadata resetMetadata =
streamSupervisor.createDataSourceMetaDataForReset(
+ streamSupervisor.getIoConfig().getStream(),
+ endOffsets
+ );
+
+ streamSupervisor.resetOffsets(resetMetadata);
+
+ // Reset autoscaler if present
+ SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
+ if (autoscaler != null) {
+ autoscaler.reset();
+ }
+
+ return ImmutableMap.of(
+ "id", id,
+ "backfillSupervisorId", backfillSupervisorId
+ );
+ }
+
+ private void validateResetAndBackfill(String id, Pair<Supervisor,
SupervisorSpec> supervisorPair)
+ {
+ if (!(supervisorPair.lhs instanceof SeekableStreamSupervisor)) {
Review Comment:
On second thought, I'm thinking to leave this as is since in this case we'd
be discarding the returned value and then exception handling differs slightly
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -393,6 +374,120 @@ public boolean resetSupervisor(String id, @Nullable
DataSourceMetadata resetData
return true;
}
+ /**
+ * Resets a supervisor to the latest stream offsets and starts a bounded
backfill supervisor to
+ * process the skipped range from the previously checkpointed offsets up to
the latest offsets.
+ *
+ * @param id supervisor ID
+ * @param backfillTaskCount number of tasks for the backfill supervisor, or
null to inherit from the source spec
+ * @return map with {@code "id"} (the original supervisor ID) and {@code
"backfillSupervisorId"}
+ * @throws IllegalArgumentException if the supervisor is not a {@link
SeekableStreamSupervisor},
+ * if {@code useEarliestSequenceNumber} is
true,
+ * if {@code useConcurrentLocks} is not set
to true in the supervisor context,
+ * or if the supervisor is not in a RUNNING
state
+ * @throws IllegalStateException if the latest or checkpointed offsets
cannot be retrieved,
+ * or if the backfill spec cannot be
serialized
+ */
+ public Map<String, Object> resetSupervisorAndBackfill(String id, @Nullable
Integer backfillTaskCount)
+ {
+ Preconditions.checkState(started, "SupervisorManager not started");
+ Preconditions.checkNotNull(id, "id");
+
+ Pair<Supervisor, SupervisorSpec> supervisorPair = supervisors.get(id);
+ validateResetAndBackfill(id, supervisorPair);
+
+ SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor)
supervisorPair.lhs;
+ SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec)
supervisorPair.rhs;
+
+ log.info("Capturing latest offsets from stream for supervisor[%s]", id);
+ streamSupervisor.updatePartitionLagFromStream();
+ Map<?, ?> endOffsets = streamSupervisor.getLatestSequencesFromStream();
+
+ log.info("Capturing checkpointed offsets for supervisor[%s]", id);
+ Map<?, ?> startOffsets = streamSupervisor.getOffsetsFromMetadataStorage();
+
+ if (endOffsets == null || endOffsets.isEmpty()) {
+ throw new ISE("Skipping reset: Failed to get latest offsets from stream
for supervisor[%s]", id);
+ }
+ if (startOffsets == null || startOffsets.isEmpty()) {
+ throw new ISE("Skipping reset: Failed to get checkpointed offsets for
supervisor[%s]", id);
+ }
+
+ String backfillSupervisorId = IdUtils.getRandomIdWithPrefix(id +
"_backfill");
+
+ try {
+ Map<String, Object> normalizedStartOffsets =
jsonMapper.readValue(jsonMapper.writeValueAsString(startOffsets), Map.class);
+ Map<String, Object> normalizedEndOffsets =
jsonMapper.readValue(jsonMapper.writeValueAsString(endOffsets), Map.class);
+ BoundedStreamConfig boundedStreamConfig = new
BoundedStreamConfig(normalizedStartOffsets, normalizedEndOffsets);
+ SupervisorSpec backfillSpec = createBackfillSpec(streamSpec,
backfillSupervisorId, boundedStreamConfig, backfillTaskCount);
+ createOrUpdateAndStartSupervisor(backfillSpec);
+ }
+ catch (JsonProcessingException e) {
+ throw new ISE(e, "Failed to create backfill supervisor spec for
supervisor[%s]", id);
+ }
+
+ log.info("Started backfill supervisor[%s] for supervisor[%s]",
backfillSupervisorId, id);
+
+ log.info("Resetting supervisor[%s] metadata to latest offsets", id);
+ DataSourceMetadata resetMetadata =
streamSupervisor.createDataSourceMetaDataForReset(
+ streamSupervisor.getIoConfig().getStream(),
+ endOffsets
+ );
+
+ streamSupervisor.resetOffsets(resetMetadata);
+
+ // Reset autoscaler if present
+ SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
+ if (autoscaler != null) {
+ autoscaler.reset();
+ }
+
+ return ImmutableMap.of(
+ "id", id,
+ "backfillSupervisorId", backfillSupervisorId
+ );
+ }
+
+ private void validateResetAndBackfill(String id, Pair<Supervisor,
SupervisorSpec> supervisorPair)
+ {
+ if (!(supervisorPair.lhs instanceof SeekableStreamSupervisor)) {
Review Comment:
On second thought, I'm thinking to leave this as is since in this case we'd
be discarding the returned value and the exception handling differs slightly
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]