cryptoe commented on code in PR #14985:
URL: https://github.com/apache/druid/pull/14985#discussion_r1377449052
##########
docs/api-reference/legacy-metadata-api.md:
##########
@@ -116,10 +116,18 @@ Returns a list of all segments for one or more specific datasources enabled in t
Returns a list of all segments for each datasource with the full segment metadata and an extra field `overshadowed`.
+`GET /druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments`
Review Comment:
Let's document that the `includeRealtimeSegments` param only works when the
`centralizedSchemaManagement` feature is enabled.
##########
server/src/main/java/org/apache/druid/server/http/MetadataResource.java:
##########
@@ -180,26 +196,67 @@ private Response getAllUsedSegmentsWithAdditionalDetails(
.filter(dataSourceWithUsedSegments -> dataSources.contains(dataSourceWithUsedSegments.getName()))
.collect(Collectors.toList());
}
- final Stream<DataSegment> usedSegments = dataSourcesWithUsedSegments
- .stream()
- .flatMap(t -> t.getSegments().stream());
final Set<DataSegment> overshadowedSegments = dataSourcesSnapshot.getOvershadowedSegments();
+ final Set<SegmentId> segmentAlreadySeen = new HashSet<>();
+ final Stream<SegmentStatusInCluster> segmentStatus = dataSourcesWithUsedSegments
+ .stream()
+ .flatMap(t -> t.getSegments().stream())
+ .map(segment -> {
+ // The replication factor for unloaded segments is 0 as they will be unloaded soon
+ boolean isOvershadowed = overshadowedSegments.contains(segment);
+ Integer replicationFactor = isOvershadowed ? (Integer) 0
+ : coordinator.getReplicationFactor(segment.getId());
+
+ Long numRows = null;
+ if (null != coordinatorSegmentMetadataCache) {
+ AvailableSegmentMetadata availableSegmentMetadata = coordinatorSegmentMetadataCache.getAvailableSegmentMetadata(
+ segment.getDataSource(),
+ segment.getId()
+ );
+ if (null != availableSegmentMetadata) {
+ numRows = availableSegmentMetadata.getNumRows();
+ }
+ }
+ segmentAlreadySeen.add(segment.getId());
+ return new SegmentStatusInCluster(
+ segment,
+ isOvershadowed,
+ replicationFactor,
+ numRows,
+ // published segment can't be realtime
+ false
+ );
+ });
+
+ Stream<SegmentStatusInCluster> finalSegments = segmentStatus;
- final Stream<SegmentStatusInCluster> segmentStatus = usedSegments.map(segment -> {
- // The replication factor for unloaded segments is 0 as they will be unloaded soon
- boolean isOvershadowed = overshadowedSegments.contains(segment);
- Integer replicationFactor = isOvershadowed ? (Integer) 0
- : coordinator.getReplicationFactor(segment.getId());
+ // conditionally add realtime segments information
+ if (null != includeRealtimeSegments && null != coordinatorSegmentMetadataCache) {
Review Comment:
Since the realtime flag only works if `centralizedSchemaManagement` is
enabled, let's document this in the user-facing docs as well as in the code.
In the code, we should mention the trade-off that was made: we did not want to
increase the payload of broker-to-coordinator communication when this feature
is disabled, and we did not want to introduce a new feature flag on the broker.
This API will be deprecated once we fully move over to the coordinator to
power `sys.segments` queries from the broker.
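
For reference, the conditional merge under discussion can be sketched roughly as follows. This is a simplified illustration, not the PR's code: `RealtimeMergeSketch`, `Status`, and `merge` are hypothetical names, segment IDs are plain strings, and a `null` list of cached IDs stands in for the metadata cache being absent when the feature is disabled.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RealtimeMergeSketch
{
  /** Hypothetical, simplified stand-in for SegmentStatusInCluster. */
  static final class Status
  {
    final String segmentId;
    final boolean realtime;

    Status(String segmentId, boolean realtime)
    {
      this.segmentId = segmentId;
      this.realtime = realtime;
    }
  }

  /**
   * Returns all published segments, followed by cached segments that were not
   * already seen among the published ones (these are marked as realtime).
   * Cached segments are only considered when the caller asked for them AND the
   * cache exists (cachedIds != null), mirroring the two null checks in the diff.
   */
  static List<Status> merge(List<String> publishedIds, List<String> cachedIds, boolean includeRealtime)
  {
    Set<String> seen = new HashSet<>();
    List<Status> result = new ArrayList<>();
    for (String id : publishedIds) {
      seen.add(id);
      // A published segment can't be realtime.
      result.add(new Status(id, false));
    }
    if (includeRealtime && cachedIds != null) {
      for (String id : cachedIds) {
        // Set.add returns false for IDs already reported as published.
        if (seen.add(id)) {
          result.add(new Status(id, true));
        }
      }
    }
    return result;
  }
}
```

With the cache absent, the flag is silently ignored and only published segments come back, which is the behavior the docs and the code comment should spell out.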
##########
server/src/main/java/org/apache/druid/client/SegmentLoadInfo.java:
##########
@@ -59,6 +61,15 @@ public ImmutableSegmentLoadInfo toImmutableSegmentLoadInfo()
return new ImmutableSegmentLoadInfo(segment, servers);
}
+ /**
+ * Randomly return one server from the sets of {@code servers}
+ */
+ public DruidServerMetadata pickOne()
+ {
+ synchronized (this) {
Review Comment:
Yes, I kind of agree that we might hit an `IndexOutOfBoundsException`.
From the `ConcurrentHashMap` Javadoc:
```
However, iterators are designed to be used by only one thread at a time.
Bear in mind that the results of aggregate status methods including
{@code size}, {@code isEmpty}, and {@code containsValue} are typically
useful only when a map is not undergoing concurrent updates in other threads.
Otherwise the results of these methods reflect transient states
that may be adequate for monitoring or estimation purposes, but not
for program control.
```
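But since you can iterate over the elements safely, what you could do is:
* compute the position of the element you want, using a size check
* create an iterator
* and then walk it as in the following sketch, falling back to the last
element seen if the set shrinks while you iterate
```
// pos is the target index, chosen from a size() snapshot of servers
int index = 0;
DruidServerMetadata previous = null;
for (DruidServerMetadata server : servers) {
  if (index == pos) {
    return server;
  }
  previous = server;
  index++;
}
// Fewer elements than expected: return the last one seen (null if none)
return previous;
```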
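
A self-contained version of this idea, as a minimal sketch: `PickOneSketch` and the generic `pickOne` helper are hypothetical names, not the PR's code, and the set is assumed to be a concurrent one (for example from `ConcurrentHashMap.newKeySet()`), whose weakly consistent iterators do not throw `ConcurrentModificationException`.

```java
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

class PickOneSketch
{
  /**
   * Picks a pseudo-random element from a concurrent set without locking.
   * Returns null if the set is empty (or is emptied before iteration starts).
   */
  static <T> T pickOne(Set<T> servers)
  {
    // size() is only a snapshot; the set may change before or during iteration.
    int size = servers.size();
    if (size == 0) {
      return null;
    }
    int pos = ThreadLocalRandom.current().nextInt(size);

    int index = 0;
    T previous = null;
    for (T server : servers) {
      if (index == pos) {
        return server;
      }
      previous = server;
      index++;
    }
    // The set shrank below pos while iterating: fall back to the last element seen.
    return previous;
  }
}
```

Because nothing is indexed directly, a concurrent removal can only skew which element is chosen; it cannot cause an out-of-bounds access.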
##########
server/src/main/java/org/apache/druid/client/SegmentLoadInfo.java:
##########
@@ -36,29 +38,46 @@ public SegmentLoadInfo(DataSegment segment)
{
Preconditions.checkNotNull(segment, "segment");
this.segment = segment;
- this.servers = Sets.newConcurrentHashSet();
Review Comment:
Let this remain a concurrent HashSet.
##########
sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java:
##########
@@ -192,20 +193,21 @@ private JsonParserIterator<SegmentStatusInCluster> getMetadataSegments(
Set<String> watchedDataSources
)
{
- String query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus";
+ StringBuilder queryBuilder = new StringBuilder("/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments");
Review Comment:
Let's also add a comment here about the nuances of the `includeRealtimeSegments` flag.
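
One possible shape for such a comment, shown on a stripped-down query builder. This is a sketch only: `SegmentsQuerySketch` and `buildQuery` are hypothetical names, and appending the watched datasources as repeated `datasources` query parameters is an assumption about the endpoint rather than something shown in this diff.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Set;

class SegmentsQuerySketch
{
  static String buildQuery(Set<String> watchedDataSources)
  {
    // includeRealtimeSegments is always sent, but the coordinator only honors it
    // when centralizedSchemaManagement is enabled. This is a deliberate trade-off:
    // it avoids a larger broker-to-coordinator payload when the feature is off,
    // and avoids introducing a separate feature flag on the broker.
    StringBuilder queryBuilder = new StringBuilder(
        "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments"
    );
    // Assumption: datasources are passed as repeated "datasources" parameters.
    for (String dataSource : watchedDataSources) {
      queryBuilder.append("&datasources=")
                  .append(URLEncoder.encode(dataSource, StandardCharsets.UTF_8));
    }
    return queryBuilder.toString();
  }
}
```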
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]