uds5501 commented on code in PR #18466:
URL: https://github.com/apache/druid/pull/18466#discussion_r2371094911
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -848,66 +857,15 @@ public void onFailure(Throwable t)
// We need to copy sequences here, because the success callback in
publishAndRegisterHandoff removes items from
// the sequence list. If a publish finishes before we finish iterating
through the sequence list, we can
// end up skipping some sequences.
- List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>
sequencesSnapshot = new ArrayList<>(sequences);
- for (int i = 0; i < sequencesSnapshot.size(); i++) {
- final SequenceMetadata<PartitionIdType, SequenceOffsetType>
sequenceMetadata = sequencesSnapshot.get(i);
- if (!publishingSequences.contains(sequenceMetadata.getSequenceName())
- &&
!publishedSequences.contains(sequenceMetadata.getSequenceName())) {
- final boolean isLast = i == (sequencesSnapshot.size() - 1);
- if (isLast) {
- // Shorten endOffsets of the last sequence to match currOffsets.
- sequenceMetadata.setEndOffsets(currOffsets);
- }
-
- // Update assignments of the sequence, which should clear them.
(This will be checked later, when the
- // Committer is built.)
- sequenceMetadata.updateAssignments(currOffsets,
this::isMoreToReadAfterReadingRecord);
- publishingSequences.add(sequenceMetadata.getSequenceName());
- // persist already done in finally, so directly add to publishQueue
- publishAndRegisterHandoff(sequenceMetadata);
- }
- }
+ populateSequencesToPublish();
if (backgroundThreadException != null) {
throw new RuntimeException(backgroundThreadException);
}
- // Wait for publish futures to complete.
- Futures.allAsList(publishWaitList).get();
-
- // Wait for handoff futures to complete.
- // Note that every publishing task (created by calling
AppenderatorDriver.publish()) has a corresponding
- // handoffFuture. handoffFuture can throw an exception if 1) the
corresponding publishFuture failed or 2) it
- // failed to persist sequences. It might also return null if handoff
failed, but was recoverable.
- // See publishAndRegisterHandoff() for details.
- List<SegmentsAndCommitMetadata> handedOffList = Collections.emptyList();
- ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
- if (tuningConfig.getHandoffConditionTimeout() == 0) {
- handedOffList = Futures.allAsList(handOffWaitList).get();
- } else {
- final long start = System.nanoTime();
- try {
- handedOffList = Futures.allAsList(handOffWaitList)
-
.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
- }
- catch (TimeoutException e) {
- // Handoff timeout is not an indexing failure, but coordination
failure. We simply ignore timeout exception
- // here.
- log.makeAlert("Timeout waiting for handoff")
- .addData("taskId", task.getId())
- .addData("handoffConditionTimeout",
tuningConfig.getHandoffConditionTimeout())
- .emit();
- }
- finally {
- handoffWaitMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
start);
- }
- }
-
- for (SegmentsAndCommitMetadata handedOff : handedOffList) {
- log.info(
- "Handoff complete for segments: %s",
- String.join(", ", Lists.transform(handedOff.getSegments(),
DataSegment::toString))
- );
Review Comment:
ack, it was just moved.
re-moving to reduce the diff and undo the refactor.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]