abhishekagarwal87 commented on code in PR #15039:
URL: https://github.com/apache/druid/pull/15039#discussion_r1352266424
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -111,24 +114,42 @@ public SegmentPublishResult perform(Task task,
TaskActionToolbox toolbox)
throw new RuntimeException(e);
}
- // Emit metrics
- final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
- IndexTaskUtils.setTaskDimensions(metricBuilder, task);
-
- if (retVal.isSuccess()) {
- toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success",
1));
+ IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox);
- for (DataSegment segment : retVal.getSegments()) {
- final String partitionType = segment.getShardSpec() == null ? null :
segment.getShardSpec().getType();
- metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE,
partitionType);
- metricBuilder.setDimension(DruidMetrics.INTERVAL,
segment.getInterval().toString());
-
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes",
segment.getSize()));
+ final Set<String> activeSupervisorIds = new HashSet<>();
+ if (toolbox.getSupervisorManager() != null) {
+ activeSupervisorIds.addAll(
+
toolbox.getSupervisorManager().getSeekableStreamSupervisorIdsForDatasource(task.getDataSource())
+ );
+ }
+ if (publishResult.isSuccess() && !activeSupervisorIds.isEmpty()) {
+ // If upgrade of pending segments fails, the segments will still get
upgraded
+ // when the corresponding APPEND task commits the segments.
+ // Thus, the upgrade of pending segments should not be done in the same
+ // transaction as the commit of replace segments and failure to upgrade
+ // pending segments should not affect success of replace commit.
+ try {
+ Set<SegmentIdWithShardSpec> upgradedPendingSegments =
+
toolbox.getIndexerMetadataStorageCoordinator().upgradePendingSegments(segments);
+ log.info(
+ "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
+ upgradedPendingSegments.size(), task.getId(),
upgradedPendingSegments
+ );
+
+ for (String supervisorId : activeSupervisorIds) {
+ for (SegmentIdWithShardSpec pendingSegment :
upgradedPendingSegments) {
+
toolbox.getSupervisorManager().updatePendingSegmentMapping(supervisorId,
pendingSegment);
+ }
+ }
+
+ // These upgraded pending segments should be forwarded to the
SupervisorManager
Review Comment:
leftover comment?
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1092,6 +1093,31 @@ public void resetOffsets(@Nonnull DataSourceMetadata
resetDataSourceMetadata)
addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
}
+ @Override
+ public void updatePendingSegmentMapping(SegmentIdWithShardSpec
rootPendingSegment)
+ {
+ for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
+ for (String taskId : taskGroup.taskIds()) {
+ taskClient.updatePendingSegmentMapping(
+ taskId,
+ rootPendingSegment,
+
indexerMetadataStorageCoordinator.findAllVersionsOfPendingSegment(rootPendingSegment)
+ );
Review Comment:
We are just ignoring the future here. The method should combine these futures
and return the combined future to the caller.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1092,6 +1093,31 @@ public void resetOffsets(@Nonnull DataSourceMetadata
resetDataSourceMetadata)
addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
}
+ @Override
+ public void updatePendingSegmentMapping(SegmentIdWithShardSpec
rootPendingSegment)
+ {
+ for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
+ for (String taskId : taskGroup.taskIds()) {
+ taskClient.updatePendingSegmentMapping(
+ taskId,
+ rootPendingSegment,
+
indexerMetadataStorageCoordinator.findAllVersionsOfPendingSegment(rootPendingSegment)
+ );
+ }
+ }
+ for (List<TaskGroup> taskGroupList : pendingCompletionTaskGroups.values())
{
+ for (TaskGroup taskGroup : taskGroupList) {
+ for (String taskId : taskGroup.taskIds()) {
+ taskClient.updatePendingSegmentMapping(
+ taskId,
+ rootPendingSegment,
+
indexerMetadataStorageCoordinator.findAllVersionsOfPendingSegment(rootPendingSegment)
Review Comment:
this is a DB call that's being made again and again for each task; it should be
done once and the result reused.
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java:
##########
@@ -92,6 +95,8 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
private final Cache cache;
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
+ private final ConcurrentMap<SegmentDescriptor, SegmentDescriptor>
newIdToRootPendingSegment
Review Comment:
when is an entry removed from the map?
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -601,6 +599,157 @@ public SegmentIdWithShardSpec allocatePendingSegment(
);
}
+ @Override
+ public Set<SegmentIdWithShardSpec> upgradePendingSegments(Set<DataSegment>
replaceSegments)
+ {
+ if (replaceSegments.isEmpty()) {
+ return Collections.emptySet();
+ }
+
+ // Any replace interval has exactly one version of segments
+ final Map<Interval, DataSegment> replaceIntervalToMaxId = new HashMap<>();
+ for (DataSegment segment : replaceSegments) {
+ DataSegment committedMaxId =
replaceIntervalToMaxId.get(segment.getInterval());
+ if (committedMaxId == null
+ || committedMaxId.getShardSpec().getPartitionNum() <
segment.getShardSpec().getPartitionNum()) {
+ replaceIntervalToMaxId.put(segment.getInterval(), segment);
+ }
+ }
+
+ final String datasource =
replaceSegments.iterator().next().getDataSource();
+ return connector.retryWithHandle(
+ handle -> upgradePendingSegments(handle, datasource,
replaceIntervalToMaxId)
+ );
+ }
+
+ @Override
+ public Set<SegmentIdWithShardSpec>
findAllVersionsOfPendingSegment(SegmentIdWithShardSpec pendingSegment)
+ {
+ return connector.retryWithHandle(
+ handle -> findAllVersionsOfPendingSegment(handle, pendingSegment)
+ );
+ }
+
+ private Set<SegmentIdWithShardSpec> findAllVersionsOfPendingSegment(
+ Handle handle,
+ SegmentIdWithShardSpec pendingSegment
+ ) throws IOException
+ {
+ final Interval interval = pendingSegment.getInterval();
+ final Query<Map<String, Object>> query = handle
+ .createQuery(
+ StringUtils.format(
+ "SELECT payload "
+ + "FROM %s WHERE "
+ + "dataSource = :dataSource AND "
+ + "start = :start AND "
+ + "%2$send%2$s = :end AND "
+ + "sequence_prev_id = :sequence_prev_id",
+ dbTables.getPendingSegmentsTable(),
+ connector.getQuoteString()
+ )
+ )
+ .bind("dataSource", pendingSegment.getDataSource())
+ .bind("sequence_prev_id", pendingSegment.asSegmentId().toString())
+ .bind("start", interval.getStart().toString())
+ .bind("end", interval.getEnd().toString());
+
+ final ResultIterator<byte[]> dbSegments = query
+ .map(ByteArrayMapper.FIRST)
+ .iterator();
+
+ final Set<SegmentIdWithShardSpec> allVersions = new HashSet<>();
+ while (dbSegments.hasNext()) {
+ final byte[] payload = dbSegments.next();
+ final SegmentIdWithShardSpec segmentId =
+ jsonMapper.readValue(payload, SegmentIdWithShardSpec.class);
+ allVersions.add(segmentId);
+ }
+
+ return allVersions;
+ }
+
+ /**
+ * Finds pending segments contained in each replace interval and upgrades
them
+ * to the replace version.
+ */
+ private Set<SegmentIdWithShardSpec> upgradePendingSegments(
+ Handle handle,
+ String datasource,
+ Map<Interval, DataSegment> replaceIntervalToMaxId
+ ) throws IOException
+ {
+ final Map<SegmentCreateRequest, SegmentIdWithShardSpec>
newPendingSegmentVersions = new HashMap<>();
+
+ for (Map.Entry<Interval, DataSegment> entry :
replaceIntervalToMaxId.entrySet()) {
+ final Interval replaceInterval = entry.getKey();
+ final DataSegment maxSegmentId = entry.getValue();
+ final String replaceVersion = maxSegmentId.getVersion();
+
+ final int numCorePartitions =
maxSegmentId.getShardSpec().getNumCorePartitions();
+ int currentPartitionNumber =
maxSegmentId.getShardSpec().getPartitionNum();
+
+ final Map<SegmentIdWithShardSpec, String> overlappingPendingSegments
+ = getPendingSegmentsForIntervalWithHandle(handle, datasource,
replaceInterval);
+
+ for (Map.Entry<SegmentIdWithShardSpec, String> overlappingPendingSegment
+ : overlappingPendingSegments.entrySet()) {
+ final SegmentIdWithShardSpec pendingSegmentId =
overlappingPendingSegment.getKey();
+ final String pendingSegmentSequence =
overlappingPendingSegment.getValue();
+ if (shouldUpgradePendingSegment(pendingSegmentId,
pendingSegmentSequence, replaceInterval, replaceVersion)) {
+ // There cannot be any duplicates because this version not been
committed before
+ newPendingSegmentVersions.put(
+ new SegmentCreateRequest(
+ UPGRADED_PENDING_SEGMENT_PREFIX + replaceVersion,
Review Comment:
should it not be
```suggestion
UPGRADED_PENDING_SEGMENT_PREFIX + pendingSegmentSequence,
```
I guess it doesn't matter.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -69,6 +72,24 @@ public Set<String> getSupervisorIds()
return supervisors.keySet();
}
+ public Set<String> getSeekableStreamSupervisorIdsForDatasource(String
datasource)
Review Comment:
why does it return a set?
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java:
##########
@@ -771,6 +774,90 @@ public void testMultipleGranularities()
verifyIntervalHasVisibleSegments(YEAR_23, segmentV10, segmentV11,
segmentV13);
}
+ @Test
+ public void testSegmentIsAllocatedAtLatestVersion()
Review Comment:
this doesn't verify that we are creating extra pending segment versions. Am
I missing something?
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java:
##########
@@ -73,7 +78,8 @@ public SequenceMetadata(
@JsonProperty("startOffsets") Map<PartitionIdType, SequenceOffsetType>
startOffsets,
@JsonProperty("endOffsets") Map<PartitionIdType, SequenceOffsetType>
endOffsets,
@JsonProperty("checkpointed") boolean checkpointed,
- @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType>
exclusiveStartPartitions
+ @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType>
exclusiveStartPartitions,
+ @JsonProperty("taskLockType") TaskLockType taskLockType
Review Comment:
would adding this cause any issues during upgrade/downgrade? This should be
nullable.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1092,6 +1093,31 @@ public void resetOffsets(@Nonnull DataSourceMetadata
resetDataSourceMetadata)
addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
}
+ @Override
+ public void updatePendingSegmentMapping(SegmentIdWithShardSpec
rootPendingSegment)
+ {
+ for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
+ for (String taskId : taskGroup.taskIds()) {
+ taskClient.updatePendingSegmentMapping(
+ taskId,
+ rootPendingSegment,
+
indexerMetadataStorageCoordinator.findAllVersionsOfPendingSegment(rootPendingSegment)
+ );
+ }
+ }
+ for (List<TaskGroup> taskGroupList : pendingCompletionTaskGroups.values())
{
+ for (TaskGroup taskGroup : taskGroupList) {
+ for (String taskId : taskGroup.taskIds()) {
+ taskClient.updatePendingSegmentMapping(
+ taskId,
+ rootPendingSegment,
+
indexerMetadataStorageCoordinator.findAllVersionsOfPendingSegment(rootPendingSegment)
Review Comment:
It would also make sense to pass this from the action itself to the
supervisor.
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1026,6 +1031,62 @@ public void closeNow()
}
}
+ private void unannounceRootSegmentAndUpgradedVersions(Sink sink) throws
IOException
+ {
+ final DataSegment rootSegment = sink.getSegment();
+ segmentAnnouncer.unannounceSegment(rootSegment);
+ if (!rootPendingSegmentToNewerVersions.containsKey(rootSegment.getId())) {
+ return;
+ }
+ for (SegmentIdWithShardSpec newId :
rootPendingSegmentToNewerVersions.get(rootSegment.getId())) {
+ final DataSegment newSegment = new DataSegment(
+ newId.getDataSource(),
+ newId.getInterval(),
+ newId.getVersion(),
+ rootSegment.getLoadSpec(),
+ rootSegment.getDimensions(),
+ rootSegment.getMetrics(),
+ newId.getShardSpec(),
+ rootSegment.getBinaryVersion(),
+ rootSegment.getSize()
+ );
+ segmentAnnouncer.unannounceSegment(newSegment);
+ }
+ rootPendingSegmentToNewerVersions.remove(rootSegment.getId());
Review Comment:
we should remove this in a `finally` block.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java:
##########
@@ -153,6 +155,12 @@ ListenableFuture<Boolean> setEndOffsetsAsync(
*/
ListenableFuture<SeekableStreamIndexTaskRunner.Status> getStatusAsync(String
id);
+ ListenableFuture<Boolean> updatePendingSegmentMapping(
Review Comment:
we should also rename this method to `updatePendingSegmentMappingAsync` in
line with the other async methods in this class.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]